1use arrow::array::{Array, ArrayRef, BooleanArray, Datum, Scalar};
21use arrow::buffer::BooleanBuffer;
22use arrow::datatypes::DataType;
23use arrow::row::{RowConverter, Rows, SortField};
24use datafusion_common::cast::{as_fixed_size_list_array, as_generic_list_array};
25use datafusion_common::utils::string_utils::string_array_to_vec;
26use datafusion_common::utils::take_function_args;
27use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
28use datafusion_expr::expr::{InList, ScalarFunction};
29use datafusion_expr::simplify::ExprSimplifyResult;
30use datafusion_expr::{
31 ColumnarValue, Documentation, Expr, ScalarUDFImpl, Signature, Volatility,
32};
33use datafusion_macros::user_doc;
34use datafusion_physical_expr_common::datum::compare_with_eq;
35use itertools::Itertools;
36
37use crate::make_array::make_array_udf;
38use crate::utils::make_scalar_function;
39
40use std::any::Any;
41use std::sync::Arc;
42
43make_udf_expr_and_func!(ArrayHas,
45 array_has,
46 haystack_array element, "returns true, if the element appears in the first array, otherwise false.", array_has_udf );
50make_udf_expr_and_func!(ArrayHasAll,
51 array_has_all,
52 haystack_array needle_array, "returns true if each element of the second array appears in the first array; otherwise, it returns false.", array_has_all_udf );
56make_udf_expr_and_func!(ArrayHasAny,
57 array_has_any,
58 haystack_array needle_array, "returns true if at least one element of the second array appears in the first array; otherwise, it returns false.", array_has_any_udf );
62
63#[user_doc(
64 doc_section(label = "Array Functions"),
65 description = "Returns true if the array contains the element.",
66 syntax_example = "array_has(array, element)",
67 sql_example = r#"```sql
68> select array_has([1, 2, 3], 2);
69+-----------------------------+
70| array_has(List([1,2,3]), 2) |
71+-----------------------------+
72| true |
73+-----------------------------+
74```"#,
75 argument(
76 name = "array",
77 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
78 ),
79 argument(
80 name = "element",
81 description = "Scalar or Array expression. Can be a constant, column, or function, and any combination of array operators."
82 )
83)]
84#[derive(Debug)]
85pub struct ArrayHas {
86 signature: Signature,
87 aliases: Vec<String>,
88}
89
90impl Default for ArrayHas {
91 fn default() -> Self {
92 Self::new()
93 }
94}
95
96impl ArrayHas {
97 pub fn new() -> Self {
98 Self {
99 signature: Signature::array_and_element(Volatility::Immutable),
100 aliases: vec![
101 String::from("list_has"),
102 String::from("array_contains"),
103 String::from("list_contains"),
104 ],
105 }
106 }
107}
108
109impl ScalarUDFImpl for ArrayHas {
110 fn as_any(&self) -> &dyn Any {
111 self
112 }
113 fn name(&self) -> &str {
114 "array_has"
115 }
116
117 fn signature(&self) -> &Signature {
118 &self.signature
119 }
120
121 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
122 Ok(DataType::Boolean)
123 }
124
125 fn simplify(
126 &self,
127 mut args: Vec<Expr>,
128 _info: &dyn datafusion_expr::simplify::SimplifyInfo,
129 ) -> Result<ExprSimplifyResult> {
130 let [haystack, needle] = take_function_args(self.name(), &mut args)?;
131
132 if let Expr::Literal(ScalarValue::List(array), _) = haystack {
135 assert_eq!(array.len(), 1); if let Ok(scalar_values) =
142 ScalarValue::convert_array_to_scalar_vec(array.as_ref())
143 {
144 assert_eq!(scalar_values.len(), 1);
145 let list = scalar_values
146 .into_iter()
147 .flatten()
148 .map(|v| Expr::Literal(v, None))
149 .collect();
150
151 return Ok(ExprSimplifyResult::Simplified(Expr::InList(InList {
152 expr: Box::new(std::mem::take(needle)),
153 list,
154 negated: false,
155 })));
156 }
157 } else if let Expr::ScalarFunction(ScalarFunction { func, args }) = haystack {
158 if func == &make_array_udf() {
160 return Ok(ExprSimplifyResult::Simplified(Expr::InList(InList {
161 expr: Box::new(std::mem::take(needle)),
162 list: std::mem::take(args),
163 negated: false,
164 })));
165 }
166 }
167
168 Ok(ExprSimplifyResult::Original(args))
169 }
170
171 fn invoke_with_args(
172 &self,
173 args: datafusion_expr::ScalarFunctionArgs,
174 ) -> Result<ColumnarValue> {
175 let [first_arg, second_arg] = take_function_args(self.name(), &args.args)?;
176 match &second_arg {
177 ColumnarValue::Array(array_needle) => {
178 let haystack = first_arg.to_array(array_needle.len())?;
180 let array = array_has_inner_for_array(&haystack, array_needle)?;
181 Ok(ColumnarValue::Array(array))
182 }
183 ColumnarValue::Scalar(scalar_needle) => {
184 if scalar_needle.is_null() {
187 return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None)));
188 }
189
190 let haystack = first_arg.to_array(1)?;
192 let needle = scalar_needle.to_array_of_size(1)?;
193 let needle = Scalar::new(needle);
194 let array = array_has_inner_for_scalar(&haystack, &needle)?;
195 if let ColumnarValue::Scalar(_) = &first_arg {
196 let scalar_value = ScalarValue::try_from_array(&array, 0)?;
198 Ok(ColumnarValue::Scalar(scalar_value))
199 } else {
200 Ok(ColumnarValue::Array(array))
201 }
202 }
203 }
204 }
205
206 fn aliases(&self) -> &[String] {
207 &self.aliases
208 }
209
210 fn documentation(&self) -> Option<&Documentation> {
211 self.doc()
212 }
213}
214
215fn array_has_inner_for_scalar(
216 haystack: &ArrayRef,
217 needle: &dyn Datum,
218) -> Result<ArrayRef> {
219 let haystack = haystack.as_ref().try_into()?;
220 array_has_dispatch_for_scalar(haystack, needle)
221}
222
223fn array_has_inner_for_array(haystack: &ArrayRef, needle: &ArrayRef) -> Result<ArrayRef> {
224 let haystack = haystack.as_ref().try_into()?;
225 array_has_dispatch_for_array(haystack, needle)
226}
227
228enum ArrayWrapper<'a> {
229 FixedSizeList(&'a arrow::array::FixedSizeListArray),
230 List(&'a arrow::array::GenericListArray<i32>),
231 LargeList(&'a arrow::array::GenericListArray<i64>),
232}
233
234impl<'a> TryFrom<&'a dyn Array> for ArrayWrapper<'a> {
235 type Error = DataFusionError;
236
237 fn try_from(
238 value: &'a dyn Array,
239 ) -> std::result::Result<ArrayWrapper<'a>, Self::Error> {
240 match value.data_type() {
241 DataType::List(_) => {
242 Ok(ArrayWrapper::List(as_generic_list_array::<i32>(value)?))
243 }
244 DataType::LargeList(_) => Ok(ArrayWrapper::LargeList(
245 as_generic_list_array::<i64>(value)?,
246 )),
247 DataType::FixedSizeList(_, _) => Ok(ArrayWrapper::FixedSizeList(
248 as_fixed_size_list_array(value)?,
249 )),
250 _ => exec_err!("array_has does not support type '{:?}'.", value.data_type()),
251 }
252 }
253}
254
255impl<'a> ArrayWrapper<'a> {
256 fn len(&self) -> usize {
257 match self {
258 ArrayWrapper::FixedSizeList(arr) => arr.len(),
259 ArrayWrapper::List(arr) => arr.len(),
260 ArrayWrapper::LargeList(arr) => arr.len(),
261 }
262 }
263
264 fn iter(&self) -> Box<dyn Iterator<Item = Option<ArrayRef>> + 'a> {
265 match self {
266 ArrayWrapper::FixedSizeList(arr) => Box::new(arr.iter()),
267 ArrayWrapper::List(arr) => Box::new(arr.iter()),
268 ArrayWrapper::LargeList(arr) => Box::new(arr.iter()),
269 }
270 }
271
272 fn values(&self) -> &ArrayRef {
273 match self {
274 ArrayWrapper::FixedSizeList(arr) => arr.values(),
275 ArrayWrapper::List(arr) => arr.values(),
276 ArrayWrapper::LargeList(arr) => arr.values(),
277 }
278 }
279
280 fn value_type(&self) -> DataType {
281 match self {
282 ArrayWrapper::FixedSizeList(arr) => arr.value_type(),
283 ArrayWrapper::List(arr) => arr.value_type(),
284 ArrayWrapper::LargeList(arr) => arr.value_type(),
285 }
286 }
287
288 fn offsets(&self) -> Box<dyn Iterator<Item = usize> + 'a> {
289 match self {
290 ArrayWrapper::FixedSizeList(arr) => {
291 let offsets = (0..=arr.len())
292 .step_by(arr.value_length() as usize)
293 .collect::<Vec<_>>();
294 Box::new(offsets.into_iter())
295 }
296 ArrayWrapper::List(arr) => {
297 Box::new(arr.offsets().iter().map(|o| (*o) as usize))
298 }
299 ArrayWrapper::LargeList(arr) => {
300 Box::new(arr.offsets().iter().map(|o| (*o) as usize))
301 }
302 }
303 }
304}
305
306fn array_has_dispatch_for_array(
307 haystack: ArrayWrapper<'_>,
308 needle: &ArrayRef,
309) -> Result<ArrayRef> {
310 let mut boolean_builder = BooleanArray::builder(haystack.len());
311 for (i, arr) in haystack.iter().enumerate() {
312 if arr.is_none() || needle.is_null(i) {
313 boolean_builder.append_null();
314 continue;
315 }
316 let arr = arr.unwrap();
317 let is_nested = arr.data_type().is_nested();
318 let needle_row = Scalar::new(needle.slice(i, 1));
319 let eq_array = compare_with_eq(&arr, &needle_row, is_nested)?;
320 boolean_builder.append_value(eq_array.true_count() > 0);
321 }
322
323 Ok(Arc::new(boolean_builder.finish()))
324}
325
326fn array_has_dispatch_for_scalar(
327 haystack: ArrayWrapper<'_>,
328 needle: &dyn Datum,
329) -> Result<ArrayRef> {
330 let values = haystack.values();
331 let is_nested = values.data_type().is_nested();
332 if values.is_empty() {
335 return Ok(Arc::new(BooleanArray::new(
336 BooleanBuffer::new_unset(haystack.len()),
337 None,
338 )));
339 }
340 let eq_array = compare_with_eq(values, needle, is_nested)?;
341 let mut final_contained = vec![None; haystack.len()];
342
343 let validity = match &haystack {
345 ArrayWrapper::FixedSizeList(arr) => arr.nulls(),
346 ArrayWrapper::List(arr) => arr.nulls(),
347 ArrayWrapper::LargeList(arr) => arr.nulls(),
348 };
349
350 for (i, (start, end)) in haystack.offsets().tuple_windows().enumerate() {
351 let length = end - start;
352
353 if let Some(validity_buffer) = validity {
355 if !validity_buffer.is_valid(i) {
356 final_contained[i] = None; continue;
358 }
359 }
360
361 if length == 0 {
363 final_contained[i] = Some(false); } else {
365 let sliced_array = eq_array.slice(start, length);
366 final_contained[i] = Some(sliced_array.true_count() > 0);
367 }
368 }
369
370 Ok(Arc::new(BooleanArray::from(final_contained)))
371}
372
373fn array_has_all_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
374 array_has_all_and_any_inner(args, ComparisonType::All)
375}
376
377fn general_array_has_for_all_and_any<'a>(
379 haystack: &ArrayWrapper<'a>,
380 needle: &ArrayWrapper<'a>,
381 comparison_type: ComparisonType,
382) -> Result<ArrayRef> {
383 let mut boolean_builder = BooleanArray::builder(haystack.len());
384 let converter = RowConverter::new(vec![SortField::new(haystack.value_type())])?;
385
386 for (arr, sub_arr) in haystack.iter().zip(needle.iter()) {
387 if let (Some(arr), Some(sub_arr)) = (arr, sub_arr) {
388 let arr_values = converter.convert_columns(&[arr])?;
389 let sub_arr_values = converter.convert_columns(&[sub_arr])?;
390 boolean_builder.append_value(general_array_has_all_and_any_kernel(
391 arr_values,
392 sub_arr_values,
393 comparison_type,
394 ));
395 } else {
396 boolean_builder.append_null();
397 }
398 }
399
400 Ok(Arc::new(boolean_builder.finish()))
401}
402
403fn array_has_all_and_any_string_internal<'a>(
405 haystack: &ArrayWrapper<'a>,
406 needle: &ArrayWrapper<'a>,
407 comparison_type: ComparisonType,
408) -> Result<ArrayRef> {
409 let mut boolean_builder = BooleanArray::builder(haystack.len());
410 for (arr, sub_arr) in haystack.iter().zip(needle.iter()) {
411 match (arr, sub_arr) {
412 (Some(arr), Some(sub_arr)) => {
413 let haystack_array = string_array_to_vec(&arr);
414 let needle_array = string_array_to_vec(&sub_arr);
415 boolean_builder.append_value(array_has_string_kernel(
416 haystack_array,
417 needle_array,
418 comparison_type,
419 ));
420 }
421 (_, _) => {
422 boolean_builder.append_null();
423 }
424 }
425 }
426
427 Ok(Arc::new(boolean_builder.finish()))
428}
429
430fn array_has_all_and_any_dispatch<'a>(
431 haystack: &ArrayWrapper<'a>,
432 needle: &ArrayWrapper<'a>,
433 comparison_type: ComparisonType,
434) -> Result<ArrayRef> {
435 if needle.values().is_empty() {
436 let buffer = match comparison_type {
437 ComparisonType::All => BooleanBuffer::new_set(haystack.len()),
438 ComparisonType::Any => BooleanBuffer::new_unset(haystack.len()),
439 };
440 Ok(Arc::new(BooleanArray::from(buffer)))
441 } else {
442 match needle.value_type() {
443 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
444 array_has_all_and_any_string_internal(haystack, needle, comparison_type)
445 }
446 _ => general_array_has_for_all_and_any(haystack, needle, comparison_type),
447 }
448 }
449}
450
451fn array_has_all_and_any_inner(
452 args: &[ArrayRef],
453 comparison_type: ComparisonType,
454) -> Result<ArrayRef> {
455 let haystack: ArrayWrapper = args[0].as_ref().try_into()?;
456 let needle: ArrayWrapper = args[1].as_ref().try_into()?;
457 array_has_all_and_any_dispatch(&haystack, &needle, comparison_type)
458}
459
460fn array_has_any_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
461 array_has_all_and_any_inner(args, ComparisonType::Any)
462}
463
464#[user_doc(
465 doc_section(label = "Array Functions"),
466 description = "Returns true if all elements of sub-array exist in array.",
467 syntax_example = "array_has_all(array, sub-array)",
468 sql_example = r#"```sql
469> select array_has_all([1, 2, 3, 4], [2, 3]);
470+--------------------------------------------+
471| array_has_all(List([1,2,3,4]), List([2,3])) |
472+--------------------------------------------+
473| true |
474+--------------------------------------------+
475```"#,
476 argument(
477 name = "array",
478 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
479 ),
480 argument(
481 name = "sub-array",
482 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
483 )
484)]
485#[derive(Debug)]
486pub struct ArrayHasAll {
487 signature: Signature,
488 aliases: Vec<String>,
489}
490
491impl Default for ArrayHasAll {
492 fn default() -> Self {
493 Self::new()
494 }
495}
496
497impl ArrayHasAll {
498 pub fn new() -> Self {
499 Self {
500 signature: Signature::any(2, Volatility::Immutable),
501 aliases: vec![String::from("list_has_all")],
502 }
503 }
504}
505
506impl ScalarUDFImpl for ArrayHasAll {
507 fn as_any(&self) -> &dyn Any {
508 self
509 }
510 fn name(&self) -> &str {
511 "array_has_all"
512 }
513
514 fn signature(&self) -> &Signature {
515 &self.signature
516 }
517
518 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
519 Ok(DataType::Boolean)
520 }
521
522 fn invoke_with_args(
523 &self,
524 args: datafusion_expr::ScalarFunctionArgs,
525 ) -> Result<ColumnarValue> {
526 make_scalar_function(array_has_all_inner)(&args.args)
527 }
528
529 fn aliases(&self) -> &[String] {
530 &self.aliases
531 }
532
533 fn documentation(&self) -> Option<&Documentation> {
534 self.doc()
535 }
536}
537
538#[user_doc(
539 doc_section(label = "Array Functions"),
540 description = "Returns true if any elements exist in both arrays.",
541 syntax_example = "array_has_any(array, sub-array)",
542 sql_example = r#"```sql
543> select array_has_any([1, 2, 3], [3, 4]);
544+------------------------------------------+
545| array_has_any(List([1,2,3]), List([3,4])) |
546+------------------------------------------+
547| true |
548+------------------------------------------+
549```"#,
550 argument(
551 name = "array",
552 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
553 ),
554 argument(
555 name = "sub-array",
556 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
557 )
558)]
559#[derive(Debug)]
560pub struct ArrayHasAny {
561 signature: Signature,
562 aliases: Vec<String>,
563}
564
565impl Default for ArrayHasAny {
566 fn default() -> Self {
567 Self::new()
568 }
569}
570
571impl ArrayHasAny {
572 pub fn new() -> Self {
573 Self {
574 signature: Signature::any(2, Volatility::Immutable),
575 aliases: vec![String::from("list_has_any"), String::from("arrays_overlap")],
576 }
577 }
578}
579
580impl ScalarUDFImpl for ArrayHasAny {
581 fn as_any(&self) -> &dyn Any {
582 self
583 }
584 fn name(&self) -> &str {
585 "array_has_any"
586 }
587
588 fn signature(&self) -> &Signature {
589 &self.signature
590 }
591
592 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
593 Ok(DataType::Boolean)
594 }
595
596 fn invoke_with_args(
597 &self,
598 args: datafusion_expr::ScalarFunctionArgs,
599 ) -> Result<ColumnarValue> {
600 make_scalar_function(array_has_any_inner)(&args.args)
601 }
602
603 fn aliases(&self) -> &[String] {
604 &self.aliases
605 }
606
607 fn documentation(&self) -> Option<&Documentation> {
608 self.doc()
609 }
610}
611
612#[derive(Debug, PartialEq, Clone, Copy)]
614enum ComparisonType {
615 All,
617 Any,
619}
620
621fn array_has_string_kernel(
622 haystack: Vec<Option<&str>>,
623 needle: Vec<Option<&str>>,
624 comparison_type: ComparisonType,
625) -> bool {
626 match comparison_type {
627 ComparisonType::All => needle
628 .iter()
629 .dedup()
630 .all(|x| haystack.iter().dedup().any(|y| y == x)),
631 ComparisonType::Any => needle
632 .iter()
633 .dedup()
634 .any(|x| haystack.iter().dedup().any(|y| y == x)),
635 }
636}
637
638fn general_array_has_all_and_any_kernel(
639 haystack_rows: Rows,
640 needle_rows: Rows,
641 comparison_type: ComparisonType,
642) -> bool {
643 match comparison_type {
644 ComparisonType::All => needle_rows.iter().all(|needle_row| {
645 haystack_rows
646 .iter()
647 .any(|haystack_row| haystack_row == needle_row)
648 }),
649 ComparisonType::Any => needle_rows.iter().any(|needle_row| {
650 haystack_rows
651 .iter()
652 .any(|haystack_row| haystack_row == needle_row)
653 }),
654 }
655}
656
657#[cfg(test)]
658mod tests {
659 use arrow::array::create_array;
660 use datafusion_common::utils::SingleRowListArrayBuilder;
661 use datafusion_expr::{
662 col, execution_props::ExecutionProps, lit, simplify::ExprSimplifyResult, Expr,
663 ScalarUDFImpl,
664 };
665
666 use crate::expr_fn::make_array;
667
668 use super::ArrayHas;
669
670 #[test]
671 fn test_simplify_array_has_to_in_list() {
672 let haystack = lit(SingleRowListArrayBuilder::new(create_array!(
673 Int32,
674 [1, 2, 3]
675 ))
676 .build_list_scalar());
677 let needle = col("c");
678
679 let props = ExecutionProps::new();
680 let context = datafusion_expr::simplify::SimplifyContext::new(&props);
681
682 let Ok(ExprSimplifyResult::Simplified(Expr::InList(in_list))) =
683 ArrayHas::new().simplify(vec![haystack, needle.clone()], &context)
684 else {
685 panic!("Expected simplified expression");
686 };
687
688 assert_eq!(
689 in_list,
690 datafusion_expr::expr::InList {
691 expr: Box::new(needle),
692 list: vec![lit(1), lit(2), lit(3)],
693 negated: false,
694 }
695 );
696 }
697
698 #[test]
699 fn test_simplify_array_has_with_make_array_to_in_list() {
700 let haystack = make_array(vec![lit(1), lit(2), lit(3)]);
701 let needle = col("c");
702
703 let props = ExecutionProps::new();
704 let context = datafusion_expr::simplify::SimplifyContext::new(&props);
705
706 let Ok(ExprSimplifyResult::Simplified(Expr::InList(in_list))) =
707 ArrayHas::new().simplify(vec![haystack, needle.clone()], &context)
708 else {
709 panic!("Expected simplified expression");
710 };
711
712 assert_eq!(
713 in_list,
714 datafusion_expr::expr::InList {
715 expr: Box::new(needle),
716 list: vec![lit(1), lit(2), lit(3)],
717 negated: false,
718 }
719 );
720 }
721
722 #[test]
723 fn test_array_has_complex_list_not_simplified() {
724 let haystack = col("c1");
725 let needle = col("c2");
726
727 let props = ExecutionProps::new();
728 let context = datafusion_expr::simplify::SimplifyContext::new(&props);
729
730 let Ok(ExprSimplifyResult::Original(args)) =
731 ArrayHas::new().simplify(vec![haystack, needle.clone()], &context)
732 else {
733 panic!("Expected simplified expression");
734 };
735
736 assert_eq!(args, vec![col("c1"), col("c2")],);
737 }
738}