1use arrow::array::{Array, DictionaryArray};
2use arrow::array::{BinaryArray, BooleanArray, BooleanBufferBuilder, StringArray, cast::AsArray};
3use arrow::datatypes::UInt16Type;
4use arrow_schema::DataType;
5use datafusion::common::ScalarValue;
6use datafusion::logical_expr::{ColumnarValue, Operator};
7use datafusion::physical_expr_common::datum::apply_cmp;
8use fsst::Compressor;
9use std::sync::Arc;
10use std::vec;
11
12use super::LiquidByteViewArray;
13use super::fingerprint::{StringFingerprint, substring_pattern_bytes};
14use crate::liquid_array::byte_view_array::operator::{self, ByteViewOperator};
15use crate::liquid_array::raw::FsstArray;
16use crate::liquid_array::raw::fsst_buffer::{DiskBuffer, FsstBacking, PrefixKey};
17
18impl LiquidByteViewArray<FsstArray> {
19 pub(super) fn compare_equals(&self, needle: &[u8]) -> BooleanArray {
21 let shared_prefix_len = self.shared_prefix.len();
22 let num_unique = self.prefix_keys.len();
23 if needle.len() < shared_prefix_len || needle[..shared_prefix_len] != self.shared_prefix {
24 return self.map_dictionary_results_to_array_results(vec![false; num_unique]);
25 }
26
27 let needle_suffix = &needle[shared_prefix_len..];
28 let needle_len = needle_suffix.len();
29 let prefix_len = PrefixKey::prefix_len();
30 let mut dict_results = vec![false; num_unique];
31
32 if needle_len <= prefix_len {
33 for (i, prefix_key) in self.prefix_keys.iter().enumerate().take(num_unique) {
34 let known_len = if prefix_key.len_byte() == 255 {
35 None
36 } else {
37 Some(prefix_key.len_byte() as usize)
38 };
39 if let Some(l) = known_len
40 && l == needle_len
41 && prefix_key.prefix7()[..l] == needle_suffix[..l]
42 {
43 dict_results[i] = true;
44 }
45 }
46
47 return self.map_dictionary_results_to_array_results(dict_results);
48 }
49
50 let compressed_needle = compress_needle(self.fsst_buffer.compressor(), needle);
51
52 for (i, prefix_key) in self.prefix_keys.iter().enumerate().take(num_unique) {
53 let known_len = if prefix_key.len_byte() == 255 {
54 None
55 } else {
56 Some(prefix_key.len_byte() as usize)
57 };
58
59 match known_len {
60 Some(l) => {
61 if l != needle_len {
62 continue;
63 }
64 }
65 None => {
66 if needle_len < 255 {
67 continue;
68 }
69 }
70 }
71
72 if prefix_key.prefix7()[..prefix_len] == needle_suffix[..prefix_len] {
73 let compressed_value = self.fsst_buffer.get_compressed_slice(i);
74 if compressed_value == compressed_needle.as_slice() {
75 dict_results[i] = true;
76 }
77 }
78 }
79
80 self.map_dictionary_results_to_array_results(dict_results)
81 }
82
83 fn compare_not_equals(&self, needle: &[u8]) -> BooleanArray {
85 let result = self.compare_equals(needle);
86 let (values, nulls) = result.into_parts();
87 let values = !&values;
88 BooleanArray::new(values, nulls)
89 }
90
91 pub fn compare_with(&self, needle: &[u8], op: &ByteViewOperator) -> BooleanArray {
93 match op {
94 ByteViewOperator::Comparison(cmp) => self.compare_with_inner(needle, cmp),
95 ByteViewOperator::Equality(operator::Equality::Eq) => self.compare_equals(needle),
96 ByteViewOperator::Equality(operator::Equality::NotEq) => {
97 self.compare_not_equals(needle)
98 }
99 ByteViewOperator::SubString(op) => {
100 if let Some(fingerprints) = self.string_fingerprints.as_ref() {
101 let pattern =
102 substring_pattern_bytes(needle).expect("Invalid substring pattern");
103 self.compare_like_substring(pattern, *op, fingerprints)
104 } else {
105 let fallback = ByteViewOperator::SubString(*op);
106 self.compare_with_arrow_fallback(needle, &fallback)
107 }
108 }
109 }
110 }
111
112 pub(super) fn compare_with_inner(
114 &self,
115 needle: &[u8],
116 op: &operator::Comparison,
117 ) -> BooleanArray {
118 let (mut dict_results, ambiguous) = self.compare_with_prefix(needle, op);
119
120 if !ambiguous.is_empty() {
122 let (values_buffer, offsets_buffer) =
123 self.fsst_buffer.to_uncompressed_selected(&ambiguous);
124 let binary_array =
125 unsafe { BinaryArray::new_unchecked(offsets_buffer, values_buffer, None) };
126
127 for (pos, &dict_index) in ambiguous.iter().enumerate() {
128 let value_cmp = binary_array.value(pos).cmp(needle);
129 let result = match (op, value_cmp) {
130 (operator::Comparison::Lt, std::cmp::Ordering::Less) => true,
131 (operator::Comparison::Lt, _) => false,
132 (
133 operator::Comparison::LtEq,
134 std::cmp::Ordering::Less | std::cmp::Ordering::Equal,
135 ) => true,
136 (operator::Comparison::LtEq, _) => false,
137 (operator::Comparison::Gt, std::cmp::Ordering::Greater) => true,
138 (operator::Comparison::Gt, _) => false,
139 (
140 operator::Comparison::GtEq,
141 std::cmp::Ordering::Greater | std::cmp::Ordering::Equal,
142 ) => true,
143 (operator::Comparison::GtEq, _) => false,
144 };
145 dict_results[dict_index] = result;
146 }
147 }
148
149 self.map_dictionary_results_to_array_results(dict_results)
150 }
151
152 fn compare_with_arrow_fallback(&self, needle: &[u8], op: &ByteViewOperator) -> BooleanArray {
154 let dict_array = self.to_dict_arrow();
155 compare_with_arrow_inner(dict_array, needle, op)
156 }
157
158 pub(super) fn compare_like_substring(
159 &self,
160 needle: &[u8],
161 operator: operator::SubString,
162 fingerprints: &Arc<[u32]>,
163 ) -> BooleanArray {
164 let (dict_results, ambiguous) = compute_fingerprint_candidates(needle, fingerprints);
165
166 let dict_results = if !ambiguous.is_empty() {
167 let (values_buffer, offsets_buffer) =
168 self.fsst_buffer.to_uncompressed_selected(&ambiguous);
169 apply_like_match_on_candidates(
170 dict_results,
171 ambiguous,
172 values_buffer,
173 offsets_buffer,
174 needle,
175 operator,
176 )
177 } else {
178 dict_results
179 };
180
181 self.map_dictionary_results_to_array_results(dict_results)
182 }
183}
184
185impl LiquidByteViewArray<DiskBuffer> {
186 pub(crate) async fn compare_with(&self, needle: &[u8], op: &ByteViewOperator) -> BooleanArray {
187 match op {
188 ByteViewOperator::Equality(operator::Equality::Eq) => self.compare_equals(needle).await,
189 ByteViewOperator::Equality(operator::Equality::NotEq) => {
190 self.compare_not_equals(needle).await
191 }
192 ByteViewOperator::Comparison(op) => self.compare_with_inner(needle, op).await,
193 ByteViewOperator::SubString(op) => {
194 let pattern = substring_pattern_bytes(needle).expect("Invalid substring pattern");
195 let fingerprints = self
196 .string_fingerprints
197 .as_ref()
198 .expect("Fingerprints not initialized");
199 self.compare_like_substring(pattern, *op, fingerprints)
200 .await
201 }
202 }
203 }
204
205 async fn compare_not_equals(&self, needle: &[u8]) -> BooleanArray {
207 let result = self.compare_equals(needle).await;
208 let (values, nulls) = result.into_parts();
209 let values = !&values;
210 BooleanArray::new(values, nulls)
211 }
212
213 pub(super) async fn compare_equals(&self, needle: &[u8]) -> BooleanArray {
215 let (mut dict_results, ambiguous) = self.compare_equals_with_prefix(needle);
216 if !ambiguous.is_empty() {
217 let bytes = self
218 .fsst_buffer
219 .squeeze_io()
220 .read(Some(self.fsst_buffer.disk_range()))
221 .await
222 .expect("read squeezed backing");
223 let hydrated = LiquidByteViewArray::<FsstArray>::from_bytes(
224 bytes,
225 self.fsst_buffer.compressor_arc(),
226 );
227 let compressed_needle = compress_needle(hydrated.fsst_buffer.compressor(), needle);
228
229 for &dict_index in ambiguous.iter() {
230 let compressed_value = hydrated.fsst_buffer.get_compressed_slice(dict_index);
231 if compressed_value == compressed_needle.as_slice() {
232 dict_results[dict_index] = true;
233 }
234 }
235 } else {
236 self.fsst_buffer.squeeze_io().trace_io_saved();
237 }
238
239 self.map_dictionary_results_to_array_results(dict_results)
240 }
241
242 pub(super) async fn compare_with_inner(
244 &self,
245 needle: &[u8],
246 op: &operator::Comparison,
247 ) -> BooleanArray {
248 let (mut dict_results, ambiguous) = self.compare_with_prefix(needle, op);
249
250 if !ambiguous.is_empty() {
252 let (values_buffer, offsets_buffer) =
253 self.fsst_buffer.to_uncompressed_selected(&ambiguous).await;
254 let binary_array =
255 unsafe { BinaryArray::new_unchecked(offsets_buffer, values_buffer, None) };
256
257 for (pos, &dict_index) in ambiguous.iter().enumerate() {
258 let value_cmp = bytes_cmp_short_auto(binary_array.value(pos), needle);
259 let result = match (op, value_cmp) {
260 (operator::Comparison::Lt, std::cmp::Ordering::Less) => true,
261 (operator::Comparison::Lt, _) => false,
262 (
263 operator::Comparison::LtEq,
264 std::cmp::Ordering::Less | std::cmp::Ordering::Equal,
265 ) => true,
266 (operator::Comparison::LtEq, _) => false,
267 (operator::Comparison::Gt, std::cmp::Ordering::Greater) => true,
268 (operator::Comparison::Gt, _) => false,
269 (
270 operator::Comparison::GtEq,
271 std::cmp::Ordering::Greater | std::cmp::Ordering::Equal,
272 ) => true,
273 (operator::Comparison::GtEq, _) => false,
274 };
275 dict_results[dict_index] = result;
276 }
277 } else {
278 self.fsst_buffer.squeeze_io().trace_io_saved();
279 }
280
281 self.map_dictionary_results_to_array_results(dict_results)
282 }
283
284 pub(super) async fn compare_like_substring(
285 &self,
286 needle: &[u8],
287 operator: operator::SubString,
288 fingerprints: &Arc<[u32]>,
289 ) -> BooleanArray {
290 let (dict_results, ambiguous) = compute_fingerprint_candidates(needle, fingerprints);
291
292 let dict_results = if !ambiguous.is_empty() {
293 let (values_buffer, offsets_buffer) =
294 self.fsst_buffer.to_uncompressed_selected(&ambiguous).await;
295 apply_like_match_on_candidates(
296 dict_results,
297 ambiguous,
298 values_buffer,
299 offsets_buffer,
300 needle,
301 operator,
302 )
303 } else {
304 self.fsst_buffer.squeeze_io().trace_io_saved();
305 dict_results
306 };
307
308 self.map_dictionary_results_to_array_results(dict_results)
309 }
310}
311
312impl<B: FsstBacking> LiquidByteViewArray<B> {
313 pub fn prefix_compare_counts(
315 &self,
316 needle: &[u8],
317 op: &operator::Comparison,
318 ) -> (usize, usize, usize) {
319 let (dict_results, ambiguous) = self.compare_with_prefix(needle, op);
320 let selected_rows = dict_results.iter().filter(|&x| *x).count();
321 (selected_rows, ambiguous.len(), self.dictionary_keys.len())
322 }
323
324 fn map_dictionary_results_to_array_results(&self, dict_results: Vec<bool>) -> BooleanArray {
325 let len = self.dictionary_keys.len();
326 let mut builder = BooleanBufferBuilder::new(len);
327 builder.advance(len);
328 for index in 0..len {
329 if !self.dictionary_keys.is_valid(index) {
330 continue;
331 }
332
333 let dict_index = self.dictionary_keys.value(index) as usize;
334 debug_assert!(dict_index < dict_results.len());
335 if dict_results.get(dict_index).copied().unwrap_or(false) {
336 builder.set_bit(index, true);
337 }
338 }
339
340 let values = builder.finish();
341 if let Some(nulls) = self.nulls() {
342 BooleanArray::new(values, Some(nulls.clone()))
343 } else {
344 BooleanArray::new(values, None)
345 }
346 }
347
348 #[inline(never)]
350 pub(super) fn compare_with_prefix(
351 &self,
352 needle: &[u8],
353 op: &operator::Comparison,
354 ) -> (Vec<bool>, Vec<usize>) {
355 if let Some(result) = self.compare_with_shared_prefix(needle, op) {
357 return (vec![result; self.dictionary_keys.len()], Vec::new());
358 }
359
360 let needle_suffix = &needle[self.shared_prefix.len()..];
361 let num_unique = self.prefix_keys.len();
362 let mut dict_results = vec![false; num_unique];
363 let mut ambiguous = Vec::new();
364
365 let cmp_len = needle_suffix.len().min(PrefixKey::prefix_len());
366 if cmp_len == 0 {
367 for (i, prefix_key) in self.prefix_keys.iter().enumerate() {
368 let is_empty_suffix = prefix_key.len_byte() == 0;
369 dict_results[i] = match op {
370 operator::Comparison::Lt => false,
371 operator::Comparison::LtEq => is_empty_suffix,
372 operator::Comparison::Gt => !is_empty_suffix,
373 operator::Comparison::GtEq => true,
374 };
375 }
376 return (dict_results, ambiguous);
377 }
378
379 for (i, prefix_key) in self.prefix_keys.iter().enumerate() {
380 let ordering = bytes_cmp_short(prefix_key.prefix7(), needle_suffix, cmp_len);
381 match ordering {
382 std::cmp::Ordering::Less => match op {
383 operator::Comparison::Lt | operator::Comparison::LtEq => {
384 dict_results[i] = true;
385 }
386 operator::Comparison::Gt | operator::Comparison::GtEq => {
387 dict_results[i] = false;
388 }
389 },
390 std::cmp::Ordering::Greater => match op {
391 operator::Comparison::Lt | operator::Comparison::LtEq => {
392 dict_results[i] = false;
393 }
394 operator::Comparison::Gt | operator::Comparison::GtEq => {
395 dict_results[i] = true;
396 }
397 },
398 std::cmp::Ordering::Equal => {
399 ambiguous.push(i);
400 }
401 }
402 }
403 (dict_results, ambiguous)
404 }
405
406 fn compare_equals_with_prefix(&self, needle: &[u8]) -> (Vec<bool>, Vec<usize>) {
408 let shared_prefix_len = self.shared_prefix.len();
409 let num_unique = self.prefix_keys.len();
410 if needle.len() < shared_prefix_len || needle[..shared_prefix_len] != self.shared_prefix {
411 return (vec![false; num_unique], Vec::new());
412 }
413
414 let needle_suffix = &needle[shared_prefix_len..];
415 let needle_len = needle_suffix.len();
416 let prefix_len = PrefixKey::prefix_len();
417
418 let mut dict_results = vec![false; num_unique];
419 let mut ambiguous = Vec::new();
420
421 for (i, prefix_key) in self.prefix_keys.iter().enumerate().take(num_unique) {
422 let known_len = if prefix_key.len_byte() == 255 {
423 None
424 } else {
425 Some(prefix_key.len_byte() as usize)
426 };
427
428 match known_len {
430 Some(l) => {
431 if l != needle_len {
432 continue;
433 }
434 }
435 None => {
436 if needle_len < 255 {
437 continue;
438 }
439 }
440 }
441
442 match known_len {
444 None => {
445 if prefix_key.prefix7()[..prefix_len] == needle_suffix[..prefix_len] {
447 ambiguous.push(i);
448 }
449 }
450 Some(l) if l <= prefix_len => {
451 if prefix_key.prefix7()[..l] == needle_suffix[..l] {
453 dict_results[i] = true;
454 }
455 }
456 Some(_l) => {
457 if prefix_key.prefix7()[..prefix_len] == needle_suffix[..prefix_len] {
459 ambiguous.push(i);
460 }
461 }
462 }
463 }
464 (dict_results, ambiguous)
465 }
466
467 fn compare_with_shared_prefix(&self, needle: &[u8], op: &operator::Comparison) -> Option<bool> {
469 let shared_prefix_len = self.shared_prefix.len();
470
471 let needle_shared_len = std::cmp::min(needle.len(), shared_prefix_len);
472 let shared_cmp = self.shared_prefix[..needle_shared_len].cmp(&needle[..needle_shared_len]);
473 match (op, shared_cmp) {
474 (operator::Comparison::Lt | operator::Comparison::LtEq, std::cmp::Ordering::Less) => {
475 Some(true)
476 }
477 (
478 operator::Comparison::Lt | operator::Comparison::LtEq,
479 std::cmp::Ordering::Greater,
480 ) => Some(false),
481 (
482 operator::Comparison::Gt | operator::Comparison::GtEq,
483 std::cmp::Ordering::Greater,
484 ) => Some(true),
485 (operator::Comparison::Gt | operator::Comparison::GtEq, std::cmp::Ordering::Less) => {
486 Some(false)
487 }
488 (_, std::cmp::Ordering::Equal) => {
489 if needle.len() < shared_prefix_len {
490 match op {
491 operator::Comparison::Gt | operator::Comparison::GtEq => Some(true),
492 operator::Comparison::Lt => Some(false),
493 operator::Comparison::LtEq => Some(false),
494 }
495 } else {
496 None
497 }
498 }
499 }
500 }
501}
502
503fn compare_with_arrow_inner(
504 dict_array: DictionaryArray<UInt16Type>,
505 needle: &[u8],
506 op: &ByteViewOperator,
507) -> BooleanArray {
508 let needle_scalar = match dict_array.values().data_type() {
509 DataType::Utf8 => ScalarValue::Utf8(Some(
510 std::str::from_utf8(needle)
511 .expect("utf8 needle")
512 .to_string(),
513 )),
514 DataType::Utf8View => ScalarValue::Utf8View(Some(
515 std::str::from_utf8(needle)
516 .expect("utf8 needle")
517 .to_string(),
518 )),
519 DataType::LargeUtf8 => ScalarValue::LargeUtf8(Some(
520 std::str::from_utf8(needle)
521 .expect("utf8 needle")
522 .to_string(),
523 )),
524 DataType::Binary => ScalarValue::Binary(Some(needle.to_vec())),
525 DataType::BinaryView => ScalarValue::BinaryView(Some(needle.to_vec())),
526 DataType::LargeBinary => ScalarValue::LargeBinary(Some(needle.to_vec())),
527 _ => ScalarValue::Binary(Some(needle.to_vec())),
528 };
529 let lhs = ColumnarValue::Array(Arc::new(dict_array));
530 let rhs = ColumnarValue::Scalar(needle_scalar);
531 let op = Operator::from(op);
532 let result = apply_cmp(op, &lhs, &rhs);
533
534 match result.expect("ArrowError") {
535 ColumnarValue::Array(arr) => arr.as_boolean().clone(),
536 ColumnarValue::Scalar(_) => unreachable!(),
537 }
538}
539
540fn compress_needle(compressor: &Compressor, needle: &[u8]) -> Vec<u8> {
541 let mut compressed = Vec::with_capacity(needle.len().saturating_mul(2));
542 unsafe {
543 compressor.compress_into(needle, &mut compressed);
544 }
545 compressed
546}
547
/// Lexicographically compares two fixed-width byte arrays.
///
/// The const width lets the compiler specialize the comparison for each `N`.
fn bytes_cmp_const<const N: usize>(left: &[u8; N], right: &[u8; N]) -> std::cmp::Ordering {
    left[..].cmp(&right[..])
}
551
552fn bytes_cmp_short(left: &[u8], right: &[u8], len: usize) -> std::cmp::Ordering {
553 match len {
554 0 => std::cmp::Ordering::Equal,
555 1 => bytes_cmp_const::<1>(
556 &left[..1].try_into().unwrap(),
557 &right[..1].try_into().unwrap(),
558 ),
559 2 => bytes_cmp_const::<2>(
560 &left[..2].try_into().unwrap(),
561 &right[..2].try_into().unwrap(),
562 ),
563 3 => bytes_cmp_const::<3>(
564 &left[..3].try_into().unwrap(),
565 &right[..3].try_into().unwrap(),
566 ),
567 4 => bytes_cmp_const::<4>(
568 &left[..4].try_into().unwrap(),
569 &right[..4].try_into().unwrap(),
570 ),
571 5 => bytes_cmp_const::<5>(
572 &left[..5].try_into().unwrap(),
573 &right[..5].try_into().unwrap(),
574 ),
575 6 => bytes_cmp_const::<6>(
576 &left[..6].try_into().unwrap(),
577 &right[..6].try_into().unwrap(),
578 ),
579 7 => bytes_cmp_const::<7>(
580 &left[..7].try_into().unwrap(),
581 &right[..7].try_into().unwrap(),
582 ),
583 _ => left[..len].cmp(&right[..len]),
584 }
585}
586
587fn bytes_cmp_short_auto(left: &[u8], right: &[u8]) -> std::cmp::Ordering {
588 let len = left.len().min(right.len());
589 let ordering = bytes_cmp_short(left, right, len);
590 if ordering == std::cmp::Ordering::Equal {
591 left.len().cmp(&right.len())
592 } else {
593 ordering
594 }
595}
596
597fn compute_fingerprint_candidates(
600 needle: &[u8],
601 fingerprints: &Arc<[u32]>,
602) -> (Vec<bool>, Vec<usize>) {
603 let needle_fp = StringFingerprint::from_bytes(needle);
604 let dict_results = vec![false; fingerprints.len()];
605 let mut ambiguous = Vec::new();
606
607 for (index, &bits) in fingerprints.iter().enumerate() {
608 if StringFingerprint::from_bits(bits).might_contain(needle_fp) {
609 ambiguous.push(index);
610 }
611 }
612
613 (dict_results, ambiguous)
614}
615
616fn apply_like_match_on_candidates(
619 mut dict_results: Vec<bool>,
620 ambiguous: Vec<usize>,
621 values_buffer: arrow::buffer::Buffer,
622 offsets_buffer: arrow::buffer::OffsetBuffer<i32>,
623 needle: &[u8],
624 operator: operator::SubString,
625) -> Vec<bool> {
626 let values = unsafe { StringArray::new_unchecked(offsets_buffer, values_buffer, None) };
628 let pattern = std::str::from_utf8(needle).ok().unwrap();
629 let pattern = format!("%{}%", pattern);
630
631 let lhs = ColumnarValue::Array(Arc::new(values));
632 let rhs = ColumnarValue::Scalar(ScalarValue::Utf8(Some(pattern)));
633 let result = apply_cmp(Operator::LikeMatch, &lhs, &rhs).unwrap();
634 let result = result.into_array(ambiguous.len()).unwrap();
635 let matches = result.as_boolean();
636
637 for (pos, &dict_index) in ambiguous.iter().enumerate() {
638 if !matches.is_null(pos) && matches.value(pos) {
639 dict_results[dict_index] = true;
640 }
641 }
642
643 if operator == operator::SubString::NotContains {
644 for value in &mut dict_results {
645 *value = !*value;
646 }
647 }
648
649 dict_results
650}