1use arrow::array::{
21 Array, ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder, Datum, Scalar,
22 StringArrayType,
23};
24use arrow::buffer::{BooleanBuffer, NullBuffer};
25use arrow::datatypes::DataType;
26use arrow::row::{RowConverter, Rows, SortField};
27use datafusion_common::cast::{as_fixed_size_list_array, as_generic_list_array};
28use datafusion_common::utils::string_utils::string_array_to_vec;
29use datafusion_common::utils::take_function_args;
30use datafusion_common::{DataFusionError, Result, ScalarValue, exec_err};
31use datafusion_expr::expr::ScalarFunction;
32use datafusion_expr::simplify::ExprSimplifyResult;
33use datafusion_expr::{
34 ColumnarValue, Documentation, Expr, ScalarFunctionArgs, ScalarUDFImpl, Signature,
35 Volatility, in_list,
36};
37use datafusion_macros::user_doc;
38use datafusion_physical_expr_common::datum::compare_with_eq;
39use itertools::Itertools;
40
41use crate::make_array::make_array_udf;
42use crate::utils::make_scalar_function;
43
44use hashbrown::HashSet;
45use std::ops::Range;
46use std::sync::Arc;
47
// Register the three UDFs defined in this file.
// NOTE(review): `make_udf_expr_and_func!` is defined elsewhere in the crate. Judging
// by its arguments it presumably emits an expression-builder function (named by the
// second argument, taking the listed argument names, documented by the string
// literal) and a `*_udf()` accessor for the given `ScalarUDFImpl` type — confirm
// against the macro definition.

// `array_has(haystack_array, element)` -> [`ArrayHas`]
make_udf_expr_and_func!(ArrayHas,
    array_has,
    haystack_array element, "returns true, if the element appears in the first array, otherwise false.", array_has_udf
);
// `array_has_all(haystack_array, needle_array)` -> [`ArrayHasAll`]
make_udf_expr_and_func!(ArrayHasAll,
    array_has_all,
    haystack_array needle_array, "returns true if each element of the second array appears in the first array; otherwise, it returns false.", array_has_all_udf
);
// `array_has_any(first_array, second_array)` -> [`ArrayHasAny`]
make_udf_expr_and_func!(ArrayHasAny,
    array_has_any,
    first_array second_array, "returns true if at least one element of the second array appears in the first array; otherwise, it returns false.", array_has_any_udf
);
67
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns true if the array contains the element.",
    syntax_example = "array_has(array, element)",
    sql_example = r#"```sql
> select array_has([1, 2, 3], 2);
+-----------------------------+
| array_has(List([1,2,3]), 2) |
+-----------------------------+
| true |
+-----------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "element",
        description = "Scalar or Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
/// Scalar UDF `array_has(array, element)`: true when `element` occurs in `array`.
///
/// Also registered under the aliases stored in `aliases`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayHas {
    /// Accepted argument types, returned by `ScalarUDFImpl::signature`.
    signature: Signature,
    /// Alternative SQL names, returned by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
94
95impl Default for ArrayHas {
96 fn default() -> Self {
97 Self::new()
98 }
99}
100
101impl ArrayHas {
102 pub fn new() -> Self {
103 Self {
104 signature: Signature::array_and_element(Volatility::Immutable),
105 aliases: vec![
106 String::from("list_has"),
107 String::from("array_contains"),
108 String::from("list_contains"),
109 ],
110 }
111 }
112}
113
114impl ScalarUDFImpl for ArrayHas {
115 fn name(&self) -> &str {
116 "array_has"
117 }
118
119 fn signature(&self) -> &Signature {
120 &self.signature
121 }
122
123 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
124 Ok(DataType::Boolean)
125 }
126
127 fn simplify(
128 &self,
129 mut args: Vec<Expr>,
130 _info: &datafusion_expr::simplify::SimplifyContext,
131 ) -> Result<ExprSimplifyResult> {
132 let [haystack, needle] = take_function_args(self.name(), &mut args)?;
133
134 match haystack {
137 Expr::Literal(scalar, _) if scalar.is_null() => {
138 return Ok(ExprSimplifyResult::Simplified(Expr::Literal(
139 ScalarValue::Boolean(None),
140 None,
141 )));
142 }
143 Expr::Literal(
144 scalar @ ScalarValue::List(_) | scalar @ ScalarValue::LargeList(_),
146 _,
147 ) => {
148 if let Ok(scalar_values) =
149 ScalarValue::convert_array_to_scalar_vec(&scalar.to_array()?)
150 {
151 assert_eq!(scalar_values.len(), 1);
152 let list = scalar_values
153 .into_iter()
154 .flatten()
155 .flatten()
156 .map(|v| Expr::Literal(v, None))
157 .collect();
158
159 return Ok(ExprSimplifyResult::Simplified(in_list(
160 std::mem::take(needle),
161 list,
162 false,
163 )));
164 }
165 }
166 Expr::ScalarFunction(ScalarFunction { func, args })
167 if func == &make_array_udf() =>
168 {
169 return Ok(ExprSimplifyResult::Simplified(in_list(
171 std::mem::take(needle),
172 std::mem::take(args),
173 false,
174 )));
175 }
176 _ => {}
177 };
178 Ok(ExprSimplifyResult::Original(args))
179 }
180
181 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
182 let [first_arg, second_arg] = take_function_args(self.name(), &args.args)?;
183 if first_arg.data_type().is_null() {
184 return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None)));
187 }
188
189 match &second_arg {
190 ColumnarValue::Array(array_needle) => {
191 let haystack = first_arg.to_array(array_needle.len())?;
193 let array = array_has_inner_for_array(&haystack, array_needle)?;
194 Ok(ColumnarValue::Array(array))
195 }
196 ColumnarValue::Scalar(scalar_needle) => {
197 if scalar_needle.is_null() {
200 return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None)));
201 }
202
203 let haystack = first_arg.to_array(1)?;
205 let needle = scalar_needle.to_array_of_size(1)?;
206 let needle = Scalar::new(needle);
207 let array = array_has_inner_for_scalar(&haystack, &needle)?;
208 if let ColumnarValue::Scalar(_) = &first_arg {
209 let scalar_value = ScalarValue::try_from_array(&array, 0)?;
211 Ok(ColumnarValue::Scalar(scalar_value))
212 } else {
213 Ok(ColumnarValue::Array(array))
214 }
215 }
216 }
217 }
218
219 fn aliases(&self) -> &[String] {
220 &self.aliases
221 }
222
223 fn documentation(&self) -> Option<&Documentation> {
224 self.doc()
225 }
226}
227
228fn array_has_inner_for_scalar(
229 haystack: &ArrayRef,
230 needle: &dyn Datum,
231) -> Result<ArrayRef> {
232 let haystack = haystack.as_ref().try_into()?;
233 array_has_dispatch_for_scalar(haystack, needle)
234}
235
236fn array_has_inner_for_array(haystack: &ArrayRef, needle: &ArrayRef) -> Result<ArrayRef> {
237 let haystack = haystack.as_ref().try_into()?;
238 array_has_dispatch_for_array(haystack, needle)
239}
240
241#[derive(Copy, Clone)]
242enum ArrayWrapper<'a> {
243 FixedSizeList(&'a arrow::array::FixedSizeListArray),
244 List(&'a arrow::array::GenericListArray<i32>),
245 LargeList(&'a arrow::array::GenericListArray<i64>),
246}
247
248impl<'a> TryFrom<&'a dyn Array> for ArrayWrapper<'a> {
249 type Error = DataFusionError;
250
251 fn try_from(
252 value: &'a dyn Array,
253 ) -> std::result::Result<ArrayWrapper<'a>, Self::Error> {
254 match value.data_type() {
255 DataType::List(_) => {
256 Ok(ArrayWrapper::List(as_generic_list_array::<i32>(value)?))
257 }
258 DataType::LargeList(_) => Ok(ArrayWrapper::LargeList(
259 as_generic_list_array::<i64>(value)?,
260 )),
261 DataType::FixedSizeList(_, _) => Ok(ArrayWrapper::FixedSizeList(
262 as_fixed_size_list_array(value)?,
263 )),
264 _ => exec_err!("array_has does not support type '{}'.", value.data_type()),
265 }
266 }
267}
268
269impl<'a> ArrayWrapper<'a> {
270 fn len(&self) -> usize {
271 match self {
272 ArrayWrapper::FixedSizeList(arr) => arr.len(),
273 ArrayWrapper::List(arr) => arr.len(),
274 ArrayWrapper::LargeList(arr) => arr.len(),
275 }
276 }
277
278 fn iter(&self) -> Box<dyn Iterator<Item = Option<ArrayRef>> + 'a> {
279 match self {
280 ArrayWrapper::FixedSizeList(arr) => Box::new(arr.iter()),
281 ArrayWrapper::List(arr) => Box::new(arr.iter()),
282 ArrayWrapper::LargeList(arr) => Box::new(arr.iter()),
283 }
284 }
285
286 fn values(&self) -> &ArrayRef {
287 match self {
288 ArrayWrapper::FixedSizeList(arr) => arr.values(),
289 ArrayWrapper::List(arr) => arr.values(),
290 ArrayWrapper::LargeList(arr) => arr.values(),
291 }
292 }
293
294 fn value_type(&self) -> DataType {
295 match self {
296 ArrayWrapper::FixedSizeList(arr) => arr.value_type(),
297 ArrayWrapper::List(arr) => arr.value_type(),
298 ArrayWrapper::LargeList(arr) => arr.value_type(),
299 }
300 }
301
302 fn offsets(&self) -> Box<dyn Iterator<Item = usize> + 'a> {
303 match self {
304 ArrayWrapper::FixedSizeList(arr) => {
305 let value_length = arr.value_length() as usize;
306 Box::new((0..=arr.len()).map(move |i| i * value_length))
307 }
308 ArrayWrapper::List(arr) => {
309 Box::new(arr.offsets().iter().map(|o| (*o) as usize))
310 }
311 ArrayWrapper::LargeList(arr) => {
312 Box::new(arr.offsets().iter().map(|o| (*o) as usize))
313 }
314 }
315 }
316
317 fn nulls(&self) -> Option<&NullBuffer> {
318 match self {
319 ArrayWrapper::FixedSizeList(arr) => arr.nulls(),
320 ArrayWrapper::List(arr) => arr.nulls(),
321 ArrayWrapper::LargeList(arr) => arr.nulls(),
322 }
323 }
324}
325
326fn array_has_dispatch_for_array<'a>(
327 haystack: ArrayWrapper<'a>,
328 needle: &ArrayRef,
329) -> Result<ArrayRef> {
330 let combined_nulls = NullBuffer::union(haystack.nulls(), needle.nulls());
331 let mut result = BooleanBufferBuilder::new(haystack.len());
332 for (i, arr) in haystack.iter().enumerate() {
333 if combined_nulls.as_ref().is_some_and(|n| n.is_null(i)) {
334 result.append(false);
335 continue;
336 }
337 let arr = arr.unwrap();
338 let is_nested = arr.data_type().is_nested();
339 let needle_row = Scalar::new(needle.slice(i, 1));
340 let eq_array = compare_with_eq(&arr, &needle_row, is_nested)?;
341 result.append(eq_array.has_true());
342 }
343
344 Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls)))
345}
346
347fn array_has_dispatch_for_scalar(
348 haystack: ArrayWrapper<'_>,
349 needle: &dyn Datum,
350) -> Result<ArrayRef> {
351 if haystack.len() == 0 {
354 return Ok(Arc::new(BooleanArray::new(
355 BooleanBuffer::new_unset(haystack.len()),
356 None,
357 )));
358 }
359
360 let offsets: Vec<usize> = haystack.offsets().collect();
363 let first_offset = offsets[0];
364 let visible_values = haystack
365 .values()
366 .slice(first_offset, offsets[offsets.len() - 1] - first_offset);
367
368 let is_nested = visible_values.data_type().is_nested();
369 let eq_array = compare_with_eq(&visible_values, needle, is_nested)?;
370
371 let eq_bits = match eq_array.nulls() {
379 Some(nulls) => eq_array.values() & nulls.inner(),
380 None => eq_array.values().clone(),
381 };
382
383 let validity = match &haystack {
384 ArrayWrapper::FixedSizeList(arr) => arr.nulls(),
385 ArrayWrapper::List(arr) => arr.nulls(),
386 ArrayWrapper::LargeList(arr) => arr.nulls(),
387 };
388 let mut matches = eq_bits.set_indices().peekable();
389 let mut result = BooleanBufferBuilder::new(haystack.len());
390 result.append_n(haystack.len(), false);
391
392 for (i, window) in offsets.windows(2).enumerate() {
395 let end = window[1] - first_offset;
396
397 let has_match = matches.peek().is_some_and(|&p| p < end);
398
399 while matches.peek().is_some_and(|&p| p < end) {
401 matches.next();
402 }
403
404 if has_match && validity.is_none_or(|v| v.is_valid(i)) {
405 result.set_bit(i, true);
406 }
407 }
408
409 Ok(Arc::new(BooleanArray::new(
412 result.finish(),
413 validity.cloned(),
414 )))
415}
416
417fn array_has_all_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
418 array_has_all_and_any_inner(args, ComparisonType::All)
419}
420
/// Number of list rows processed per batch by the row-wise `array_has_all` /
/// `array_has_any` kernels. The child values of at most this many rows are
/// converted (to the row format, or to string slices) at a time.
const ROW_CONVERSION_CHUNK_SIZE: usize = 512;
426
427fn general_array_has_for_all_and_any<'a>(
429 haystack: ArrayWrapper<'a>,
430 needle: ArrayWrapper<'a>,
431 comparison_type: ComparisonType,
432) -> Result<ArrayRef> {
433 let num_rows = haystack.len();
434 let converter = RowConverter::new(vec![SortField::new(haystack.value_type())])?;
435
436 let h_offsets: Vec<usize> = haystack.offsets().collect();
437 let n_offsets: Vec<usize> = needle.offsets().collect();
438
439 let combined_nulls = NullBuffer::union(haystack.nulls(), needle.nulls());
440 let mut result = BooleanBufferBuilder::new(num_rows);
441
442 for chunk_start in (0..num_rows).step_by(ROW_CONVERSION_CHUNK_SIZE) {
443 let chunk_end = (chunk_start + ROW_CONVERSION_CHUNK_SIZE).min(num_rows);
444
445 let h_elem_start = h_offsets[chunk_start];
448 let h_elem_end = h_offsets[chunk_end];
449 let n_elem_start = n_offsets[chunk_start];
450 let n_elem_end = n_offsets[chunk_end];
451
452 let h_vals = haystack
453 .values()
454 .slice(h_elem_start, h_elem_end - h_elem_start);
455 let n_vals = needle
456 .values()
457 .slice(n_elem_start, n_elem_end - n_elem_start);
458
459 let chunk_h_rows = converter.convert_columns(&[h_vals])?;
460 let chunk_n_rows = converter.convert_columns(&[n_vals])?;
461
462 for i in chunk_start..chunk_end {
463 if combined_nulls.as_ref().is_some_and(|n| n.is_null(i)) {
464 result.append(false);
465 continue;
466 }
467 result.append(general_array_has_all_and_any_kernel(
468 &chunk_h_rows,
469 (h_offsets[i] - h_elem_start)..(h_offsets[i + 1] - h_elem_start),
470 &chunk_n_rows,
471 (n_offsets[i] - n_elem_start)..(n_offsets[i + 1] - n_elem_start),
472 comparison_type,
473 ));
474 }
475 }
476
477 Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls)))
478}
479
480fn array_has_all_and_any_string_internal<'a>(
482 haystack: ArrayWrapper<'a>,
483 needle: ArrayWrapper<'a>,
484 comparison_type: ComparisonType,
485) -> Result<ArrayRef> {
486 let num_rows = haystack.len();
487
488 let h_offsets: Vec<usize> = haystack.offsets().collect();
489 let n_offsets: Vec<usize> = needle.offsets().collect();
490
491 let combined_nulls = NullBuffer::union(haystack.nulls(), needle.nulls());
492 let mut result = BooleanBufferBuilder::new(num_rows);
493
494 for chunk_start in (0..num_rows).step_by(ROW_CONVERSION_CHUNK_SIZE) {
495 let chunk_end = (chunk_start + ROW_CONVERSION_CHUNK_SIZE).min(num_rows);
496
497 let h_elem_start = h_offsets[chunk_start];
498 let h_elem_end = h_offsets[chunk_end];
499 let n_elem_start = n_offsets[chunk_start];
500 let n_elem_end = n_offsets[chunk_end];
501
502 let h_vals = haystack
503 .values()
504 .slice(h_elem_start, h_elem_end - h_elem_start);
505 let n_vals = needle
506 .values()
507 .slice(n_elem_start, n_elem_end - n_elem_start);
508
509 let chunk_h_strings = string_array_to_vec(h_vals.as_ref());
510 let chunk_n_strings = string_array_to_vec(n_vals.as_ref());
511
512 for i in chunk_start..chunk_end {
513 if combined_nulls.as_ref().is_some_and(|n| n.is_null(i)) {
514 result.append(false);
515 continue;
516 }
517 let h_start = h_offsets[i] - h_elem_start;
518 let h_end = h_offsets[i + 1] - h_elem_start;
519 let n_start = n_offsets[i] - n_elem_start;
520 let n_end = n_offsets[i + 1] - n_elem_start;
521 result.append(array_has_string_kernel(
522 &chunk_h_strings[h_start..h_end],
523 &chunk_n_strings[n_start..n_end],
524 comparison_type,
525 ));
526 }
527 }
528
529 Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls)))
530}
531
532fn array_has_all_and_any_dispatch<'a>(
533 haystack: ArrayWrapper<'a>,
534 needle: ArrayWrapper<'a>,
535 comparison_type: ComparisonType,
536) -> Result<ArrayRef> {
537 if needle.values().is_empty() {
538 let buffer = match comparison_type {
539 ComparisonType::All => BooleanBuffer::new_set(haystack.len()),
540 ComparisonType::Any => BooleanBuffer::new_unset(haystack.len()),
541 };
542 Ok(Arc::new(BooleanArray::from(buffer)))
543 } else {
544 match needle.value_type() {
545 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
546 array_has_all_and_any_string_internal(haystack, needle, comparison_type)
547 }
548 _ => general_array_has_for_all_and_any(haystack, needle, comparison_type),
549 }
550 }
551}
552
553fn array_has_all_and_any_inner(
554 args: &[ArrayRef],
555 comparison_type: ComparisonType,
556) -> Result<ArrayRef> {
557 let haystack: ArrayWrapper = args[0].as_ref().try_into()?;
558 let needle: ArrayWrapper = args[1].as_ref().try_into()?;
559 array_has_all_and_any_dispatch(haystack, needle, comparison_type)
560}
561
562fn array_has_any_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
563 array_has_all_and_any_inner(args, ComparisonType::Any)
564}
565
566fn array_has_any_with_scalar(
568 columnar_arg: &ColumnarValue,
569 scalar_arg: &ScalarValue,
570) -> Result<ColumnarValue> {
571 if scalar_arg.is_null() {
572 return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None)));
573 }
574
575 let scalar_array = scalar_arg.to_array_of_size(1)?;
577 let scalar_list: ArrayWrapper = scalar_array.as_ref().try_into()?;
578 let offsets: Vec<usize> = scalar_list.offsets().collect();
579 let scalar_values = scalar_list
580 .values()
581 .slice(offsets[0], offsets[1] - offsets[0]);
582
583 if scalar_values.is_empty() {
585 return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))));
586 }
587
588 match scalar_values.data_type() {
589 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
590 array_has_any_with_scalar_string(columnar_arg, &scalar_values)
591 }
592 _ => array_has_any_with_scalar_general(columnar_arg, &scalar_values),
593 }
594}
595
/// Size of a constant candidate list above which `array_has_any` switches from
/// scanning the candidates linearly to looking them up in a hash set.
const SCALAR_SMALL_THRESHOLD: usize = 8;
601
602fn array_has_any_with_scalar_string(
604 columnar_arg: &ColumnarValue,
605 scalar_values: &ArrayRef,
606) -> Result<ColumnarValue> {
607 let (col_arr, is_scalar_output) = match columnar_arg {
608 ColumnarValue::Array(arr) => (Arc::clone(arr), false),
609 ColumnarValue::Scalar(s) => (s.to_array_of_size(1)?, true),
610 };
611
612 let col_list: ArrayWrapper = col_arr.as_ref().try_into()?;
613 let col_values = col_list.values();
614 let col_offsets: Vec<usize> = col_list.offsets().collect();
615 let col_nulls = col_list.nulls();
616
617 let scalar_lookup = ScalarStringLookup::new(scalar_values);
618 let has_null_scalar = scalar_values.null_count() > 0;
619
620 let result = match col_values.data_type() {
621 DataType::Utf8 => array_has_any_string_inner(
622 col_values.as_string::<i32>(),
623 &col_offsets,
624 col_nulls,
625 has_null_scalar,
626 &scalar_lookup,
627 ),
628 DataType::LargeUtf8 => array_has_any_string_inner(
629 col_values.as_string::<i64>(),
630 &col_offsets,
631 col_nulls,
632 has_null_scalar,
633 &scalar_lookup,
634 ),
635 DataType::Utf8View => array_has_any_string_inner(
636 col_values.as_string_view(),
637 &col_offsets,
638 col_nulls,
639 has_null_scalar,
640 &scalar_lookup,
641 ),
642 _ => unreachable!("array_has_any_with_scalar_string called with non-string type"),
643 };
644
645 if is_scalar_output {
646 Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
647 &result, 0,
648 )?))
649 } else {
650 Ok(ColumnarValue::Array(result))
651 }
652}
653
654enum ScalarStringLookup<'a> {
656 Set(HashSet<&'a str>),
658 List(Vec<Option<&'a str>>),
660}
661
662impl<'a> ScalarStringLookup<'a> {
663 fn new(scalar_values: &'a ArrayRef) -> Self {
664 let strings = string_array_to_vec(scalar_values.as_ref());
665 if strings.len() > SCALAR_SMALL_THRESHOLD {
666 ScalarStringLookup::Set(strings.into_iter().flatten().collect())
667 } else {
668 ScalarStringLookup::List(strings)
669 }
670 }
671
672 fn contains(&self, value: &str) -> bool {
673 match self {
674 ScalarStringLookup::Set(set) => set.contains(value),
675 ScalarStringLookup::List(list) => list.contains(&Some(value)),
676 }
677 }
678}
679
680fn array_has_any_string_inner<'a, C: StringArrayType<'a> + Copy>(
683 col_strings: C,
684 col_offsets: &[usize],
685 col_nulls: Option<&NullBuffer>,
686 has_null_scalar: bool,
687 scalar_lookup: &ScalarStringLookup<'_>,
688) -> ArrayRef {
689 let num_rows = col_offsets.len() - 1;
690 let mut result = BooleanBufferBuilder::new(num_rows);
691
692 for i in 0..num_rows {
693 if col_nulls.is_some_and(|v| v.is_null(i)) {
694 result.append(false);
695 continue;
696 }
697 let start = col_offsets[i];
698 let end = col_offsets[i + 1];
699 let found = (start..end).any(|j| {
700 if col_strings.is_null(j) {
701 has_null_scalar
702 } else {
703 scalar_lookup.contains(col_strings.value(j))
704 }
705 });
706 result.append(found);
707 }
708
709 Arc::new(BooleanArray::new(result.finish(), col_nulls.cloned()))
710}
711
712fn array_has_any_with_scalar_general(
715 columnar_arg: &ColumnarValue,
716 scalar_values: &ArrayRef,
717) -> Result<ColumnarValue> {
718 let converter =
719 RowConverter::new(vec![SortField::new(scalar_values.data_type().clone())])?;
720 let scalar_rows = converter.convert_columns(&[Arc::clone(scalar_values)])?;
721
722 let (col_arr, is_scalar_output) = match columnar_arg {
723 ColumnarValue::Array(arr) => (Arc::clone(arr), false),
724 ColumnarValue::Scalar(s) => (s.to_array_of_size(1)?, true),
725 };
726
727 let col_list: ArrayWrapper = col_arr.as_ref().try_into()?;
728 let col_rows = converter.convert_columns(&[Arc::clone(col_list.values())])?;
729 let col_offsets: Vec<usize> = col_list.offsets().collect();
730 let col_nulls = col_list.nulls();
731
732 let mut result = BooleanBufferBuilder::new(col_list.len());
733 let num_scalar = scalar_rows.num_rows();
734
735 if num_scalar > SCALAR_SMALL_THRESHOLD {
736 let scalar_set: HashSet<Box<[u8]>> = (0..num_scalar)
738 .map(|i| Box::from(scalar_rows.row(i).as_ref()))
739 .collect();
740
741 for i in 0..col_list.len() {
742 if col_nulls.is_some_and(|v| v.is_null(i)) {
743 result.append(false);
744 continue;
745 }
746 let start = col_offsets[i];
747 let end = col_offsets[i + 1];
748 let found =
749 (start..end).any(|j| scalar_set.contains(col_rows.row(j).as_ref()));
750 result.append(found);
751 }
752 } else {
753 for i in 0..col_list.len() {
755 if col_nulls.is_some_and(|v| v.is_null(i)) {
756 result.append(false);
757 continue;
758 }
759 let start = col_offsets[i];
760 let end = col_offsets[i + 1];
761 let found = (start..end)
762 .any(|j| (0..num_scalar).any(|k| col_rows.row(j) == scalar_rows.row(k)));
763 result.append(found);
764 }
765 }
766
767 let output: ArrayRef =
768 Arc::new(BooleanArray::new(result.finish(), col_nulls.cloned()));
769
770 if is_scalar_output {
771 Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
772 &output, 0,
773 )?))
774 } else {
775 Ok(ColumnarValue::Array(output))
776 }
777}
778
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns true if all elements of sub-array exist in array.",
    syntax_example = "array_has_all(array, sub-array)",
    sql_example = r#"```sql
> select array_has_all([1, 2, 3, 4], [2, 3]);
+--------------------------------------------+
| array_has_all(List([1,2,3,4]), List([2,3])) |
+--------------------------------------------+
| true |
+--------------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "sub-array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
/// Scalar UDF `array_has_all(array, sub-array)`: true when every element of
/// `sub-array` occurs in `array`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayHasAll {
    /// Accepted argument types, returned by `ScalarUDFImpl::signature`.
    signature: Signature,
    /// Alternative SQL names, returned by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
805
806impl Default for ArrayHasAll {
807 fn default() -> Self {
808 Self::new()
809 }
810}
811
812impl ArrayHasAll {
813 pub fn new() -> Self {
814 Self {
815 signature: Signature::arrays(2, None, Volatility::Immutable),
816 aliases: vec![String::from("list_has_all")],
817 }
818 }
819}
820
821impl ScalarUDFImpl for ArrayHasAll {
822 fn name(&self) -> &str {
823 "array_has_all"
824 }
825
826 fn signature(&self) -> &Signature {
827 &self.signature
828 }
829
830 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
831 Ok(DataType::Boolean)
832 }
833
834 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
835 make_scalar_function(array_has_all_inner)(&args.args)
836 }
837
838 fn aliases(&self) -> &[String] {
839 &self.aliases
840 }
841
842 fn documentation(&self) -> Option<&Documentation> {
843 self.doc()
844 }
845}
846
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns true if the arrays have any elements in common.",
    syntax_example = "array_has_any(array1, array2)",
    sql_example = r#"```sql
> select array_has_any([1, 2, 3], [3, 4]);
+------------------------------------------+
| array_has_any(List([1,2,3]), List([3,4])) |
+------------------------------------------+
| true |
+------------------------------------------+
```"#,
    argument(
        name = "array1",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array2",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
/// Scalar UDF `array_has_any(array1, array2)`: true when the two arrays share at
/// least one element.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayHasAny {
    /// Accepted argument types, returned by `ScalarUDFImpl::signature`.
    signature: Signature,
    /// Alternative SQL names, returned by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
873
874impl Default for ArrayHasAny {
875 fn default() -> Self {
876 Self::new()
877 }
878}
879
880impl ArrayHasAny {
881 pub fn new() -> Self {
882 Self {
883 signature: Signature::arrays(2, None, Volatility::Immutable),
884 aliases: vec![String::from("list_has_any"), String::from("arrays_overlap")],
885 }
886 }
887}
888
889impl ScalarUDFImpl for ArrayHasAny {
890 fn name(&self) -> &str {
891 "array_has_any"
892 }
893
894 fn signature(&self) -> &Signature {
895 &self.signature
896 }
897
898 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
899 Ok(DataType::Boolean)
900 }
901
902 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
903 let [first_arg, second_arg] = take_function_args(self.name(), &args.args)?;
904
905 match (&first_arg, &second_arg) {
907 (cv, ColumnarValue::Scalar(scalar)) | (ColumnarValue::Scalar(scalar), cv) => {
908 array_has_any_with_scalar(cv, scalar)
909 }
910 _ => make_scalar_function(array_has_any_inner)(&args.args),
911 }
912 }
913
914 fn aliases(&self) -> &[String] {
915 &self.aliases
916 }
917
918 fn documentation(&self) -> Option<&Documentation> {
919 self.doc()
920 }
921}
922
/// Quantifier applied over the needle list by `array_has_all` / `array_has_any`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ComparisonType {
    /// Every needle element must occur in the haystack (`array_has_all`).
    All,
    /// At least one needle element must occur in the haystack (`array_has_any`).
    Any,
}
931
932fn array_has_string_kernel(
933 haystack: &[Option<&str>],
934 needle: &[Option<&str>],
935 comparison_type: ComparisonType,
936) -> bool {
937 match comparison_type {
938 ComparisonType::All => needle
939 .iter()
940 .dedup()
941 .all(|x| haystack.iter().dedup().any(|y| y == x)),
942 ComparisonType::Any => needle
943 .iter()
944 .dedup()
945 .any(|x| haystack.iter().dedup().any(|y| y == x)),
946 }
947}
948
949fn general_array_has_all_and_any_kernel(
950 haystack_rows: &Rows,
951 h_range: Range<usize>,
952 needle_rows: &Rows,
953 mut n_range: Range<usize>,
954 comparison_type: ComparisonType,
955) -> bool {
956 let h_start = h_range.start;
957 let h_end = h_range.end;
958
959 match comparison_type {
960 ComparisonType::All => n_range.all(|ni| {
961 let needle_row = needle_rows.row(ni);
962 (h_start..h_end).any(|hi| haystack_rows.row(hi) == needle_row)
963 }),
964 ComparisonType::Any => n_range.any(|ni| {
965 let needle_row = needle_rows.row(ni);
966 (h_start..h_end).any(|hi| haystack_rows.row(hi) == needle_row)
967 }),
968 }
969}
970
971#[cfg(test)]
972mod tests {
973 use std::sync::Arc;
974
975 use arrow::datatypes::Int32Type;
976 use arrow::{
977 array::{
978 Array, ArrayRef, AsArray, FixedSizeListArray, Int32Array, ListArray,
979 create_array,
980 },
981 buffer::OffsetBuffer,
982 datatypes::{DataType, Field},
983 };
984 use datafusion_common::{
985 DataFusionError, ScalarValue, config::ConfigOptions,
986 utils::SingleRowListArrayBuilder,
987 };
988 use datafusion_expr::simplify::SimplifyContext;
989 use datafusion_expr::{
990 ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDFImpl, col, lit,
991 simplify::ExprSimplifyResult,
992 };
993
994 use crate::expr_fn::make_array;
995
996 use super::{ArrayHas, ArrayHasAll, ArrayHasAny};
997
998 #[test]
999 fn test_simplify_array_has_to_in_list() {
1000 let haystack = lit(SingleRowListArrayBuilder::new(create_array!(
1001 Int32,
1002 [1, 2, 3]
1003 ))
1004 .build_list_scalar());
1005 let needle = col("c");
1006
1007 let context = SimplifyContext::default();
1008
1009 let Ok(ExprSimplifyResult::Simplified(Expr::InList(in_list))) =
1010 ArrayHas::new().simplify(vec![haystack, needle.clone()], &context)
1011 else {
1012 panic!("Expected simplified expression");
1013 };
1014
1015 assert_eq!(
1016 in_list,
1017 datafusion_expr::expr::InList {
1018 expr: Box::new(needle),
1019 list: vec![lit(1), lit(2), lit(3)],
1020 negated: false,
1021 }
1022 );
1023 }
1024
1025 #[test]
1026 fn test_simplify_array_has_with_make_array_to_in_list() {
1027 let haystack = make_array(vec![lit(1), lit(2), lit(3)]);
1028 let needle = col("c");
1029
1030 let context = SimplifyContext::default();
1031
1032 let Ok(ExprSimplifyResult::Simplified(Expr::InList(in_list))) =
1033 ArrayHas::new().simplify(vec![haystack, needle.clone()], &context)
1034 else {
1035 panic!("Expected simplified expression");
1036 };
1037
1038 assert_eq!(
1039 in_list,
1040 datafusion_expr::expr::InList {
1041 expr: Box::new(needle),
1042 list: vec![lit(1), lit(2), lit(3)],
1043 negated: false,
1044 }
1045 );
1046 }
1047
1048 #[test]
1049 fn test_simplify_array_has_with_null_to_null() {
1050 let haystack = Expr::Literal(ScalarValue::Null, None);
1051 let needle = col("c");
1052
1053 let context = SimplifyContext::default();
1054 let Ok(ExprSimplifyResult::Simplified(simplified)) =
1055 ArrayHas::new().simplify(vec![haystack, needle], &context)
1056 else {
1057 panic!("Expected simplified expression");
1058 };
1059
1060 assert_eq!(simplified, Expr::Literal(ScalarValue::Boolean(None), None));
1061 }
1062
1063 #[test]
1064 fn test_simplify_array_has_with_null_list_to_null() {
1065 let haystack =
1066 ListArray::from_iter_primitive::<Int32Type, [Option<i32>; 0], _>([None]);
1067 let haystack = Expr::Literal(ScalarValue::List(Arc::new(haystack)), None);
1068 let needle = col("c");
1069
1070 let context = SimplifyContext::default();
1071 let Ok(ExprSimplifyResult::Simplified(simplified)) =
1072 ArrayHas::new().simplify(vec![haystack, needle], &context)
1073 else {
1074 panic!("Expected simplified expression");
1075 };
1076
1077 assert_eq!(simplified, Expr::Literal(ScalarValue::Boolean(None), None));
1078 }
1079
1080 #[test]
1081 fn test_array_has_complex_list_not_simplified() {
1082 let haystack = col("c1");
1083 let needle = col("c2");
1084
1085 let context = SimplifyContext::default();
1086
1087 let Ok(ExprSimplifyResult::Original(args)) =
1088 ArrayHas::new().simplify(vec![haystack, needle.clone()], &context)
1089 else {
1090 panic!("Expected simplified expression");
1091 };
1092
1093 assert_eq!(args, vec![col("c1"), col("c2")],);
1094 }
1095
1096 #[test]
1097 fn test_array_has_list_empty_child() -> Result<(), DataFusionError> {
1098 let haystack_field = Arc::new(Field::new_list(
1099 "haystack",
1100 Field::new_list("", Field::new("", DataType::Int32, true), true),
1101 true,
1102 ));
1103
1104 let needle_field = Arc::new(Field::new("needle", DataType::Int32, true));
1105 let return_field = Arc::new(Field::new("return", DataType::Boolean, true));
1106 let haystack = ListArray::new(
1107 Field::new_list_field(DataType::Int32, true).into(),
1108 OffsetBuffer::new(vec![0, 0].into()),
1109 Arc::new(Int32Array::from(Vec::<i32>::new())) as ArrayRef,
1110 Some(vec![false].into()),
1111 );
1112
1113 let haystack = ColumnarValue::Array(Arc::new(haystack));
1114 let needle = ColumnarValue::Scalar(ScalarValue::Int32(Some(1)));
1115 let result = ArrayHas::new().invoke_with_args(ScalarFunctionArgs {
1116 args: vec![haystack, needle],
1117 arg_fields: vec![haystack_field, needle_field],
1118 number_rows: 1,
1119 return_field,
1120 config_options: Arc::new(ConfigOptions::default()),
1121 })?;
1122
1123 let output = result.into_array(1)?;
1124 let output = output.as_boolean();
1125 assert_eq!(output.len(), 1);
1126 assert!(output.is_null(0));
1127
1128 Ok(())
1129 }
1130
1131 #[test]
1132 fn test_array_has_sliced_list() -> Result<(), DataFusionError> {
1133 let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1135 Some(vec![Some(10), Some(20)]),
1136 Some(vec![Some(30), Some(40)]),
1137 Some(vec![Some(50), Some(60)]),
1138 Some(vec![Some(70), Some(80)]),
1139 ]);
1140 let sliced = list.slice(1, 2);
1141 let haystack_field =
1142 Arc::new(Field::new("haystack", sliced.data_type().clone(), true));
1143 let needle_field = Arc::new(Field::new("needle", DataType::Int32, true));
1144 let return_field = Arc::new(Field::new("return", DataType::Boolean, true));
1145
1146 let invoke = |needle: i32| -> Result<ArrayRef, DataFusionError> {
1149 ArrayHas::new()
1150 .invoke_with_args(ScalarFunctionArgs {
1151 args: vec![
1152 ColumnarValue::Array(Arc::new(sliced.clone())),
1153 ColumnarValue::Scalar(ScalarValue::Int32(Some(needle))),
1154 ],
1155 arg_fields: vec![
1156 Arc::clone(&haystack_field),
1157 Arc::clone(&needle_field),
1158 ],
1159 number_rows: 2,
1160 return_field: Arc::clone(&return_field),
1161 config_options: Arc::new(ConfigOptions::default()),
1162 })?
1163 .into_array(2)
1164 };
1165
1166 let output = invoke(10)?.as_boolean().clone();
1167 assert!(!output.value(0));
1168 assert!(!output.value(1));
1169
1170 let output = invoke(70)?.as_boolean().clone();
1171 assert!(!output.value(0));
1172 assert!(!output.value(1));
1173
1174 Ok(())
1175 }
1176
1177 #[test]
1178 fn test_array_has_list_null_haystack() -> Result<(), DataFusionError> {
1179 let haystack_field = Arc::new(Field::new("haystack", DataType::Null, true));
1180 let needle_field = Arc::new(Field::new("needle", DataType::Int32, true));
1181 let return_field = Arc::new(Field::new("return", DataType::Boolean, true));
1182 let haystack =
1183 ListArray::from_iter_primitive::<Int32Type, [Option<i32>; 0], _>([
1184 None, None, None,
1185 ]);
1186
1187 let haystack = ColumnarValue::Array(Arc::new(haystack));
1188 let needle = ColumnarValue::Scalar(ScalarValue::Int32(Some(1)));
1189 let result = ArrayHas::new().invoke_with_args(ScalarFunctionArgs {
1190 args: vec![haystack, needle],
1191 arg_fields: vec![haystack_field, needle_field],
1192 number_rows: 1,
1193 return_field,
1194 config_options: Arc::new(ConfigOptions::default()),
1195 })?;
1196
1197 let output = result.into_array(1)?;
1198 let output = output.as_boolean();
1199 assert_eq!(output.len(), 3);
1200 for i in 0..3 {
1201 assert!(output.is_null(i));
1202 }
1203
1204 Ok(())
1205 }
1206
1207 fn invoke_and_assert(
1210 udf: &dyn ScalarUDFImpl,
1211 haystack: &ArrayRef,
1212 needle: ArrayRef,
1213 expected: &[Option<bool>],
1214 ) {
1215 let num_rows = haystack.len();
1216 let list_type = haystack.data_type();
1217 let result = udf
1218 .invoke_with_args(ScalarFunctionArgs {
1219 args: vec![
1220 ColumnarValue::Array(Arc::clone(haystack)),
1221 ColumnarValue::Array(needle),
1222 ],
1223 arg_fields: vec![
1224 Arc::new(Field::new("haystack", list_type.clone(), false)),
1225 Arc::new(Field::new("needle", list_type.clone(), false)),
1226 ],
1227 number_rows: num_rows,
1228 return_field: Arc::new(Field::new("return", DataType::Boolean, true)),
1229 config_options: Arc::new(ConfigOptions::default()),
1230 })
1231 .unwrap();
1232 let output = result.into_array(num_rows).unwrap();
1233 assert_eq!(output.as_boolean().iter().collect::<Vec<_>>(), expected);
1234 }
1235
1236 #[test]
1237 fn test_sliced_list_offsets() {
1238 let field: Arc<Field> = Arc::new(Field::new("item", DataType::Int32, false));
1244 let full_values = Arc::new(Int32Array::from(vec![1, 2, 11, 12, 21, 22, 31, 32]));
1245 let full_offsets = OffsetBuffer::new(vec![0, 2, 4, 6, 8].into());
1246 let full = ListArray::new(Arc::clone(&field), full_offsets, full_values, None);
1247 let sliced_haystack: ArrayRef = Arc::new(full.slice(1, 2));
1248
1249 let needle_all: ArrayRef = Arc::new(ListArray::new(
1251 Arc::clone(&field),
1252 OffsetBuffer::new(vec![0, 1, 2].into()),
1253 Arc::new(Int32Array::from(vec![11, 21])),
1254 None,
1255 ));
1256 invoke_and_assert(
1257 &ArrayHasAll::new(),
1258 &sliced_haystack,
1259 needle_all,
1260 &[Some(true), Some(true)],
1261 );
1262
1263 let needle_any: ArrayRef = Arc::new(ListArray::new(
1265 field,
1266 OffsetBuffer::new(vec![0, 2, 4].into()),
1267 Arc::new(Int32Array::from(vec![99, 11, 99, 21])),
1268 None,
1269 ));
1270 invoke_and_assert(
1271 &ArrayHasAny::new(),
1272 &sliced_haystack,
1273 needle_any,
1274 &[Some(true), Some(true)],
1275 );
1276 }
1277
1278 #[test]
1279 fn test_sliced_fixed_size_list_offsets() {
1280 let field = Arc::new(Field::new("item", DataType::Int32, false));
1282 let full_values = Arc::new(Int32Array::from(vec![1, 2, 11, 12, 21, 22, 31, 32]));
1283 let full = FixedSizeListArray::new(Arc::clone(&field), 2, full_values, None);
1284 let sliced_haystack: ArrayRef = Arc::new(full.slice(1, 2));
1285
1286 let needle_all: ArrayRef = Arc::new(FixedSizeListArray::new(
1288 Arc::clone(&field),
1289 2,
1290 Arc::new(Int32Array::from(vec![11, 12, 21, 22])),
1291 None,
1292 ));
1293 invoke_and_assert(
1294 &ArrayHasAll::new(),
1295 &sliced_haystack,
1296 needle_all,
1297 &[Some(true), Some(true)],
1298 );
1299
1300 let needle_any: ArrayRef = Arc::new(FixedSizeListArray::new(
1302 field,
1303 2,
1304 Arc::new(Int32Array::from(vec![99, 12, 99, 22])),
1305 None,
1306 ));
1307 invoke_and_assert(
1308 &ArrayHasAny::new(),
1309 &sliced_haystack,
1310 needle_any,
1311 &[Some(true), Some(true)],
1312 );
1313 }
1314}