Skip to main content

datafusion_functions_nested/
array_has.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for array_has, array_has_all and array_has_any functions.
19
20use arrow::array::{
21    Array, ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder, Datum, Scalar,
22    StringArrayType,
23};
24use arrow::buffer::BooleanBuffer;
25use arrow::datatypes::DataType;
26use arrow::row::{RowConverter, Rows, SortField};
27use datafusion_common::cast::{as_fixed_size_list_array, as_generic_list_array};
28use datafusion_common::utils::string_utils::string_array_to_vec;
29use datafusion_common::utils::take_function_args;
30use datafusion_common::{DataFusionError, Result, ScalarValue, exec_err};
31use datafusion_expr::expr::ScalarFunction;
32use datafusion_expr::simplify::ExprSimplifyResult;
33use datafusion_expr::{
34    ColumnarValue, Documentation, Expr, ScalarUDFImpl, Signature, Volatility, in_list,
35};
36use datafusion_macros::user_doc;
37use datafusion_physical_expr_common::datum::compare_with_eq;
38use itertools::Itertools;
39
40use crate::make_array::make_array_udf;
41use crate::utils::make_scalar_function;
42
43use hashbrown::HashSet;
44use std::any::Any;
45use std::sync::Arc;
46
// Create static instances of ScalarUDFs for each function.
//
// Each invocation names the UDF struct first, then an identifier for the
// generated function (presumably the public expression builder — confirm
// against the macro definition), followed by the argument names, the doc
// string and the internal function name, as annotated inline below.
make_udf_expr_and_func!(ArrayHas,
    array_has,
    haystack_array element, // arg names
    "returns true, if the element appears in the first array, otherwise false.", // doc
    array_has_udf // internal function name
);
make_udf_expr_and_func!(ArrayHasAll,
    array_has_all,
    haystack_array needle_array, // arg names
    "returns true if each element of the second array appears in the first array; otherwise, it returns false.", // doc
    array_has_all_udf // internal function name
);
make_udf_expr_and_func!(ArrayHasAny,
    array_has_any,
    first_array second_array, // arg names
    "returns true if at least one element of the second array appears in the first array; otherwise, it returns false.", // doc
    array_has_any_udf // internal function name
);
66
// Scalar UDF implementing `array_has(array, element)`.
// The `user_doc` attribute below carries the user-facing documentation that
// `documentation()` exposes through `self.doc()`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns true if the array contains the element.",
    syntax_example = "array_has(array, element)",
    sql_example = r#"```sql
> select array_has([1, 2, 3], 2);
+-----------------------------+
| array_has(List([1,2,3]), 2) |
+-----------------------------+
| true                        |
+-----------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "element",
        description = "Scalar or Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayHas {
    // Accepted argument shapes; `new()` sets this to an
    // "array and element" signature with immutable volatility.
    signature: Signature,
    // Alternative SQL names the function is registered under.
    aliases: Vec<String>,
}
93
94impl Default for ArrayHas {
95    fn default() -> Self {
96        Self::new()
97    }
98}
99
100impl ArrayHas {
101    pub fn new() -> Self {
102        Self {
103            signature: Signature::array_and_element(Volatility::Immutable),
104            aliases: vec![
105                String::from("list_has"),
106                String::from("array_contains"),
107                String::from("list_contains"),
108            ],
109        }
110    }
111}
112
113impl ScalarUDFImpl for ArrayHas {
114    fn as_any(&self) -> &dyn Any {
115        self
116    }
117    fn name(&self) -> &str {
118        "array_has"
119    }
120
121    fn signature(&self) -> &Signature {
122        &self.signature
123    }
124
125    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
126        Ok(DataType::Boolean)
127    }
128
129    fn simplify(
130        &self,
131        mut args: Vec<Expr>,
132        _info: &datafusion_expr::simplify::SimplifyContext,
133    ) -> Result<ExprSimplifyResult> {
134        let [haystack, needle] = take_function_args(self.name(), &mut args)?;
135
136        // if the haystack is a constant list, we can use an inlist expression which is more
137        // efficient because the haystack is not varying per-row
138        match haystack {
139            Expr::Literal(scalar, _) if scalar.is_null() => {
140                return Ok(ExprSimplifyResult::Simplified(Expr::Literal(
141                    ScalarValue::Boolean(None),
142                    None,
143                )));
144            }
145            Expr::Literal(
146                // FixedSizeList gets coerced to List
147                scalar @ ScalarValue::List(_) | scalar @ ScalarValue::LargeList(_),
148                _,
149            ) => {
150                if let Ok(scalar_values) =
151                    ScalarValue::convert_array_to_scalar_vec(&scalar.to_array()?)
152                {
153                    assert_eq!(scalar_values.len(), 1);
154                    let list = scalar_values
155                        .into_iter()
156                        .flatten()
157                        .flatten()
158                        .map(|v| Expr::Literal(v, None))
159                        .collect();
160
161                    return Ok(ExprSimplifyResult::Simplified(in_list(
162                        std::mem::take(needle),
163                        list,
164                        false,
165                    )));
166                }
167            }
168            Expr::ScalarFunction(ScalarFunction { func, args })
169                if func == &make_array_udf() =>
170            {
171                // make_array has a static set of arguments, so we can pull the arguments out from it
172                return Ok(ExprSimplifyResult::Simplified(in_list(
173                    std::mem::take(needle),
174                    std::mem::take(args),
175                    false,
176                )));
177            }
178            _ => {}
179        };
180        Ok(ExprSimplifyResult::Original(args))
181    }
182
183    fn invoke_with_args(
184        &self,
185        args: datafusion_expr::ScalarFunctionArgs,
186    ) -> Result<ColumnarValue> {
187        let [first_arg, second_arg] = take_function_args(self.name(), &args.args)?;
188        if first_arg.data_type().is_null() {
189            // Always return null if the first argument is null
190            // i.e. array_has(null, element) -> null
191            return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None)));
192        }
193
194        match &second_arg {
195            ColumnarValue::Array(array_needle) => {
196                // the needle is already an array, convert the haystack to an array of the same length
197                let haystack = first_arg.to_array(array_needle.len())?;
198                let array = array_has_inner_for_array(&haystack, array_needle)?;
199                Ok(ColumnarValue::Array(array))
200            }
201            ColumnarValue::Scalar(scalar_needle) => {
202                // Always return null if the second argument is null
203                // i.e. array_has(array, null) -> null
204                if scalar_needle.is_null() {
205                    return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None)));
206                }
207
208                // since the needle is a scalar, convert it to an array of size 1
209                let haystack = first_arg.to_array(1)?;
210                let needle = scalar_needle.to_array_of_size(1)?;
211                let needle = Scalar::new(needle);
212                let array = array_has_inner_for_scalar(&haystack, &needle)?;
213                if let ColumnarValue::Scalar(_) = &first_arg {
214                    // If both inputs are scalar, keeps output as scalar
215                    let scalar_value = ScalarValue::try_from_array(&array, 0)?;
216                    Ok(ColumnarValue::Scalar(scalar_value))
217                } else {
218                    Ok(ColumnarValue::Array(array))
219                }
220            }
221        }
222    }
223
224    fn aliases(&self) -> &[String] {
225        &self.aliases
226    }
227
228    fn documentation(&self) -> Option<&Documentation> {
229        self.doc()
230    }
231}
232
233fn array_has_inner_for_scalar(
234    haystack: &ArrayRef,
235    needle: &dyn Datum,
236) -> Result<ArrayRef> {
237    let haystack = haystack.as_ref().try_into()?;
238    array_has_dispatch_for_scalar(haystack, needle)
239}
240
241fn array_has_inner_for_array(haystack: &ArrayRef, needle: &ArrayRef) -> Result<ArrayRef> {
242    let haystack = haystack.as_ref().try_into()?;
243    array_has_dispatch_for_array(haystack, needle)
244}
245
/// Borrowed view over one of the three list array layouts handled in this
/// file, so the kernels can be written once against a common interface.
/// It only holds a reference, hence it is cheap to copy.
#[derive(Copy, Clone)]
enum ArrayWrapper<'a> {
    /// Lists whose rows all have the same number of elements.
    FixedSizeList(&'a arrow::array::FixedSizeListArray),
    /// Variable-length lists with 32-bit offsets.
    List(&'a arrow::array::GenericListArray<i32>),
    /// Variable-length lists with 64-bit offsets.
    LargeList(&'a arrow::array::GenericListArray<i64>),
}
252
253impl<'a> TryFrom<&'a dyn Array> for ArrayWrapper<'a> {
254    type Error = DataFusionError;
255
256    fn try_from(
257        value: &'a dyn Array,
258    ) -> std::result::Result<ArrayWrapper<'a>, Self::Error> {
259        match value.data_type() {
260            DataType::List(_) => {
261                Ok(ArrayWrapper::List(as_generic_list_array::<i32>(value)?))
262            }
263            DataType::LargeList(_) => Ok(ArrayWrapper::LargeList(
264                as_generic_list_array::<i64>(value)?,
265            )),
266            DataType::FixedSizeList(_, _) => Ok(ArrayWrapper::FixedSizeList(
267                as_fixed_size_list_array(value)?,
268            )),
269            _ => exec_err!("array_has does not support type '{}'.", value.data_type()),
270        }
271    }
272}
273
274impl<'a> ArrayWrapper<'a> {
275    fn len(&self) -> usize {
276        match self {
277            ArrayWrapper::FixedSizeList(arr) => arr.len(),
278            ArrayWrapper::List(arr) => arr.len(),
279            ArrayWrapper::LargeList(arr) => arr.len(),
280        }
281    }
282
283    fn iter(&self) -> Box<dyn Iterator<Item = Option<ArrayRef>> + 'a> {
284        match self {
285            ArrayWrapper::FixedSizeList(arr) => Box::new(arr.iter()),
286            ArrayWrapper::List(arr) => Box::new(arr.iter()),
287            ArrayWrapper::LargeList(arr) => Box::new(arr.iter()),
288        }
289    }
290
291    fn values(&self) -> &ArrayRef {
292        match self {
293            ArrayWrapper::FixedSizeList(arr) => arr.values(),
294            ArrayWrapper::List(arr) => arr.values(),
295            ArrayWrapper::LargeList(arr) => arr.values(),
296        }
297    }
298
299    fn value_type(&self) -> DataType {
300        match self {
301            ArrayWrapper::FixedSizeList(arr) => arr.value_type(),
302            ArrayWrapper::List(arr) => arr.value_type(),
303            ArrayWrapper::LargeList(arr) => arr.value_type(),
304        }
305    }
306
307    fn offsets(&self) -> Box<dyn Iterator<Item = usize> + 'a> {
308        match self {
309            ArrayWrapper::FixedSizeList(arr) => {
310                let value_length = arr.value_length() as usize;
311                Box::new((0..=arr.len()).map(move |i| i * value_length))
312            }
313            ArrayWrapper::List(arr) => {
314                Box::new(arr.offsets().iter().map(|o| (*o) as usize))
315            }
316            ArrayWrapper::LargeList(arr) => {
317                Box::new(arr.offsets().iter().map(|o| (*o) as usize))
318            }
319        }
320    }
321
322    fn nulls(&self) -> Option<&arrow::buffer::NullBuffer> {
323        match self {
324            ArrayWrapper::FixedSizeList(arr) => arr.nulls(),
325            ArrayWrapper::List(arr) => arr.nulls(),
326            ArrayWrapper::LargeList(arr) => arr.nulls(),
327        }
328    }
329}
330
331fn array_has_dispatch_for_array<'a>(
332    haystack: ArrayWrapper<'a>,
333    needle: &ArrayRef,
334) -> Result<ArrayRef> {
335    let mut boolean_builder = BooleanArray::builder(haystack.len());
336    for (i, arr) in haystack.iter().enumerate() {
337        if arr.is_none() || needle.is_null(i) {
338            boolean_builder.append_null();
339            continue;
340        }
341        let arr = arr.unwrap();
342        let is_nested = arr.data_type().is_nested();
343        let needle_row = Scalar::new(needle.slice(i, 1));
344        let eq_array = compare_with_eq(&arr, &needle_row, is_nested)?;
345        boolean_builder.append_value(eq_array.true_count() > 0);
346    }
347
348    Ok(Arc::new(boolean_builder.finish()))
349}
350
351fn array_has_dispatch_for_scalar(
352    haystack: ArrayWrapper<'_>,
353    needle: &dyn Datum,
354) -> Result<ArrayRef> {
355    // If first argument is empty list (second argument is non-null), return false
356    // i.e. array_has([], non-null element) -> false
357    if haystack.len() == 0 {
358        return Ok(Arc::new(BooleanArray::new(
359            BooleanBuffer::new_unset(haystack.len()),
360            None,
361        )));
362    }
363
364    // For sliced ListArrays, values() returns the full underlying array but
365    // only elements between the first and last offset are visible.
366    let offsets: Vec<usize> = haystack.offsets().collect();
367    let first_offset = offsets[0];
368    let visible_values = haystack
369        .values()
370        .slice(first_offset, offsets[offsets.len() - 1] - first_offset);
371
372    let is_nested = visible_values.data_type().is_nested();
373    let eq_array = compare_with_eq(&visible_values, needle, is_nested)?;
374
375    // When a haystack element is null, `eq()` returns null (not false).
376    // In Arrow, a null BooleanArray entry has validity=0 but an
377    // undefined value bit that may happen to be 1. Since set_indices()
378    // operates on the raw value buffer and ignores validity, we AND the
379    // values with the validity bitmap to clear any undefined bits at
380    // null positions. This ensures set_indices() only yields positions
381    // where the comparison genuinely returned true.
382    let eq_bits = match eq_array.nulls() {
383        Some(nulls) => eq_array.values() & nulls.inner(),
384        None => eq_array.values().clone(),
385    };
386
387    let validity = match &haystack {
388        ArrayWrapper::FixedSizeList(arr) => arr.nulls(),
389        ArrayWrapper::List(arr) => arr.nulls(),
390        ArrayWrapper::LargeList(arr) => arr.nulls(),
391    };
392    let mut matches = eq_bits.set_indices().peekable();
393    let mut result = BooleanBufferBuilder::new(haystack.len());
394    result.append_n(haystack.len(), false);
395
396    // Match positions are relative to visible_values (0-based), so
397    // subtract first_offset from each offset when comparing.
398    for (i, window) in offsets.windows(2).enumerate() {
399        let end = window[1] - first_offset;
400
401        let has_match = matches.peek().is_some_and(|&p| p < end);
402
403        // Advance past all match positions in this row's range.
404        while matches.peek().is_some_and(|&p| p < end) {
405            matches.next();
406        }
407
408        if has_match && validity.is_none_or(|v| v.is_valid(i)) {
409            result.set_bit(i, true);
410        }
411    }
412
413    // A null haystack row always produces a null output, so we can
414    // reuse the haystack's null buffer directly.
415    Ok(Arc::new(BooleanArray::new(
416        result.finish(),
417        validity.cloned(),
418    )))
419}
420
421fn array_has_all_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
422    array_has_all_and_any_inner(args, ComparisonType::All)
423}
424
425// General row comparison for array_has_all and array_has_any
426fn general_array_has_for_all_and_any<'a>(
427    haystack: ArrayWrapper<'a>,
428    needle: ArrayWrapper<'a>,
429    comparison_type: ComparisonType,
430) -> Result<ArrayRef> {
431    let mut boolean_builder = BooleanArray::builder(haystack.len());
432    let converter = RowConverter::new(vec![SortField::new(haystack.value_type())])?;
433
434    for (arr, sub_arr) in haystack.iter().zip(needle.iter()) {
435        if let (Some(arr), Some(sub_arr)) = (arr, sub_arr) {
436            let arr_values = converter.convert_columns(&[arr])?;
437            let sub_arr_values = converter.convert_columns(&[sub_arr])?;
438            boolean_builder.append_value(general_array_has_all_and_any_kernel(
439                &arr_values,
440                &sub_arr_values,
441                comparison_type,
442            ));
443        } else {
444            boolean_builder.append_null();
445        }
446    }
447
448    Ok(Arc::new(boolean_builder.finish()))
449}
450
451// String comparison for array_has_all and array_has_any
452fn array_has_all_and_any_string_internal<'a>(
453    haystack: ArrayWrapper<'a>,
454    needle: ArrayWrapper<'a>,
455    comparison_type: ComparisonType,
456) -> Result<ArrayRef> {
457    let mut boolean_builder = BooleanArray::builder(haystack.len());
458    for (arr, sub_arr) in haystack.iter().zip(needle.iter()) {
459        match (arr, sub_arr) {
460            (Some(arr), Some(sub_arr)) => {
461                let haystack_array = string_array_to_vec(&arr);
462                let needle_array = string_array_to_vec(&sub_arr);
463                boolean_builder.append_value(array_has_string_kernel(
464                    &haystack_array,
465                    &needle_array,
466                    comparison_type,
467                ));
468            }
469            (_, _) => {
470                boolean_builder.append_null();
471            }
472        }
473    }
474
475    Ok(Arc::new(boolean_builder.finish()))
476}
477
478fn array_has_all_and_any_dispatch<'a>(
479    haystack: ArrayWrapper<'a>,
480    needle: ArrayWrapper<'a>,
481    comparison_type: ComparisonType,
482) -> Result<ArrayRef> {
483    if needle.values().is_empty() {
484        let buffer = match comparison_type {
485            ComparisonType::All => BooleanBuffer::new_set(haystack.len()),
486            ComparisonType::Any => BooleanBuffer::new_unset(haystack.len()),
487        };
488        Ok(Arc::new(BooleanArray::from(buffer)))
489    } else {
490        match needle.value_type() {
491            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
492                array_has_all_and_any_string_internal(haystack, needle, comparison_type)
493            }
494            _ => general_array_has_for_all_and_any(haystack, needle, comparison_type),
495        }
496    }
497}
498
499fn array_has_all_and_any_inner(
500    args: &[ArrayRef],
501    comparison_type: ComparisonType,
502) -> Result<ArrayRef> {
503    let haystack: ArrayWrapper = args[0].as_ref().try_into()?;
504    let needle: ArrayWrapper = args[1].as_ref().try_into()?;
505    array_has_all_and_any_dispatch(haystack, needle, comparison_type)
506}
507
508fn array_has_any_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
509    array_has_all_and_any_inner(args, ComparisonType::Any)
510}
511
512/// Fast path for `array_has_any` when exactly one argument is a scalar.
513fn array_has_any_with_scalar(
514    columnar_arg: &ColumnarValue,
515    scalar_arg: &ScalarValue,
516) -> Result<ColumnarValue> {
517    if scalar_arg.is_null() {
518        return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None)));
519    }
520
521    // Convert the scalar to a 1-element ListArray, then extract the inner values
522    let scalar_array = scalar_arg.to_array_of_size(1)?;
523    let scalar_list: ArrayWrapper = scalar_array.as_ref().try_into()?;
524    let offsets: Vec<usize> = scalar_list.offsets().collect();
525    let scalar_values = scalar_list
526        .values()
527        .slice(offsets[0], offsets[1] - offsets[0]);
528
529    // If scalar list is empty, result is always false
530    if scalar_values.is_empty() {
531        return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))));
532    }
533
534    match scalar_values.data_type() {
535        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
536            array_has_any_with_scalar_string(columnar_arg, &scalar_values)
537        }
538        _ => array_has_any_with_scalar_general(columnar_arg, &scalar_values),
539    }
540}
541
/// Size cut-off for the scalar fast path of `array_has_any`.
///
/// When the scalar argument has more elements than this, the fast path
/// builds a HashSet for O(1) lookups. At or below this threshold it falls
/// back to a linear scan, since hashing every columnar element is more
/// expensive than a linear scan over a short array.
const SCALAR_SMALL_THRESHOLD: usize = 8;
547
548/// String-specialized scalar fast path for `array_has_any`.
549fn array_has_any_with_scalar_string(
550    columnar_arg: &ColumnarValue,
551    scalar_values: &ArrayRef,
552) -> Result<ColumnarValue> {
553    let (col_arr, is_scalar_output) = match columnar_arg {
554        ColumnarValue::Array(arr) => (Arc::clone(arr), false),
555        ColumnarValue::Scalar(s) => (s.to_array_of_size(1)?, true),
556    };
557
558    let col_list: ArrayWrapper = col_arr.as_ref().try_into()?;
559    let col_values = col_list.values();
560    let col_offsets: Vec<usize> = col_list.offsets().collect();
561    let col_nulls = col_list.nulls();
562
563    let scalar_lookup = ScalarStringLookup::new(scalar_values);
564    let has_null_scalar = scalar_values.null_count() > 0;
565
566    let result = match col_values.data_type() {
567        DataType::Utf8 => array_has_any_string_inner(
568            col_values.as_string::<i32>(),
569            &col_offsets,
570            col_nulls,
571            has_null_scalar,
572            &scalar_lookup,
573        ),
574        DataType::LargeUtf8 => array_has_any_string_inner(
575            col_values.as_string::<i64>(),
576            &col_offsets,
577            col_nulls,
578            has_null_scalar,
579            &scalar_lookup,
580        ),
581        DataType::Utf8View => array_has_any_string_inner(
582            col_values.as_string_view(),
583            &col_offsets,
584            col_nulls,
585            has_null_scalar,
586            &scalar_lookup,
587        ),
588        _ => unreachable!("array_has_any_with_scalar_string called with non-string type"),
589    };
590
591    if is_scalar_output {
592        Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
593            &result, 0,
594        )?))
595    } else {
596        Ok(ColumnarValue::Array(result))
597    }
598}
599
/// Pre-computed lookup structure for the scalar string fastpath.
///
/// The variant is chosen once in `new` from the number of scalar elements
/// relative to `SCALAR_SMALL_THRESHOLD`.
enum ScalarStringLookup<'a> {
    /// Large scalar: HashSet for O(1) lookups. Null entries are dropped when
    /// the set is built.
    Set(HashSet<&'a str>),
    /// Small scalar: Vec for linear scan. Null entries are kept as `None`.
    List(Vec<Option<&'a str>>),
}
607
608impl<'a> ScalarStringLookup<'a> {
609    fn new(scalar_values: &'a ArrayRef) -> Self {
610        let strings = string_array_to_vec(scalar_values.as_ref());
611        if strings.len() > SCALAR_SMALL_THRESHOLD {
612            ScalarStringLookup::Set(strings.into_iter().flatten().collect())
613        } else {
614            ScalarStringLookup::List(strings)
615        }
616    }
617
618    fn contains(&self, value: &str) -> bool {
619        match self {
620            ScalarStringLookup::Set(set) => set.contains(value),
621            ScalarStringLookup::List(list) => list.contains(&Some(value)),
622        }
623    }
624}
625
626/// Inner implementation of the string scalar fast path, generic over string
627/// array type to allow direct element access by index.
628fn array_has_any_string_inner<'a, C: StringArrayType<'a> + Copy>(
629    col_strings: C,
630    col_offsets: &[usize],
631    col_nulls: Option<&arrow::buffer::NullBuffer>,
632    has_null_scalar: bool,
633    scalar_lookup: &ScalarStringLookup<'_>,
634) -> ArrayRef {
635    let num_rows = col_offsets.len() - 1;
636    let mut builder = BooleanArray::builder(num_rows);
637
638    for i in 0..num_rows {
639        if col_nulls.is_some_and(|v| v.is_null(i)) {
640            builder.append_null();
641            continue;
642        }
643        let start = col_offsets[i];
644        let end = col_offsets[i + 1];
645        let found = (start..end).any(|j| {
646            if col_strings.is_null(j) {
647                has_null_scalar
648            } else {
649                scalar_lookup.contains(col_strings.value(j))
650            }
651        });
652        builder.append_value(found);
653    }
654
655    Arc::new(builder.finish())
656}
657
658/// General scalar fast path for `array_has_any`, using RowConverter for
659/// type-erased comparison.
660fn array_has_any_with_scalar_general(
661    columnar_arg: &ColumnarValue,
662    scalar_values: &ArrayRef,
663) -> Result<ColumnarValue> {
664    let converter =
665        RowConverter::new(vec![SortField::new(scalar_values.data_type().clone())])?;
666    let scalar_rows = converter.convert_columns(&[Arc::clone(scalar_values)])?;
667
668    let (col_arr, is_scalar_output) = match columnar_arg {
669        ColumnarValue::Array(arr) => (Arc::clone(arr), false),
670        ColumnarValue::Scalar(s) => (s.to_array_of_size(1)?, true),
671    };
672
673    let col_list: ArrayWrapper = col_arr.as_ref().try_into()?;
674    let col_rows = converter.convert_columns(&[Arc::clone(col_list.values())])?;
675    let col_offsets: Vec<usize> = col_list.offsets().collect();
676    let col_nulls = col_list.nulls();
677
678    let mut builder = BooleanArray::builder(col_list.len());
679    let num_scalar = scalar_rows.num_rows();
680
681    if num_scalar > SCALAR_SMALL_THRESHOLD {
682        // Large scalar: build HashSet for O(1) lookups
683        let scalar_set: HashSet<Box<[u8]>> = (0..num_scalar)
684            .map(|i| Box::from(scalar_rows.row(i).as_ref()))
685            .collect();
686
687        for i in 0..col_list.len() {
688            if col_nulls.is_some_and(|v| v.is_null(i)) {
689                builder.append_null();
690                continue;
691            }
692            let start = col_offsets[i];
693            let end = col_offsets[i + 1];
694            let found =
695                (start..end).any(|j| scalar_set.contains(col_rows.row(j).as_ref()));
696            builder.append_value(found);
697        }
698    } else {
699        // Small scalar: linear scan avoids HashSet hashing overhead
700        for i in 0..col_list.len() {
701            if col_nulls.is_some_and(|v| v.is_null(i)) {
702                builder.append_null();
703                continue;
704            }
705            let start = col_offsets[i];
706            let end = col_offsets[i + 1];
707            let found = (start..end)
708                .any(|j| (0..num_scalar).any(|k| col_rows.row(j) == scalar_rows.row(k)));
709            builder.append_value(found);
710        }
711    }
712
713    let result: ArrayRef = Arc::new(builder.finish());
714
715    if is_scalar_output {
716        Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
717            &result, 0,
718        )?))
719    } else {
720        Ok(ColumnarValue::Array(result))
721    }
722}
723
// Scalar UDF implementing `array_has_all(array, sub-array)`.
// The `user_doc` attribute below carries the user-facing documentation that
// `documentation()` exposes through `self.doc()`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns true if all elements of sub-array exist in array.",
    syntax_example = "array_has_all(array, sub-array)",
    sql_example = r#"```sql
> select array_has_all([1, 2, 3, 4], [2, 3]);
+--------------------------------------------+
| array_has_all(List([1,2,3,4]), List([2,3])) |
+--------------------------------------------+
| true                                       |
+--------------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "sub-array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayHasAll {
    // Accepted argument shapes; `new()` sets this to a two-array signature
    // with immutable volatility.
    signature: Signature,
    // Alternative SQL names the function is registered under.
    aliases: Vec<String>,
}
750
751impl Default for ArrayHasAll {
752    fn default() -> Self {
753        Self::new()
754    }
755}
756
757impl ArrayHasAll {
758    pub fn new() -> Self {
759        Self {
760            signature: Signature::arrays(2, None, Volatility::Immutable),
761            aliases: vec![String::from("list_has_all")],
762        }
763    }
764}
765
766impl ScalarUDFImpl for ArrayHasAll {
767    fn as_any(&self) -> &dyn Any {
768        self
769    }
770    fn name(&self) -> &str {
771        "array_has_all"
772    }
773
774    fn signature(&self) -> &Signature {
775        &self.signature
776    }
777
778    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
779        Ok(DataType::Boolean)
780    }
781
782    fn invoke_with_args(
783        &self,
784        args: datafusion_expr::ScalarFunctionArgs,
785    ) -> Result<ColumnarValue> {
786        make_scalar_function(array_has_all_inner)(&args.args)
787    }
788
789    fn aliases(&self) -> &[String] {
790        &self.aliases
791    }
792
793    fn documentation(&self) -> Option<&Documentation> {
794        self.doc()
795    }
796}
797
// Scalar UDF implementing `array_has_any(array1, array2)`.
// The `user_doc` attribute below carries the user-facing documentation that
// `documentation()` exposes through `self.doc()`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns true if the arrays have any elements in common.",
    syntax_example = "array_has_any(array1, array2)",
    sql_example = r#"```sql
> select array_has_any([1, 2, 3], [3, 4]);
+------------------------------------------+
| array_has_any(List([1,2,3]), List([3,4])) |
+------------------------------------------+
| true                                     |
+------------------------------------------+
```"#,
    argument(
        name = "array1",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array2",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayHasAny {
    // Accepted argument shapes; `new()` sets this to a two-array signature
    // with immutable volatility.
    signature: Signature,
    // Alternative SQL names the function is registered under.
    aliases: Vec<String>,
}
824
825impl Default for ArrayHasAny {
826    fn default() -> Self {
827        Self::new()
828    }
829}
830
831impl ArrayHasAny {
832    pub fn new() -> Self {
833        Self {
834            signature: Signature::arrays(2, None, Volatility::Immutable),
835            aliases: vec![String::from("list_has_any"), String::from("arrays_overlap")],
836        }
837    }
838}
839
840impl ScalarUDFImpl for ArrayHasAny {
841    fn as_any(&self) -> &dyn Any {
842        self
843    }
844    fn name(&self) -> &str {
845        "array_has_any"
846    }
847
848    fn signature(&self) -> &Signature {
849        &self.signature
850    }
851
852    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
853        Ok(DataType::Boolean)
854    }
855
856    fn invoke_with_args(
857        &self,
858        args: datafusion_expr::ScalarFunctionArgs,
859    ) -> Result<ColumnarValue> {
860        let [first_arg, second_arg] = take_function_args(self.name(), &args.args)?;
861
862        // If either argument is scalar, use the fast path.
863        match (&first_arg, &second_arg) {
864            (cv, ColumnarValue::Scalar(scalar)) | (ColumnarValue::Scalar(scalar), cv) => {
865                array_has_any_with_scalar(cv, scalar)
866            }
867            _ => make_scalar_function(array_has_any_inner)(&args.args),
868        }
869    }
870
871    fn aliases(&self) -> &[String] {
872        &self.aliases
873    }
874
875    fn documentation(&self) -> Option<&Documentation> {
876        self.doc()
877    }
878}
879
/// Selects how the needle elements are quantified when they are matched
/// against a haystack in the `array_has_all` / `array_has_any` kernels.
#[derive(Debug, PartialEq, Clone, Copy)]
enum ComparisonType {
    /// `array_has_all`: every needle element must be present in the haystack.
    All,
    /// `array_has_any`: at least one needle element must be present in the haystack.
    Any,
}
888
889fn array_has_string_kernel(
890    haystack: &[Option<&str>],
891    needle: &[Option<&str>],
892    comparison_type: ComparisonType,
893) -> bool {
894    match comparison_type {
895        ComparisonType::All => needle
896            .iter()
897            .dedup()
898            .all(|x| haystack.iter().dedup().any(|y| y == x)),
899        ComparisonType::Any => needle
900            .iter()
901            .dedup()
902            .any(|x| haystack.iter().dedup().any(|y| y == x)),
903    }
904}
905
906fn general_array_has_all_and_any_kernel(
907    haystack_rows: &Rows,
908    needle_rows: &Rows,
909    comparison_type: ComparisonType,
910) -> bool {
911    match comparison_type {
912        ComparisonType::All => needle_rows.iter().all(|needle_row| {
913            haystack_rows
914                .iter()
915                .any(|haystack_row| haystack_row == needle_row)
916        }),
917        ComparisonType::Any => needle_rows.iter().any(|needle_row| {
918            haystack_rows
919                .iter()
920                .any(|haystack_row| haystack_row == needle_row)
921        }),
922    }
923}
924
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::Int32Type;
    use arrow::{
        array::{Array, ArrayRef, AsArray, Int32Array, ListArray, create_array},
        buffer::OffsetBuffer,
        datatypes::{DataType, Field},
    };
    use datafusion_common::{
        DataFusionError, ScalarValue, config::ConfigOptions,
        utils::SingleRowListArrayBuilder,
    };
    use datafusion_expr::{
        ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDFImpl, col, lit,
        simplify::ExprSimplifyResult,
    };

    use crate::expr_fn::make_array;

    use super::ArrayHas;

    /// A literal list haystack is rewritten to `needle IN (elements...)`.
    #[test]
    fn test_simplify_array_has_to_in_list() {
        let haystack = lit(SingleRowListArrayBuilder::new(create_array!(
            Int32,
            [1, 2, 3]
        ))
        .build_list_scalar());
        let needle = col("c");

        let context = datafusion_expr::simplify::SimplifyContext::default();

        let Ok(ExprSimplifyResult::Simplified(Expr::InList(in_list))) =
            ArrayHas::new().simplify(vec![haystack, needle.clone()], &context)
        else {
            panic!("Expected simplified expression");
        };

        assert_eq!(
            in_list,
            datafusion_expr::expr::InList {
                expr: Box::new(needle),
                list: vec![lit(1), lit(2), lit(3)],
                negated: false,
            }
        );
    }

    /// A `make_array(...)` haystack is rewritten to `needle IN (elements...)`.
    #[test]
    fn test_simplify_array_has_with_make_array_to_in_list() {
        let haystack = make_array(vec![lit(1), lit(2), lit(3)]);
        let needle = col("c");

        let context = datafusion_expr::simplify::SimplifyContext::default();

        let Ok(ExprSimplifyResult::Simplified(Expr::InList(in_list))) =
            ArrayHas::new().simplify(vec![haystack, needle.clone()], &context)
        else {
            panic!("Expected simplified expression");
        };

        assert_eq!(
            in_list,
            datafusion_expr::expr::InList {
                expr: Box::new(needle),
                list: vec![lit(1), lit(2), lit(3)],
                negated: false,
            }
        );
    }

    /// An untyped NULL haystack simplifies to a NULL boolean literal.
    #[test]
    fn test_simplify_array_has_with_null_to_null() {
        let haystack = Expr::Literal(ScalarValue::Null, None);
        let needle = col("c");

        let context = datafusion_expr::simplify::SimplifyContext::default();
        let Ok(ExprSimplifyResult::Simplified(simplified)) =
            ArrayHas::new().simplify(vec![haystack, needle], &context)
        else {
            panic!("Expected simplified expression");
        };

        assert_eq!(simplified, Expr::Literal(ScalarValue::Boolean(None), None));
    }

    /// A typed list scalar whose single row is NULL simplifies to a NULL
    /// boolean literal.
    #[test]
    fn test_simplify_array_has_with_null_list_to_null() {
        let haystack =
            ListArray::from_iter_primitive::<Int32Type, [Option<i32>; 0], _>([None]);
        let haystack = Expr::Literal(ScalarValue::List(Arc::new(haystack)), None);
        let needle = col("c");

        let context = datafusion_expr::simplify::SimplifyContext::default();
        let Ok(ExprSimplifyResult::Simplified(simplified)) =
            ArrayHas::new().simplify(vec![haystack, needle], &context)
        else {
            panic!("Expected simplified expression");
        };

        assert_eq!(simplified, Expr::Literal(ScalarValue::Boolean(None), None));
    }

    /// A column haystack cannot be rewritten, so the arguments must come back
    /// untouched as `ExprSimplifyResult::Original`.
    #[test]
    fn test_array_has_complex_list_not_simplified() {
        let haystack = col("c1");
        let needle = col("c2");

        let context = datafusion_expr::simplify::SimplifyContext::default();

        let Ok(ExprSimplifyResult::Original(args)) =
            ArrayHas::new().simplify(vec![haystack, needle], &context)
        else {
            panic!("Expected the original arguments to be returned unchanged");
        };

        assert_eq!(args, vec![col("c1"), col("c2")],);
    }

    /// A single NULL list row backed by an empty child array yields NULL.
    #[test]
    fn test_array_has_list_empty_child() -> Result<(), DataFusionError> {
        let haystack_field = Arc::new(Field::new_list(
            "haystack",
            Field::new_list("", Field::new("", DataType::Int32, true), true),
            true,
        ));

        let needle_field = Arc::new(Field::new("needle", DataType::Int32, true));
        let return_field = Arc::new(Field::new("return", DataType::Boolean, true));
        // Offsets [0, 0] with validity [false]: one null row, no child values.
        let haystack = ListArray::new(
            Field::new_list_field(DataType::Int32, true).into(),
            OffsetBuffer::new(vec![0, 0].into()),
            Arc::new(Int32Array::from(Vec::<i32>::new())) as ArrayRef,
            Some(vec![false].into()),
        );

        let haystack = ColumnarValue::Array(Arc::new(haystack));
        let needle = ColumnarValue::Scalar(ScalarValue::Int32(Some(1)));
        let result = ArrayHas::new().invoke_with_args(ScalarFunctionArgs {
            args: vec![haystack, needle],
            arg_fields: vec![haystack_field, needle_field],
            number_rows: 1,
            return_field,
            config_options: Arc::new(ConfigOptions::default()),
        })?;

        let output = result.into_array(1)?;
        let output = output.as_boolean();
        assert_eq!(output.len(), 1);
        assert!(output.is_null(0));

        Ok(())
    }

    /// Values that only exist in rows removed by `slice` must not be found.
    #[test]
    fn test_array_has_sliced_list() -> Result<(), DataFusionError> {
        // [[10, 20], [30, 40], [50, 60], [70, 80]]  →  slice(1,2)  →  [[30, 40], [50, 60]]
        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(10), Some(20)]),
            Some(vec![Some(30), Some(40)]),
            Some(vec![Some(50), Some(60)]),
            Some(vec![Some(70), Some(80)]),
        ]);
        let sliced = list.slice(1, 2);
        let haystack_field =
            Arc::new(Field::new("haystack", sliced.data_type().clone(), true));
        let needle_field = Arc::new(Field::new("needle", DataType::Int32, true));
        let return_field = Arc::new(Field::new("return", DataType::Boolean, true));

        // Search for elements that exist only in sliced-away rows:
        // 10 is in the prefix row, 70 is in the suffix row.
        let invoke = |needle: i32| -> Result<ArrayRef, DataFusionError> {
            ArrayHas::new()
                .invoke_with_args(ScalarFunctionArgs {
                    args: vec![
                        ColumnarValue::Array(Arc::new(sliced.clone())),
                        ColumnarValue::Scalar(ScalarValue::Int32(Some(needle))),
                    ],
                    arg_fields: vec![
                        Arc::clone(&haystack_field),
                        Arc::clone(&needle_field),
                    ],
                    number_rows: 2,
                    return_field: Arc::clone(&return_field),
                    config_options: Arc::new(ConfigOptions::default()),
                })?
                .into_array(2)
        };

        let output = invoke(10)?.as_boolean().clone();
        assert!(!output.value(0));
        assert!(!output.value(1));

        let output = invoke(70)?.as_boolean().clone();
        assert!(!output.value(0));
        assert!(!output.value(1));

        Ok(())
    }

    /// Every NULL haystack row produces a NULL result row.
    #[test]
    fn test_array_has_list_null_haystack() -> Result<(), DataFusionError> {
        let haystack_field = Arc::new(Field::new("haystack", DataType::Null, true));
        let needle_field = Arc::new(Field::new("needle", DataType::Int32, true));
        let return_field = Arc::new(Field::new("return", DataType::Boolean, true));
        let haystack =
            ListArray::from_iter_primitive::<Int32Type, [Option<i32>; 0], _>([
                None, None, None,
            ]);

        let haystack = ColumnarValue::Array(Arc::new(haystack));
        let needle = ColumnarValue::Scalar(ScalarValue::Int32(Some(1)));
        // NOTE(review): `number_rows` and `into_array` are given 1 although the
        // haystack holds three rows; the length assertion below relies on the
        // three-row result array being returned as-is - confirm this is intended.
        let result = ArrayHas::new().invoke_with_args(ScalarFunctionArgs {
            args: vec![haystack, needle],
            arg_fields: vec![haystack_field, needle_field],
            number_rows: 1,
            return_field,
            config_options: Arc::new(ConfigOptions::default()),
        })?;

        let output = result.into_array(1)?;
        let output = output.as_boolean();
        assert_eq!(output.len(), 3);
        for i in 0..3 {
            assert!(output.is_null(i));
        }

        Ok(())
    }
}