1use arrow::array::{Array, DictionaryArray};
2use arrow::array::{BinaryArray, BooleanArray, BooleanBufferBuilder, StringArray, cast::AsArray};
3use arrow::datatypes::UInt16Type;
4use arrow_schema::DataType;
5use datafusion_common::ScalarValue;
6use datafusion_expr_common::columnar_value::ColumnarValue;
7use datafusion_expr_common::operator::Operator;
8use datafusion_physical_expr_common::datum::apply_cmp;
9use fsst::Compressor;
10use std::sync::Arc;
11use std::vec;
12
13use super::LiquidByteViewArray;
14use super::fingerprint::{StringFingerprint, substring_pattern_bytes};
15use crate::liquid_array::byte_view_array::operator::{self, ByteViewOperator};
16use crate::liquid_array::raw::FsstArray;
17use crate::liquid_array::raw::fsst_buffer::{DiskBuffer, FsstBacking, PrefixKey};
18
19impl LiquidByteViewArray<FsstArray> {
20 pub(super) fn compare_equals(&self, needle: &[u8]) -> BooleanArray {
22 let shared_prefix_len = self.shared_prefix.len();
23 let num_unique = self.prefix_keys.len();
24 if needle.len() < shared_prefix_len || needle[..shared_prefix_len] != self.shared_prefix {
25 return self.map_dictionary_results_to_array_results(vec![false; num_unique]);
26 }
27
28 let needle_suffix = &needle[shared_prefix_len..];
29 let needle_len = needle_suffix.len();
30 let prefix_len = PrefixKey::prefix_len();
31 let mut dict_results = vec![false; num_unique];
32
33 if needle_len <= prefix_len {
34 for (i, prefix_key) in self.prefix_keys.iter().enumerate().take(num_unique) {
35 let known_len = if prefix_key.len_byte() == 255 {
36 None
37 } else {
38 Some(prefix_key.len_byte() as usize)
39 };
40 if let Some(l) = known_len
41 && l == needle_len
42 && prefix_key.prefix7()[..l] == needle_suffix[..l]
43 {
44 dict_results[i] = true;
45 }
46 }
47
48 return self.map_dictionary_results_to_array_results(dict_results);
49 }
50
51 let compressed_needle = compress_needle(self.fsst_buffer.compressor(), needle);
52
53 for (i, prefix_key) in self.prefix_keys.iter().enumerate().take(num_unique) {
54 let known_len = if prefix_key.len_byte() == 255 {
55 None
56 } else {
57 Some(prefix_key.len_byte() as usize)
58 };
59
60 match known_len {
61 Some(l) => {
62 if l != needle_len {
63 continue;
64 }
65 }
66 None => {
67 if needle_len < 255 {
68 continue;
69 }
70 }
71 }
72
73 if prefix_key.prefix7()[..prefix_len] == needle_suffix[..prefix_len] {
74 let compressed_value = self.fsst_buffer.get_compressed_slice(i);
75 if compressed_value == compressed_needle.as_slice() {
76 dict_results[i] = true;
77 }
78 }
79 }
80
81 self.map_dictionary_results_to_array_results(dict_results)
82 }
83
84 fn compare_not_equals(&self, needle: &[u8]) -> BooleanArray {
86 let result = self.compare_equals(needle);
87 let (values, nulls) = result.into_parts();
88 let values = !&values;
89 BooleanArray::new(values, nulls)
90 }
91
92 pub fn compare_with(&self, needle: &[u8], op: &ByteViewOperator) -> BooleanArray {
94 match op {
95 ByteViewOperator::Comparison(cmp) => self.compare_with_inner(needle, cmp),
96 ByteViewOperator::Equality(operator::Equality::Eq) => self.compare_equals(needle),
97 ByteViewOperator::Equality(operator::Equality::NotEq) => {
98 self.compare_not_equals(needle)
99 }
100 ByteViewOperator::SubString(op) => {
101 if let Some(fingerprints) = self.string_fingerprints.as_ref() {
102 let pattern =
103 substring_pattern_bytes(needle).expect("Invalid substring pattern");
104 self.compare_like_substring(pattern, *op, fingerprints)
105 } else {
106 let fallback = ByteViewOperator::SubString(*op);
107 self.compare_with_arrow_fallback(needle, &fallback)
108 }
109 }
110 }
111 }
112
113 pub(super) fn compare_with_inner(
115 &self,
116 needle: &[u8],
117 op: &operator::Comparison,
118 ) -> BooleanArray {
119 let (mut dict_results, ambiguous) = self.compare_with_prefix(needle, op);
120
121 if !ambiguous.is_empty() {
123 let (values_buffer, offsets_buffer) =
124 self.fsst_buffer.to_uncompressed_selected(&ambiguous);
125 let binary_array =
126 unsafe { BinaryArray::new_unchecked(offsets_buffer, values_buffer, None) };
127
128 for (pos, &dict_index) in ambiguous.iter().enumerate() {
129 let value_cmp = binary_array.value(pos).cmp(needle);
130 let result = match (op, value_cmp) {
131 (operator::Comparison::Lt, std::cmp::Ordering::Less) => true,
132 (operator::Comparison::Lt, _) => false,
133 (
134 operator::Comparison::LtEq,
135 std::cmp::Ordering::Less | std::cmp::Ordering::Equal,
136 ) => true,
137 (operator::Comparison::LtEq, _) => false,
138 (operator::Comparison::Gt, std::cmp::Ordering::Greater) => true,
139 (operator::Comparison::Gt, _) => false,
140 (
141 operator::Comparison::GtEq,
142 std::cmp::Ordering::Greater | std::cmp::Ordering::Equal,
143 ) => true,
144 (operator::Comparison::GtEq, _) => false,
145 };
146 dict_results[dict_index] = result;
147 }
148 }
149
150 self.map_dictionary_results_to_array_results(dict_results)
151 }
152
153 fn compare_with_arrow_fallback(&self, needle: &[u8], op: &ByteViewOperator) -> BooleanArray {
155 let dict_array = self.to_dict_arrow();
156 compare_with_arrow_inner(dict_array, needle, op)
157 }
158
159 pub(super) fn compare_like_substring(
160 &self,
161 needle: &[u8],
162 operator: operator::SubString,
163 fingerprints: &Arc<[u32]>,
164 ) -> BooleanArray {
165 let (dict_results, ambiguous) = compute_fingerprint_candidates(needle, fingerprints);
166
167 let dict_results = if !ambiguous.is_empty() {
168 let (values_buffer, offsets_buffer) =
169 self.fsst_buffer.to_uncompressed_selected(&ambiguous);
170 apply_like_match_on_candidates(
171 dict_results,
172 ambiguous,
173 values_buffer,
174 offsets_buffer,
175 needle,
176 operator,
177 )
178 } else {
179 dict_results
180 };
181
182 self.map_dictionary_results_to_array_results(dict_results)
183 }
184}
185
186impl LiquidByteViewArray<DiskBuffer> {
187 pub(crate) async fn compare_with(&self, needle: &[u8], op: &ByteViewOperator) -> BooleanArray {
188 match op {
189 ByteViewOperator::Equality(operator::Equality::Eq) => self.compare_equals(needle).await,
190 ByteViewOperator::Equality(operator::Equality::NotEq) => {
191 self.compare_not_equals(needle).await
192 }
193 ByteViewOperator::Comparison(op) => self.compare_with_inner(needle, op).await,
194 ByteViewOperator::SubString(op) => {
195 let pattern = substring_pattern_bytes(needle).expect("Invalid substring pattern");
196 let fingerprints = self
197 .string_fingerprints
198 .as_ref()
199 .expect("Fingerprints not initialized");
200 self.compare_like_substring(pattern, *op, fingerprints)
201 .await
202 }
203 }
204 }
205
206 async fn compare_not_equals(&self, needle: &[u8]) -> BooleanArray {
208 let result = self.compare_equals(needle).await;
209 let (values, nulls) = result.into_parts();
210 let values = !&values;
211 BooleanArray::new(values, nulls)
212 }
213
214 pub(super) async fn compare_equals(&self, needle: &[u8]) -> BooleanArray {
216 let (mut dict_results, ambiguous) = self.compare_equals_with_prefix(needle);
217 if !ambiguous.is_empty() {
218 let bytes = self
219 .fsst_buffer
220 .squeeze_io()
221 .read(Some(self.fsst_buffer.disk_range()))
222 .await
223 .expect("read squeezed backing");
224 let hydrated = LiquidByteViewArray::<FsstArray>::from_bytes(
225 bytes,
226 self.fsst_buffer.compressor_arc(),
227 );
228 let compressed_needle = compress_needle(hydrated.fsst_buffer.compressor(), needle);
229
230 for &dict_index in ambiguous.iter() {
231 let compressed_value = hydrated.fsst_buffer.get_compressed_slice(dict_index);
232 if compressed_value == compressed_needle.as_slice() {
233 dict_results[dict_index] = true;
234 }
235 }
236 } else {
237 self.fsst_buffer.squeeze_io().trace_io_saved();
238 }
239
240 self.map_dictionary_results_to_array_results(dict_results)
241 }
242
243 pub(super) async fn compare_with_inner(
245 &self,
246 needle: &[u8],
247 op: &operator::Comparison,
248 ) -> BooleanArray {
249 let (mut dict_results, ambiguous) = self.compare_with_prefix(needle, op);
250
251 if !ambiguous.is_empty() {
253 let (values_buffer, offsets_buffer) =
254 self.fsst_buffer.to_uncompressed_selected(&ambiguous).await;
255 let binary_array =
256 unsafe { BinaryArray::new_unchecked(offsets_buffer, values_buffer, None) };
257
258 for (pos, &dict_index) in ambiguous.iter().enumerate() {
259 let value_cmp = bytes_cmp_short_auto(binary_array.value(pos), needle);
260 let result = match (op, value_cmp) {
261 (operator::Comparison::Lt, std::cmp::Ordering::Less) => true,
262 (operator::Comparison::Lt, _) => false,
263 (
264 operator::Comparison::LtEq,
265 std::cmp::Ordering::Less | std::cmp::Ordering::Equal,
266 ) => true,
267 (operator::Comparison::LtEq, _) => false,
268 (operator::Comparison::Gt, std::cmp::Ordering::Greater) => true,
269 (operator::Comparison::Gt, _) => false,
270 (
271 operator::Comparison::GtEq,
272 std::cmp::Ordering::Greater | std::cmp::Ordering::Equal,
273 ) => true,
274 (operator::Comparison::GtEq, _) => false,
275 };
276 dict_results[dict_index] = result;
277 }
278 } else {
279 self.fsst_buffer.squeeze_io().trace_io_saved();
280 }
281
282 self.map_dictionary_results_to_array_results(dict_results)
283 }
284
285 pub(super) async fn compare_like_substring(
286 &self,
287 needle: &[u8],
288 operator: operator::SubString,
289 fingerprints: &Arc<[u32]>,
290 ) -> BooleanArray {
291 let (dict_results, ambiguous) = compute_fingerprint_candidates(needle, fingerprints);
292
293 let dict_results = if !ambiguous.is_empty() {
294 let (values_buffer, offsets_buffer) =
295 self.fsst_buffer.to_uncompressed_selected(&ambiguous).await;
296 apply_like_match_on_candidates(
297 dict_results,
298 ambiguous,
299 values_buffer,
300 offsets_buffer,
301 needle,
302 operator,
303 )
304 } else {
305 self.fsst_buffer.squeeze_io().trace_io_saved();
306 dict_results
307 };
308
309 self.map_dictionary_results_to_array_results(dict_results)
310 }
311}
312
313impl<B: FsstBacking> LiquidByteViewArray<B> {
314 pub fn prefix_compare_counts(
316 &self,
317 needle: &[u8],
318 op: &operator::Comparison,
319 ) -> (usize, usize, usize) {
320 let (dict_results, ambiguous) = self.compare_with_prefix(needle, op);
321 let selected_rows = dict_results.iter().filter(|&x| *x).count();
322 (selected_rows, ambiguous.len(), self.dictionary_keys.len())
323 }
324
325 fn map_dictionary_results_to_array_results(&self, dict_results: Vec<bool>) -> BooleanArray {
326 let len = self.dictionary_keys.len();
327 let mut builder = BooleanBufferBuilder::new(len);
328 builder.advance(len);
329 for index in 0..len {
330 if !self.dictionary_keys.is_valid(index) {
331 continue;
332 }
333
334 let dict_index = self.dictionary_keys.value(index) as usize;
335 debug_assert!(dict_index < dict_results.len());
336 if dict_results.get(dict_index).copied().unwrap_or(false) {
337 builder.set_bit(index, true);
338 }
339 }
340
341 let values = builder.finish();
342 if let Some(nulls) = self.nulls() {
343 BooleanArray::new(values, Some(nulls.clone()))
344 } else {
345 BooleanArray::new(values, None)
346 }
347 }
348
349 #[inline(never)]
351 pub(super) fn compare_with_prefix(
352 &self,
353 needle: &[u8],
354 op: &operator::Comparison,
355 ) -> (Vec<bool>, Vec<usize>) {
356 if let Some(result) = self.compare_with_shared_prefix(needle, op) {
358 return (vec![result; self.dictionary_keys.len()], Vec::new());
359 }
360
361 let needle_suffix = &needle[self.shared_prefix.len()..];
362 let num_unique = self.prefix_keys.len();
363 let mut dict_results = vec![false; num_unique];
364 let mut ambiguous = Vec::new();
365
366 let cmp_len = needle_suffix.len().min(PrefixKey::prefix_len());
367 if cmp_len == 0 {
368 for (i, prefix_key) in self.prefix_keys.iter().enumerate() {
369 let is_empty_suffix = prefix_key.len_byte() == 0;
370 dict_results[i] = match op {
371 operator::Comparison::Lt => false,
372 operator::Comparison::LtEq => is_empty_suffix,
373 operator::Comparison::Gt => !is_empty_suffix,
374 operator::Comparison::GtEq => true,
375 };
376 }
377 return (dict_results, ambiguous);
378 }
379
380 for (i, prefix_key) in self.prefix_keys.iter().enumerate() {
381 let ordering = bytes_cmp_short(prefix_key.prefix7(), needle_suffix, cmp_len);
382 match ordering {
383 std::cmp::Ordering::Less => match op {
384 operator::Comparison::Lt | operator::Comparison::LtEq => {
385 dict_results[i] = true;
386 }
387 operator::Comparison::Gt | operator::Comparison::GtEq => {
388 dict_results[i] = false;
389 }
390 },
391 std::cmp::Ordering::Greater => match op {
392 operator::Comparison::Lt | operator::Comparison::LtEq => {
393 dict_results[i] = false;
394 }
395 operator::Comparison::Gt | operator::Comparison::GtEq => {
396 dict_results[i] = true;
397 }
398 },
399 std::cmp::Ordering::Equal => {
400 ambiguous.push(i);
401 }
402 }
403 }
404 (dict_results, ambiguous)
405 }
406
407 fn compare_equals_with_prefix(&self, needle: &[u8]) -> (Vec<bool>, Vec<usize>) {
409 let shared_prefix_len = self.shared_prefix.len();
410 let num_unique = self.prefix_keys.len();
411 if needle.len() < shared_prefix_len || needle[..shared_prefix_len] != self.shared_prefix {
412 return (vec![false; num_unique], Vec::new());
413 }
414
415 let needle_suffix = &needle[shared_prefix_len..];
416 let needle_len = needle_suffix.len();
417 let prefix_len = PrefixKey::prefix_len();
418
419 let mut dict_results = vec![false; num_unique];
420 let mut ambiguous = Vec::new();
421
422 for (i, prefix_key) in self.prefix_keys.iter().enumerate().take(num_unique) {
423 let known_len = if prefix_key.len_byte() == 255 {
424 None
425 } else {
426 Some(prefix_key.len_byte() as usize)
427 };
428
429 match known_len {
431 Some(l) => {
432 if l != needle_len {
433 continue;
434 }
435 }
436 None => {
437 if needle_len < 255 {
438 continue;
439 }
440 }
441 }
442
443 match known_len {
445 None => {
446 if prefix_key.prefix7()[..prefix_len] == needle_suffix[..prefix_len] {
448 ambiguous.push(i);
449 }
450 }
451 Some(l) if l <= prefix_len => {
452 if prefix_key.prefix7()[..l] == needle_suffix[..l] {
454 dict_results[i] = true;
455 }
456 }
457 Some(_l) => {
458 if prefix_key.prefix7()[..prefix_len] == needle_suffix[..prefix_len] {
460 ambiguous.push(i);
461 }
462 }
463 }
464 }
465 (dict_results, ambiguous)
466 }
467
468 fn compare_with_shared_prefix(&self, needle: &[u8], op: &operator::Comparison) -> Option<bool> {
470 let shared_prefix_len = self.shared_prefix.len();
471
472 let needle_shared_len = std::cmp::min(needle.len(), shared_prefix_len);
473 let shared_cmp = self.shared_prefix[..needle_shared_len].cmp(&needle[..needle_shared_len]);
474 match (op, shared_cmp) {
475 (operator::Comparison::Lt | operator::Comparison::LtEq, std::cmp::Ordering::Less) => {
476 Some(true)
477 }
478 (
479 operator::Comparison::Lt | operator::Comparison::LtEq,
480 std::cmp::Ordering::Greater,
481 ) => Some(false),
482 (
483 operator::Comparison::Gt | operator::Comparison::GtEq,
484 std::cmp::Ordering::Greater,
485 ) => Some(true),
486 (operator::Comparison::Gt | operator::Comparison::GtEq, std::cmp::Ordering::Less) => {
487 Some(false)
488 }
489 (_, std::cmp::Ordering::Equal) => {
490 if needle.len() < shared_prefix_len {
491 match op {
492 operator::Comparison::Gt | operator::Comparison::GtEq => Some(true),
493 operator::Comparison::Lt => Some(false),
494 operator::Comparison::LtEq => Some(false),
495 }
496 } else {
497 None
498 }
499 }
500 }
501 }
502}
503
504fn compare_with_arrow_inner(
505 dict_array: DictionaryArray<UInt16Type>,
506 needle: &[u8],
507 op: &ByteViewOperator,
508) -> BooleanArray {
509 let needle_scalar = match dict_array.values().data_type() {
510 DataType::Utf8 => ScalarValue::Utf8(Some(
511 std::str::from_utf8(needle)
512 .expect("utf8 needle")
513 .to_string(),
514 )),
515 DataType::Utf8View => ScalarValue::Utf8View(Some(
516 std::str::from_utf8(needle)
517 .expect("utf8 needle")
518 .to_string(),
519 )),
520 DataType::LargeUtf8 => ScalarValue::LargeUtf8(Some(
521 std::str::from_utf8(needle)
522 .expect("utf8 needle")
523 .to_string(),
524 )),
525 DataType::Binary => ScalarValue::Binary(Some(needle.to_vec())),
526 DataType::BinaryView => ScalarValue::BinaryView(Some(needle.to_vec())),
527 DataType::LargeBinary => ScalarValue::LargeBinary(Some(needle.to_vec())),
528 _ => ScalarValue::Binary(Some(needle.to_vec())),
529 };
530 let lhs = ColumnarValue::Array(Arc::new(dict_array));
531 let rhs = ColumnarValue::Scalar(needle_scalar);
532 let op = Operator::from(op);
533 let result = apply_cmp(op, &lhs, &rhs);
534
535 match result.expect("ArrowError") {
536 ColumnarValue::Array(arr) => arr.as_boolean().clone(),
537 ColumnarValue::Scalar(_) => unreachable!(),
538 }
539}
540
541fn compress_needle(compressor: &Compressor, needle: &[u8]) -> Vec<u8> {
542 let mut compressed = Vec::with_capacity(needle.len().saturating_mul(2));
543 unsafe {
544 compressor.compress_into(needle, &mut compressed);
545 }
546 compressed
547}
548
/// Lexicographically compares two byte arrays of the same compile-time length `N`.
fn bytes_cmp_const<const N: usize>(left: &[u8; N], right: &[u8; N]) -> std::cmp::Ordering {
    Ord::cmp(left, right)
}
552
553fn bytes_cmp_short(left: &[u8], right: &[u8], len: usize) -> std::cmp::Ordering {
554 match len {
555 0 => std::cmp::Ordering::Equal,
556 1 => bytes_cmp_const::<1>(
557 &left[..1].try_into().unwrap(),
558 &right[..1].try_into().unwrap(),
559 ),
560 2 => bytes_cmp_const::<2>(
561 &left[..2].try_into().unwrap(),
562 &right[..2].try_into().unwrap(),
563 ),
564 3 => bytes_cmp_const::<3>(
565 &left[..3].try_into().unwrap(),
566 &right[..3].try_into().unwrap(),
567 ),
568 4 => bytes_cmp_const::<4>(
569 &left[..4].try_into().unwrap(),
570 &right[..4].try_into().unwrap(),
571 ),
572 5 => bytes_cmp_const::<5>(
573 &left[..5].try_into().unwrap(),
574 &right[..5].try_into().unwrap(),
575 ),
576 6 => bytes_cmp_const::<6>(
577 &left[..6].try_into().unwrap(),
578 &right[..6].try_into().unwrap(),
579 ),
580 7 => bytes_cmp_const::<7>(
581 &left[..7].try_into().unwrap(),
582 &right[..7].try_into().unwrap(),
583 ),
584 _ => left[..len].cmp(&right[..len]),
585 }
586}
587
588fn bytes_cmp_short_auto(left: &[u8], right: &[u8]) -> std::cmp::Ordering {
589 let len = left.len().min(right.len());
590 let ordering = bytes_cmp_short(left, right, len);
591 if ordering == std::cmp::Ordering::Equal {
592 left.len().cmp(&right.len())
593 } else {
594 ordering
595 }
596}
597
598fn compute_fingerprint_candidates(
601 needle: &[u8],
602 fingerprints: &Arc<[u32]>,
603) -> (Vec<bool>, Vec<usize>) {
604 let needle_fp = StringFingerprint::from_bytes(needle);
605 let dict_results = vec![false; fingerprints.len()];
606 let mut ambiguous = Vec::new();
607
608 for (index, &bits) in fingerprints.iter().enumerate() {
609 if StringFingerprint::from_bits(bits).might_contain(needle_fp) {
610 ambiguous.push(index);
611 }
612 }
613
614 (dict_results, ambiguous)
615}
616
617fn apply_like_match_on_candidates(
620 mut dict_results: Vec<bool>,
621 ambiguous: Vec<usize>,
622 values_buffer: arrow::buffer::Buffer,
623 offsets_buffer: arrow::buffer::OffsetBuffer<i32>,
624 needle: &[u8],
625 operator: operator::SubString,
626) -> Vec<bool> {
627 let values = unsafe { StringArray::new_unchecked(offsets_buffer, values_buffer, None) };
629 let pattern = std::str::from_utf8(needle).ok().unwrap();
630 let pattern = format!("%{}%", pattern);
631
632 let lhs = ColumnarValue::Array(Arc::new(values));
633 let rhs = ColumnarValue::Scalar(ScalarValue::Utf8(Some(pattern)));
634 let result = apply_cmp(Operator::LikeMatch, &lhs, &rhs).unwrap();
635 let result = result.into_array(ambiguous.len()).unwrap();
636 let matches = result.as_boolean();
637
638 for (pos, &dict_index) in ambiguous.iter().enumerate() {
639 if !matches.is_null(pos) && matches.value(pos) {
640 dict_results[dict_index] = true;
641 }
642 }
643
644 if operator == operator::SubString::NotContains {
645 for value in &mut dict_results {
646 *value = !*value;
647 }
648 }
649
650 dict_results
651}