Skip to main content

datafusion_functions_nested/
array_has.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for array_has, array_has_all and array_has_any functions.
19
20use arrow::array::{
21    Array, ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder, Datum, Scalar,
22    StringArrayType,
23};
24use arrow::buffer::{BooleanBuffer, NullBuffer};
25use arrow::datatypes::DataType;
26use arrow::row::{RowConverter, Rows, SortField};
27use datafusion_common::cast::{as_fixed_size_list_array, as_generic_list_array};
28use datafusion_common::utils::string_utils::string_array_to_vec;
29use datafusion_common::utils::take_function_args;
30use datafusion_common::{DataFusionError, Result, ScalarValue, exec_err};
31use datafusion_expr::expr::ScalarFunction;
32use datafusion_expr::simplify::ExprSimplifyResult;
33use datafusion_expr::{
34    ColumnarValue, Documentation, Expr, ScalarFunctionArgs, ScalarUDFImpl, Signature,
35    Volatility, in_list,
36};
37use datafusion_macros::user_doc;
38use datafusion_physical_expr_common::datum::compare_with_eq;
39use itertools::Itertools;
40
41use crate::make_array::make_array_udf;
42use crate::utils::make_scalar_function;
43
44use hashbrown::HashSet;
45use std::ops::Range;
46use std::sync::Arc;
47
// Create static instances of ScalarUDFs for each function.
//
// Each invocation passes, in order: the UDF struct, the expression function
// name, the argument names, the doc string, and the internal function name
// (see the trailing comment on each line).
make_udf_expr_and_func!(ArrayHas,
    array_has,
    haystack_array element, // arg names
    "returns true, if the element appears in the first array, otherwise false.", // doc
    array_has_udf // internal function name
);
make_udf_expr_and_func!(ArrayHasAll,
    array_has_all,
    haystack_array needle_array, // arg names
    "returns true if each element of the second array appears in the first array; otherwise, it returns false.", // doc
    array_has_all_udf // internal function name
);
make_udf_expr_and_func!(ArrayHasAny,
    array_has_any,
    first_array second_array, // arg names
    "returns true if at least one element of the second array appears in the first array; otherwise, it returns false.", // doc
    array_has_any_udf // internal function name
);
67
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns true if the array contains the element.",
    syntax_example = "array_has(array, element)",
    sql_example = r#"```sql
> select array_has([1, 2, 3], 2);
+-----------------------------+
| array_has(List([1,2,3]), 2) |
+-----------------------------+
| true                        |
+-----------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "element",
        description = "Scalar or Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
/// Scalar UDF implementing `array_has(array, element)`.
///
/// Also reachable through the aliases registered in [`ArrayHas::new`].
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayHas {
    /// Accepted argument signature, handed out by `signature()`.
    signature: Signature,
    /// Alternative SQL names for this function, handed out by `aliases()`.
    aliases: Vec<String>,
}
94
95impl Default for ArrayHas {
96    fn default() -> Self {
97        Self::new()
98    }
99}
100
101impl ArrayHas {
102    pub fn new() -> Self {
103        Self {
104            signature: Signature::array_and_element(Volatility::Immutable),
105            aliases: vec![
106                String::from("list_has"),
107                String::from("array_contains"),
108                String::from("list_contains"),
109            ],
110        }
111    }
112}
113
114impl ScalarUDFImpl for ArrayHas {
115    fn name(&self) -> &str {
116        "array_has"
117    }
118
119    fn signature(&self) -> &Signature {
120        &self.signature
121    }
122
123    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
124        Ok(DataType::Boolean)
125    }
126
127    fn simplify(
128        &self,
129        mut args: Vec<Expr>,
130        _info: &datafusion_expr::simplify::SimplifyContext,
131    ) -> Result<ExprSimplifyResult> {
132        let [haystack, needle] = take_function_args(self.name(), &mut args)?;
133
134        // if the haystack is a constant list, we can use an inlist expression which is more
135        // efficient because the haystack is not varying per-row
136        match haystack {
137            Expr::Literal(scalar, _) if scalar.is_null() => {
138                return Ok(ExprSimplifyResult::Simplified(Expr::Literal(
139                    ScalarValue::Boolean(None),
140                    None,
141                )));
142            }
143            Expr::Literal(
144                // FixedSizeList gets coerced to List
145                scalar @ ScalarValue::List(_) | scalar @ ScalarValue::LargeList(_),
146                _,
147            ) => {
148                if let Ok(scalar_values) =
149                    ScalarValue::convert_array_to_scalar_vec(&scalar.to_array()?)
150                {
151                    assert_eq!(scalar_values.len(), 1);
152                    let list = scalar_values
153                        .into_iter()
154                        .flatten()
155                        .flatten()
156                        .map(|v| Expr::Literal(v, None))
157                        .collect();
158
159                    return Ok(ExprSimplifyResult::Simplified(in_list(
160                        std::mem::take(needle),
161                        list,
162                        false,
163                    )));
164                }
165            }
166            Expr::ScalarFunction(ScalarFunction { func, args })
167                if func == &make_array_udf() =>
168            {
169                // make_array has a static set of arguments, so we can pull the arguments out from it
170                return Ok(ExprSimplifyResult::Simplified(in_list(
171                    std::mem::take(needle),
172                    std::mem::take(args),
173                    false,
174                )));
175            }
176            _ => {}
177        };
178        Ok(ExprSimplifyResult::Original(args))
179    }
180
181    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
182        let [first_arg, second_arg] = take_function_args(self.name(), &args.args)?;
183        if first_arg.data_type().is_null() {
184            // Always return null if the first argument is null
185            // i.e. array_has(null, element) -> null
186            return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None)));
187        }
188
189        match &second_arg {
190            ColumnarValue::Array(array_needle) => {
191                // the needle is already an array, convert the haystack to an array of the same length
192                let haystack = first_arg.to_array(array_needle.len())?;
193                let array = array_has_inner_for_array(&haystack, array_needle)?;
194                Ok(ColumnarValue::Array(array))
195            }
196            ColumnarValue::Scalar(scalar_needle) => {
197                // Always return null if the second argument is null
198                // i.e. array_has(array, null) -> null
199                if scalar_needle.is_null() {
200                    return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None)));
201                }
202
203                // since the needle is a scalar, convert it to an array of size 1
204                let haystack = first_arg.to_array(1)?;
205                let needle = scalar_needle.to_array_of_size(1)?;
206                let needle = Scalar::new(needle);
207                let array = array_has_inner_for_scalar(&haystack, &needle)?;
208                if let ColumnarValue::Scalar(_) = &first_arg {
209                    // If both inputs are scalar, keeps output as scalar
210                    let scalar_value = ScalarValue::try_from_array(&array, 0)?;
211                    Ok(ColumnarValue::Scalar(scalar_value))
212                } else {
213                    Ok(ColumnarValue::Array(array))
214                }
215            }
216        }
217    }
218
219    fn aliases(&self) -> &[String] {
220        &self.aliases
221    }
222
223    fn documentation(&self) -> Option<&Documentation> {
224        self.doc()
225    }
226}
227
228fn array_has_inner_for_scalar(
229    haystack: &ArrayRef,
230    needle: &dyn Datum,
231) -> Result<ArrayRef> {
232    let haystack = haystack.as_ref().try_into()?;
233    array_has_dispatch_for_scalar(haystack, needle)
234}
235
236fn array_has_inner_for_array(haystack: &ArrayRef, needle: &ArrayRef) -> Result<ArrayRef> {
237    let haystack = haystack.as_ref().try_into()?;
238    array_has_dispatch_for_array(haystack, needle)
239}
240
/// Borrowed view over one of the three list array layouts handled in this
/// file, so the dispatch functions can treat them uniformly.
#[derive(Copy, Clone)]
enum ArrayWrapper<'a> {
    /// A fixed-size list array.
    FixedSizeList(&'a arrow::array::FixedSizeListArray),
    /// A list array with `i32` offsets.
    List(&'a arrow::array::GenericListArray<i32>),
    /// A list array with `i64` offsets.
    LargeList(&'a arrow::array::GenericListArray<i64>),
}
247
248impl<'a> TryFrom<&'a dyn Array> for ArrayWrapper<'a> {
249    type Error = DataFusionError;
250
251    fn try_from(
252        value: &'a dyn Array,
253    ) -> std::result::Result<ArrayWrapper<'a>, Self::Error> {
254        match value.data_type() {
255            DataType::List(_) => {
256                Ok(ArrayWrapper::List(as_generic_list_array::<i32>(value)?))
257            }
258            DataType::LargeList(_) => Ok(ArrayWrapper::LargeList(
259                as_generic_list_array::<i64>(value)?,
260            )),
261            DataType::FixedSizeList(_, _) => Ok(ArrayWrapper::FixedSizeList(
262                as_fixed_size_list_array(value)?,
263            )),
264            _ => exec_err!("array_has does not support type '{}'.", value.data_type()),
265        }
266    }
267}
268
269impl<'a> ArrayWrapper<'a> {
270    fn len(&self) -> usize {
271        match self {
272            ArrayWrapper::FixedSizeList(arr) => arr.len(),
273            ArrayWrapper::List(arr) => arr.len(),
274            ArrayWrapper::LargeList(arr) => arr.len(),
275        }
276    }
277
278    fn iter(&self) -> Box<dyn Iterator<Item = Option<ArrayRef>> + 'a> {
279        match self {
280            ArrayWrapper::FixedSizeList(arr) => Box::new(arr.iter()),
281            ArrayWrapper::List(arr) => Box::new(arr.iter()),
282            ArrayWrapper::LargeList(arr) => Box::new(arr.iter()),
283        }
284    }
285
286    fn values(&self) -> &ArrayRef {
287        match self {
288            ArrayWrapper::FixedSizeList(arr) => arr.values(),
289            ArrayWrapper::List(arr) => arr.values(),
290            ArrayWrapper::LargeList(arr) => arr.values(),
291        }
292    }
293
294    fn value_type(&self) -> DataType {
295        match self {
296            ArrayWrapper::FixedSizeList(arr) => arr.value_type(),
297            ArrayWrapper::List(arr) => arr.value_type(),
298            ArrayWrapper::LargeList(arr) => arr.value_type(),
299        }
300    }
301
302    fn offsets(&self) -> Box<dyn Iterator<Item = usize> + 'a> {
303        match self {
304            ArrayWrapper::FixedSizeList(arr) => {
305                let value_length = arr.value_length() as usize;
306                Box::new((0..=arr.len()).map(move |i| i * value_length))
307            }
308            ArrayWrapper::List(arr) => {
309                Box::new(arr.offsets().iter().map(|o| (*o) as usize))
310            }
311            ArrayWrapper::LargeList(arr) => {
312                Box::new(arr.offsets().iter().map(|o| (*o) as usize))
313            }
314        }
315    }
316
317    fn nulls(&self) -> Option<&NullBuffer> {
318        match self {
319            ArrayWrapper::FixedSizeList(arr) => arr.nulls(),
320            ArrayWrapper::List(arr) => arr.nulls(),
321            ArrayWrapper::LargeList(arr) => arr.nulls(),
322        }
323    }
324}
325
326fn array_has_dispatch_for_array<'a>(
327    haystack: ArrayWrapper<'a>,
328    needle: &ArrayRef,
329) -> Result<ArrayRef> {
330    let combined_nulls = NullBuffer::union(haystack.nulls(), needle.nulls());
331    let mut result = BooleanBufferBuilder::new(haystack.len());
332    for (i, arr) in haystack.iter().enumerate() {
333        if combined_nulls.as_ref().is_some_and(|n| n.is_null(i)) {
334            result.append(false);
335            continue;
336        }
337        let arr = arr.unwrap();
338        let is_nested = arr.data_type().is_nested();
339        let needle_row = Scalar::new(needle.slice(i, 1));
340        let eq_array = compare_with_eq(&arr, &needle_row, is_nested)?;
341        result.append(eq_array.has_true());
342    }
343
344    Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls)))
345}
346
347fn array_has_dispatch_for_scalar(
348    haystack: ArrayWrapper<'_>,
349    needle: &dyn Datum,
350) -> Result<ArrayRef> {
351    // If first argument is empty list (second argument is non-null), return false
352    // i.e. array_has([], non-null element) -> false
353    if haystack.len() == 0 {
354        return Ok(Arc::new(BooleanArray::new(
355            BooleanBuffer::new_unset(haystack.len()),
356            None,
357        )));
358    }
359
360    // For sliced ListArrays, values() returns the full underlying array but
361    // only elements between the first and last offset are visible.
362    let offsets: Vec<usize> = haystack.offsets().collect();
363    let first_offset = offsets[0];
364    let visible_values = haystack
365        .values()
366        .slice(first_offset, offsets[offsets.len() - 1] - first_offset);
367
368    let is_nested = visible_values.data_type().is_nested();
369    let eq_array = compare_with_eq(&visible_values, needle, is_nested)?;
370
371    // When a haystack element is null, `eq()` returns null (not false).
372    // In Arrow, a null BooleanArray entry has validity=0 but an
373    // undefined value bit that may happen to be 1. Since set_indices()
374    // operates on the raw value buffer and ignores validity, we AND the
375    // values with the validity bitmap to clear any undefined bits at
376    // null positions. This ensures set_indices() only yields positions
377    // where the comparison genuinely returned true.
378    let eq_bits = match eq_array.nulls() {
379        Some(nulls) => eq_array.values() & nulls.inner(),
380        None => eq_array.values().clone(),
381    };
382
383    let validity = match &haystack {
384        ArrayWrapper::FixedSizeList(arr) => arr.nulls(),
385        ArrayWrapper::List(arr) => arr.nulls(),
386        ArrayWrapper::LargeList(arr) => arr.nulls(),
387    };
388    let mut matches = eq_bits.set_indices().peekable();
389    let mut result = BooleanBufferBuilder::new(haystack.len());
390    result.append_n(haystack.len(), false);
391
392    // Match positions are relative to visible_values (0-based), so
393    // subtract first_offset from each offset when comparing.
394    for (i, window) in offsets.windows(2).enumerate() {
395        let end = window[1] - first_offset;
396
397        let has_match = matches.peek().is_some_and(|&p| p < end);
398
399        // Advance past all match positions in this row's range.
400        while matches.peek().is_some_and(|&p| p < end) {
401            matches.next();
402        }
403
404        if has_match && validity.is_none_or(|v| v.is_valid(i)) {
405            result.set_bit(i, true);
406        }
407    }
408
409    // A null haystack row always produces a null output, so we can
410    // reuse the haystack's null buffer directly.
411    Ok(Arc::new(BooleanArray::new(
412        result.finish(),
413        validity.cloned(),
414    )))
415}
416
417fn array_has_all_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
418    array_has_all_and_any_inner(args, ComparisonType::All)
419}
420
/// Number of rows to process at a time when doing batched row conversion.  This
/// amortizes the row conversion overhead over more rows, but making this too
/// large can cause cache pressure for large arrays. See
/// <https://github.com/apache/datafusion/pull/20588> for context.
///
/// Used as the chunk step by both `general_array_has_for_all_and_any` and
/// `array_has_all_and_any_string_internal`.
const ROW_CONVERSION_CHUNK_SIZE: usize = 512;
426
427// General row comparison for array_has_all and array_has_any
428fn general_array_has_for_all_and_any<'a>(
429    haystack: ArrayWrapper<'a>,
430    needle: ArrayWrapper<'a>,
431    comparison_type: ComparisonType,
432) -> Result<ArrayRef> {
433    let num_rows = haystack.len();
434    let converter = RowConverter::new(vec![SortField::new(haystack.value_type())])?;
435
436    let h_offsets: Vec<usize> = haystack.offsets().collect();
437    let n_offsets: Vec<usize> = needle.offsets().collect();
438
439    let combined_nulls = NullBuffer::union(haystack.nulls(), needle.nulls());
440    let mut result = BooleanBufferBuilder::new(num_rows);
441
442    for chunk_start in (0..num_rows).step_by(ROW_CONVERSION_CHUNK_SIZE) {
443        let chunk_end = (chunk_start + ROW_CONVERSION_CHUNK_SIZE).min(num_rows);
444
445        // For efficiency with sliced arrays, only process the visible elements,
446        // not the entire underlying buffer.
447        let h_elem_start = h_offsets[chunk_start];
448        let h_elem_end = h_offsets[chunk_end];
449        let n_elem_start = n_offsets[chunk_start];
450        let n_elem_end = n_offsets[chunk_end];
451
452        let h_vals = haystack
453            .values()
454            .slice(h_elem_start, h_elem_end - h_elem_start);
455        let n_vals = needle
456            .values()
457            .slice(n_elem_start, n_elem_end - n_elem_start);
458
459        let chunk_h_rows = converter.convert_columns(&[h_vals])?;
460        let chunk_n_rows = converter.convert_columns(&[n_vals])?;
461
462        for i in chunk_start..chunk_end {
463            if combined_nulls.as_ref().is_some_and(|n| n.is_null(i)) {
464                result.append(false);
465                continue;
466            }
467            result.append(general_array_has_all_and_any_kernel(
468                &chunk_h_rows,
469                (h_offsets[i] - h_elem_start)..(h_offsets[i + 1] - h_elem_start),
470                &chunk_n_rows,
471                (n_offsets[i] - n_elem_start)..(n_offsets[i + 1] - n_elem_start),
472                comparison_type,
473            ));
474        }
475    }
476
477    Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls)))
478}
479
480// String comparison for array_has_all and array_has_any
481fn array_has_all_and_any_string_internal<'a>(
482    haystack: ArrayWrapper<'a>,
483    needle: ArrayWrapper<'a>,
484    comparison_type: ComparisonType,
485) -> Result<ArrayRef> {
486    let num_rows = haystack.len();
487
488    let h_offsets: Vec<usize> = haystack.offsets().collect();
489    let n_offsets: Vec<usize> = needle.offsets().collect();
490
491    let combined_nulls = NullBuffer::union(haystack.nulls(), needle.nulls());
492    let mut result = BooleanBufferBuilder::new(num_rows);
493
494    for chunk_start in (0..num_rows).step_by(ROW_CONVERSION_CHUNK_SIZE) {
495        let chunk_end = (chunk_start + ROW_CONVERSION_CHUNK_SIZE).min(num_rows);
496
497        let h_elem_start = h_offsets[chunk_start];
498        let h_elem_end = h_offsets[chunk_end];
499        let n_elem_start = n_offsets[chunk_start];
500        let n_elem_end = n_offsets[chunk_end];
501
502        let h_vals = haystack
503            .values()
504            .slice(h_elem_start, h_elem_end - h_elem_start);
505        let n_vals = needle
506            .values()
507            .slice(n_elem_start, n_elem_end - n_elem_start);
508
509        let chunk_h_strings = string_array_to_vec(h_vals.as_ref());
510        let chunk_n_strings = string_array_to_vec(n_vals.as_ref());
511
512        for i in chunk_start..chunk_end {
513            if combined_nulls.as_ref().is_some_and(|n| n.is_null(i)) {
514                result.append(false);
515                continue;
516            }
517            let h_start = h_offsets[i] - h_elem_start;
518            let h_end = h_offsets[i + 1] - h_elem_start;
519            let n_start = n_offsets[i] - n_elem_start;
520            let n_end = n_offsets[i + 1] - n_elem_start;
521            result.append(array_has_string_kernel(
522                &chunk_h_strings[h_start..h_end],
523                &chunk_n_strings[n_start..n_end],
524                comparison_type,
525            ));
526        }
527    }
528
529    Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls)))
530}
531
532fn array_has_all_and_any_dispatch<'a>(
533    haystack: ArrayWrapper<'a>,
534    needle: ArrayWrapper<'a>,
535    comparison_type: ComparisonType,
536) -> Result<ArrayRef> {
537    if needle.values().is_empty() {
538        let buffer = match comparison_type {
539            ComparisonType::All => BooleanBuffer::new_set(haystack.len()),
540            ComparisonType::Any => BooleanBuffer::new_unset(haystack.len()),
541        };
542        Ok(Arc::new(BooleanArray::from(buffer)))
543    } else {
544        match needle.value_type() {
545            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
546                array_has_all_and_any_string_internal(haystack, needle, comparison_type)
547            }
548            _ => general_array_has_for_all_and_any(haystack, needle, comparison_type),
549        }
550    }
551}
552
553fn array_has_all_and_any_inner(
554    args: &[ArrayRef],
555    comparison_type: ComparisonType,
556) -> Result<ArrayRef> {
557    let haystack: ArrayWrapper = args[0].as_ref().try_into()?;
558    let needle: ArrayWrapper = args[1].as_ref().try_into()?;
559    array_has_all_and_any_dispatch(haystack, needle, comparison_type)
560}
561
562fn array_has_any_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
563    array_has_all_and_any_inner(args, ComparisonType::Any)
564}
565
566/// Fast path for `array_has_any` when exactly one argument is a scalar.
567fn array_has_any_with_scalar(
568    columnar_arg: &ColumnarValue,
569    scalar_arg: &ScalarValue,
570) -> Result<ColumnarValue> {
571    if scalar_arg.is_null() {
572        return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None)));
573    }
574
575    // Convert the scalar to a 1-element ListArray, then extract the inner values
576    let scalar_array = scalar_arg.to_array_of_size(1)?;
577    let scalar_list: ArrayWrapper = scalar_array.as_ref().try_into()?;
578    let offsets: Vec<usize> = scalar_list.offsets().collect();
579    let scalar_values = scalar_list
580        .values()
581        .slice(offsets[0], offsets[1] - offsets[0]);
582
583    // If scalar list is empty, result is always false
584    if scalar_values.is_empty() {
585        return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))));
586    }
587
588    match scalar_values.data_type() {
589        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
590            array_has_any_with_scalar_string(columnar_arg, &scalar_values)
591        }
592        _ => array_has_any_with_scalar_general(columnar_arg, &scalar_values),
593    }
594}
595
/// When the scalar argument has more elements than this, the scalar fast path
/// builds a HashSet for O(1) lookups. At or below this threshold, it falls
/// back to a linear scan, since hashing every columnar element is more
/// expensive than a linear scan over a short array.
///
/// Applies to both the string-specialized and the general scalar fast paths
/// of `array_has_any`.
const SCALAR_SMALL_THRESHOLD: usize = 8;
601
602/// String-specialized scalar fast path for `array_has_any`.
603fn array_has_any_with_scalar_string(
604    columnar_arg: &ColumnarValue,
605    scalar_values: &ArrayRef,
606) -> Result<ColumnarValue> {
607    let (col_arr, is_scalar_output) = match columnar_arg {
608        ColumnarValue::Array(arr) => (Arc::clone(arr), false),
609        ColumnarValue::Scalar(s) => (s.to_array_of_size(1)?, true),
610    };
611
612    let col_list: ArrayWrapper = col_arr.as_ref().try_into()?;
613    let col_values = col_list.values();
614    let col_offsets: Vec<usize> = col_list.offsets().collect();
615    let col_nulls = col_list.nulls();
616
617    let scalar_lookup = ScalarStringLookup::new(scalar_values);
618    let has_null_scalar = scalar_values.null_count() > 0;
619
620    let result = match col_values.data_type() {
621        DataType::Utf8 => array_has_any_string_inner(
622            col_values.as_string::<i32>(),
623            &col_offsets,
624            col_nulls,
625            has_null_scalar,
626            &scalar_lookup,
627        ),
628        DataType::LargeUtf8 => array_has_any_string_inner(
629            col_values.as_string::<i64>(),
630            &col_offsets,
631            col_nulls,
632            has_null_scalar,
633            &scalar_lookup,
634        ),
635        DataType::Utf8View => array_has_any_string_inner(
636            col_values.as_string_view(),
637            &col_offsets,
638            col_nulls,
639            has_null_scalar,
640            &scalar_lookup,
641        ),
642        _ => unreachable!("array_has_any_with_scalar_string called with non-string type"),
643    };
644
645    if is_scalar_output {
646        Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
647            &result, 0,
648        )?))
649    } else {
650        Ok(ColumnarValue::Array(result))
651    }
652}
653
654/// Pre-computed lookup structure for the scalar string fastpath.
655enum ScalarStringLookup<'a> {
656    /// Large scalar: HashSet for O(1) lookups.
657    Set(HashSet<&'a str>),
658    /// Small scalar: Vec for linear scan.
659    List(Vec<Option<&'a str>>),
660}
661
662impl<'a> ScalarStringLookup<'a> {
663    fn new(scalar_values: &'a ArrayRef) -> Self {
664        let strings = string_array_to_vec(scalar_values.as_ref());
665        if strings.len() > SCALAR_SMALL_THRESHOLD {
666            ScalarStringLookup::Set(strings.into_iter().flatten().collect())
667        } else {
668            ScalarStringLookup::List(strings)
669        }
670    }
671
672    fn contains(&self, value: &str) -> bool {
673        match self {
674            ScalarStringLookup::Set(set) => set.contains(value),
675            ScalarStringLookup::List(list) => list.contains(&Some(value)),
676        }
677    }
678}
679
680/// Inner implementation of the string scalar fast path, generic over string
681/// array type to allow direct element access by index.
682fn array_has_any_string_inner<'a, C: StringArrayType<'a> + Copy>(
683    col_strings: C,
684    col_offsets: &[usize],
685    col_nulls: Option<&NullBuffer>,
686    has_null_scalar: bool,
687    scalar_lookup: &ScalarStringLookup<'_>,
688) -> ArrayRef {
689    let num_rows = col_offsets.len() - 1;
690    let mut result = BooleanBufferBuilder::new(num_rows);
691
692    for i in 0..num_rows {
693        if col_nulls.is_some_and(|v| v.is_null(i)) {
694            result.append(false);
695            continue;
696        }
697        let start = col_offsets[i];
698        let end = col_offsets[i + 1];
699        let found = (start..end).any(|j| {
700            if col_strings.is_null(j) {
701                has_null_scalar
702            } else {
703                scalar_lookup.contains(col_strings.value(j))
704            }
705        });
706        result.append(found);
707    }
708
709    Arc::new(BooleanArray::new(result.finish(), col_nulls.cloned()))
710}
711
712/// General scalar fast path for `array_has_any`, using RowConverter for
713/// type-erased comparison.
714fn array_has_any_with_scalar_general(
715    columnar_arg: &ColumnarValue,
716    scalar_values: &ArrayRef,
717) -> Result<ColumnarValue> {
718    let converter =
719        RowConverter::new(vec![SortField::new(scalar_values.data_type().clone())])?;
720    let scalar_rows = converter.convert_columns(&[Arc::clone(scalar_values)])?;
721
722    let (col_arr, is_scalar_output) = match columnar_arg {
723        ColumnarValue::Array(arr) => (Arc::clone(arr), false),
724        ColumnarValue::Scalar(s) => (s.to_array_of_size(1)?, true),
725    };
726
727    let col_list: ArrayWrapper = col_arr.as_ref().try_into()?;
728    let col_rows = converter.convert_columns(&[Arc::clone(col_list.values())])?;
729    let col_offsets: Vec<usize> = col_list.offsets().collect();
730    let col_nulls = col_list.nulls();
731
732    let mut result = BooleanBufferBuilder::new(col_list.len());
733    let num_scalar = scalar_rows.num_rows();
734
735    if num_scalar > SCALAR_SMALL_THRESHOLD {
736        // Large scalar: build HashSet for O(1) lookups
737        let scalar_set: HashSet<Box<[u8]>> = (0..num_scalar)
738            .map(|i| Box::from(scalar_rows.row(i).as_ref()))
739            .collect();
740
741        for i in 0..col_list.len() {
742            if col_nulls.is_some_and(|v| v.is_null(i)) {
743                result.append(false);
744                continue;
745            }
746            let start = col_offsets[i];
747            let end = col_offsets[i + 1];
748            let found =
749                (start..end).any(|j| scalar_set.contains(col_rows.row(j).as_ref()));
750            result.append(found);
751        }
752    } else {
753        // Small scalar: linear scan avoids HashSet hashing overhead
754        for i in 0..col_list.len() {
755            if col_nulls.is_some_and(|v| v.is_null(i)) {
756                result.append(false);
757                continue;
758            }
759            let start = col_offsets[i];
760            let end = col_offsets[i + 1];
761            let found = (start..end)
762                .any(|j| (0..num_scalar).any(|k| col_rows.row(j) == scalar_rows.row(k)));
763            result.append(found);
764        }
765    }
766
767    let output: ArrayRef =
768        Arc::new(BooleanArray::new(result.finish(), col_nulls.cloned()));
769
770    if is_scalar_output {
771        Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
772            &output, 0,
773        )?))
774    } else {
775        Ok(ColumnarValue::Array(output))
776    }
777}
778
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns true if all elements of sub-array exist in array.",
    syntax_example = "array_has_all(array, sub-array)",
    sql_example = r#"```sql
> select array_has_all([1, 2, 3, 4], [2, 3]);
+--------------------------------------------+
| array_has_all(List([1,2,3,4]), List([2,3])) |
+--------------------------------------------+
| true                                       |
+--------------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "sub-array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
/// Scalar UDF implementing `array_has_all(array, sub-array)`.
///
/// Also reachable through the alias registered in [`ArrayHasAll::new`].
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayHasAll {
    /// Accepted argument signature, handed out by `signature()`.
    signature: Signature,
    /// Alternative SQL names for this function, handed out by `aliases()`.
    aliases: Vec<String>,
}
805
806impl Default for ArrayHasAll {
807    fn default() -> Self {
808        Self::new()
809    }
810}
811
812impl ArrayHasAll {
813    pub fn new() -> Self {
814        Self {
815            signature: Signature::arrays(2, None, Volatility::Immutable),
816            aliases: vec![String::from("list_has_all")],
817        }
818    }
819}
820
821impl ScalarUDFImpl for ArrayHasAll {
822    fn name(&self) -> &str {
823        "array_has_all"
824    }
825
826    fn signature(&self) -> &Signature {
827        &self.signature
828    }
829
830    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
831        Ok(DataType::Boolean)
832    }
833
834    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
835        make_scalar_function(array_has_all_inner)(&args.args)
836    }
837
838    fn aliases(&self) -> &[String] {
839        &self.aliases
840    }
841
842    fn documentation(&self) -> Option<&Documentation> {
843        self.doc()
844    }
845}
846
/// Scalar UDF backing the SQL function `array_has_any`.
///
/// The user-facing documentation (description, syntax and SQL example) is
/// supplied through the `user_doc` attribute below.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns true if the arrays have any elements in common.",
    syntax_example = "array_has_any(array1, array2)",
    sql_example = r#"```sql
> select array_has_any([1, 2, 3], [3, 4]);
+------------------------------------------+
| array_has_any(List([1,2,3]), List([3,4])) |
+------------------------------------------+
| true                                     |
+------------------------------------------+
```"#,
    argument(
        name = "array1",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array2",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayHasAny {
    /// Argument signature reported through `ScalarUDFImpl::signature`.
    signature: Signature,
    /// Alternative names reported through `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
873
874impl Default for ArrayHasAny {
875    fn default() -> Self {
876        Self::new()
877    }
878}
879
880impl ArrayHasAny {
881    pub fn new() -> Self {
882        Self {
883            signature: Signature::arrays(2, None, Volatility::Immutable),
884            aliases: vec![String::from("list_has_any"), String::from("arrays_overlap")],
885        }
886    }
887}
888
889impl ScalarUDFImpl for ArrayHasAny {
890    fn name(&self) -> &str {
891        "array_has_any"
892    }
893
894    fn signature(&self) -> &Signature {
895        &self.signature
896    }
897
898    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
899        Ok(DataType::Boolean)
900    }
901
902    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
903        let [first_arg, second_arg] = take_function_args(self.name(), &args.args)?;
904
905        // If either argument is scalar, use the fast path.
906        match (&first_arg, &second_arg) {
907            (cv, ColumnarValue::Scalar(scalar)) | (ColumnarValue::Scalar(scalar), cv) => {
908                array_has_any_with_scalar(cv, scalar)
909            }
910            _ => make_scalar_function(array_has_any_inner)(&args.args),
911        }
912    }
913
914    fn aliases(&self) -> &[String] {
915        &self.aliases
916    }
917
918    fn documentation(&self) -> Option<&Documentation> {
919        self.doc()
920    }
921}
922
/// Selects which containment test the shared kernels perform for a
/// haystack/needle pair.
#[derive(Debug, PartialEq, Clone, Copy)]
enum ComparisonType {
    /// Every needle element must be found in the haystack (`array_has_all`).
    All,
    /// At least one needle element must be found in the haystack
    /// (`array_has_any`).
    Any,
}
931
932fn array_has_string_kernel(
933    haystack: &[Option<&str>],
934    needle: &[Option<&str>],
935    comparison_type: ComparisonType,
936) -> bool {
937    match comparison_type {
938        ComparisonType::All => needle
939            .iter()
940            .dedup()
941            .all(|x| haystack.iter().dedup().any(|y| y == x)),
942        ComparisonType::Any => needle
943            .iter()
944            .dedup()
945            .any(|x| haystack.iter().dedup().any(|y| y == x)),
946    }
947}
948
949fn general_array_has_all_and_any_kernel(
950    haystack_rows: &Rows,
951    h_range: Range<usize>,
952    needle_rows: &Rows,
953    mut n_range: Range<usize>,
954    comparison_type: ComparisonType,
955) -> bool {
956    let h_start = h_range.start;
957    let h_end = h_range.end;
958
959    match comparison_type {
960        ComparisonType::All => n_range.all(|ni| {
961            let needle_row = needle_rows.row(ni);
962            (h_start..h_end).any(|hi| haystack_rows.row(hi) == needle_row)
963        }),
964        ComparisonType::Any => n_range.any(|ni| {
965            let needle_row = needle_rows.row(ni);
966            (h_start..h_end).any(|hi| haystack_rows.row(hi) == needle_row)
967        }),
968    }
969}
970
971#[cfg(test)]
972mod tests {
973    use std::sync::Arc;
974
975    use arrow::datatypes::Int32Type;
976    use arrow::{
977        array::{
978            Array, ArrayRef, AsArray, FixedSizeListArray, Int32Array, ListArray,
979            create_array,
980        },
981        buffer::OffsetBuffer,
982        datatypes::{DataType, Field},
983    };
984    use datafusion_common::{
985        DataFusionError, ScalarValue, config::ConfigOptions,
986        utils::SingleRowListArrayBuilder,
987    };
988    use datafusion_expr::simplify::SimplifyContext;
989    use datafusion_expr::{
990        ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDFImpl, col, lit,
991        simplify::ExprSimplifyResult,
992    };
993
994    use crate::expr_fn::make_array;
995
996    use super::{ArrayHas, ArrayHasAll, ArrayHasAny};
997
998    #[test]
999    fn test_simplify_array_has_to_in_list() {
1000        let haystack = lit(SingleRowListArrayBuilder::new(create_array!(
1001            Int32,
1002            [1, 2, 3]
1003        ))
1004        .build_list_scalar());
1005        let needle = col("c");
1006
1007        let context = SimplifyContext::default();
1008
1009        let Ok(ExprSimplifyResult::Simplified(Expr::InList(in_list))) =
1010            ArrayHas::new().simplify(vec![haystack, needle.clone()], &context)
1011        else {
1012            panic!("Expected simplified expression");
1013        };
1014
1015        assert_eq!(
1016            in_list,
1017            datafusion_expr::expr::InList {
1018                expr: Box::new(needle),
1019                list: vec![lit(1), lit(2), lit(3)],
1020                negated: false,
1021            }
1022        );
1023    }
1024
1025    #[test]
1026    fn test_simplify_array_has_with_make_array_to_in_list() {
1027        let haystack = make_array(vec![lit(1), lit(2), lit(3)]);
1028        let needle = col("c");
1029
1030        let context = SimplifyContext::default();
1031
1032        let Ok(ExprSimplifyResult::Simplified(Expr::InList(in_list))) =
1033            ArrayHas::new().simplify(vec![haystack, needle.clone()], &context)
1034        else {
1035            panic!("Expected simplified expression");
1036        };
1037
1038        assert_eq!(
1039            in_list,
1040            datafusion_expr::expr::InList {
1041                expr: Box::new(needle),
1042                list: vec![lit(1), lit(2), lit(3)],
1043                negated: false,
1044            }
1045        );
1046    }
1047
1048    #[test]
1049    fn test_simplify_array_has_with_null_to_null() {
1050        let haystack = Expr::Literal(ScalarValue::Null, None);
1051        let needle = col("c");
1052
1053        let context = SimplifyContext::default();
1054        let Ok(ExprSimplifyResult::Simplified(simplified)) =
1055            ArrayHas::new().simplify(vec![haystack, needle], &context)
1056        else {
1057            panic!("Expected simplified expression");
1058        };
1059
1060        assert_eq!(simplified, Expr::Literal(ScalarValue::Boolean(None), None));
1061    }
1062
1063    #[test]
1064    fn test_simplify_array_has_with_null_list_to_null() {
1065        let haystack =
1066            ListArray::from_iter_primitive::<Int32Type, [Option<i32>; 0], _>([None]);
1067        let haystack = Expr::Literal(ScalarValue::List(Arc::new(haystack)), None);
1068        let needle = col("c");
1069
1070        let context = SimplifyContext::default();
1071        let Ok(ExprSimplifyResult::Simplified(simplified)) =
1072            ArrayHas::new().simplify(vec![haystack, needle], &context)
1073        else {
1074            panic!("Expected simplified expression");
1075        };
1076
1077        assert_eq!(simplified, Expr::Literal(ScalarValue::Boolean(None), None));
1078    }
1079
1080    #[test]
1081    fn test_array_has_complex_list_not_simplified() {
1082        let haystack = col("c1");
1083        let needle = col("c2");
1084
1085        let context = SimplifyContext::default();
1086
1087        let Ok(ExprSimplifyResult::Original(args)) =
1088            ArrayHas::new().simplify(vec![haystack, needle.clone()], &context)
1089        else {
1090            panic!("Expected simplified expression");
1091        };
1092
1093        assert_eq!(args, vec![col("c1"), col("c2")],);
1094    }
1095
1096    #[test]
1097    fn test_array_has_list_empty_child() -> Result<(), DataFusionError> {
1098        let haystack_field = Arc::new(Field::new_list(
1099            "haystack",
1100            Field::new_list("", Field::new("", DataType::Int32, true), true),
1101            true,
1102        ));
1103
1104        let needle_field = Arc::new(Field::new("needle", DataType::Int32, true));
1105        let return_field = Arc::new(Field::new("return", DataType::Boolean, true));
1106        let haystack = ListArray::new(
1107            Field::new_list_field(DataType::Int32, true).into(),
1108            OffsetBuffer::new(vec![0, 0].into()),
1109            Arc::new(Int32Array::from(Vec::<i32>::new())) as ArrayRef,
1110            Some(vec![false].into()),
1111        );
1112
1113        let haystack = ColumnarValue::Array(Arc::new(haystack));
1114        let needle = ColumnarValue::Scalar(ScalarValue::Int32(Some(1)));
1115        let result = ArrayHas::new().invoke_with_args(ScalarFunctionArgs {
1116            args: vec![haystack, needle],
1117            arg_fields: vec![haystack_field, needle_field],
1118            number_rows: 1,
1119            return_field,
1120            config_options: Arc::new(ConfigOptions::default()),
1121        })?;
1122
1123        let output = result.into_array(1)?;
1124        let output = output.as_boolean();
1125        assert_eq!(output.len(), 1);
1126        assert!(output.is_null(0));
1127
1128        Ok(())
1129    }
1130
1131    #[test]
1132    fn test_array_has_sliced_list() -> Result<(), DataFusionError> {
1133        // [[10, 20], [30, 40], [50, 60], [70, 80]]  →  slice(1,2)  →  [[30, 40], [50, 60]]
1134        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1135            Some(vec![Some(10), Some(20)]),
1136            Some(vec![Some(30), Some(40)]),
1137            Some(vec![Some(50), Some(60)]),
1138            Some(vec![Some(70), Some(80)]),
1139        ]);
1140        let sliced = list.slice(1, 2);
1141        let haystack_field =
1142            Arc::new(Field::new("haystack", sliced.data_type().clone(), true));
1143        let needle_field = Arc::new(Field::new("needle", DataType::Int32, true));
1144        let return_field = Arc::new(Field::new("return", DataType::Boolean, true));
1145
1146        // Search for elements that exist only in sliced-away rows:
1147        // 10 is in the prefix row, 70 is in the suffix row.
1148        let invoke = |needle: i32| -> Result<ArrayRef, DataFusionError> {
1149            ArrayHas::new()
1150                .invoke_with_args(ScalarFunctionArgs {
1151                    args: vec![
1152                        ColumnarValue::Array(Arc::new(sliced.clone())),
1153                        ColumnarValue::Scalar(ScalarValue::Int32(Some(needle))),
1154                    ],
1155                    arg_fields: vec![
1156                        Arc::clone(&haystack_field),
1157                        Arc::clone(&needle_field),
1158                    ],
1159                    number_rows: 2,
1160                    return_field: Arc::clone(&return_field),
1161                    config_options: Arc::new(ConfigOptions::default()),
1162                })?
1163                .into_array(2)
1164        };
1165
1166        let output = invoke(10)?.as_boolean().clone();
1167        assert!(!output.value(0));
1168        assert!(!output.value(1));
1169
1170        let output = invoke(70)?.as_boolean().clone();
1171        assert!(!output.value(0));
1172        assert!(!output.value(1));
1173
1174        Ok(())
1175    }
1176
1177    #[test]
1178    fn test_array_has_list_null_haystack() -> Result<(), DataFusionError> {
1179        let haystack_field = Arc::new(Field::new("haystack", DataType::Null, true));
1180        let needle_field = Arc::new(Field::new("needle", DataType::Int32, true));
1181        let return_field = Arc::new(Field::new("return", DataType::Boolean, true));
1182        let haystack =
1183            ListArray::from_iter_primitive::<Int32Type, [Option<i32>; 0], _>([
1184                None, None, None,
1185            ]);
1186
1187        let haystack = ColumnarValue::Array(Arc::new(haystack));
1188        let needle = ColumnarValue::Scalar(ScalarValue::Int32(Some(1)));
1189        let result = ArrayHas::new().invoke_with_args(ScalarFunctionArgs {
1190            args: vec![haystack, needle],
1191            arg_fields: vec![haystack_field, needle_field],
1192            number_rows: 1,
1193            return_field,
1194            config_options: Arc::new(ConfigOptions::default()),
1195        })?;
1196
1197        let output = result.into_array(1)?;
1198        let output = output.as_boolean();
1199        assert_eq!(output.len(), 3);
1200        for i in 0..3 {
1201            assert!(output.is_null(i));
1202        }
1203
1204        Ok(())
1205    }
1206
1207    /// Invoke a two-argument list UDF with the given arrays and assert the
1208    /// boolean output matches `expected`.
1209    fn invoke_and_assert(
1210        udf: &dyn ScalarUDFImpl,
1211        haystack: &ArrayRef,
1212        needle: ArrayRef,
1213        expected: &[Option<bool>],
1214    ) {
1215        let num_rows = haystack.len();
1216        let list_type = haystack.data_type();
1217        let result = udf
1218            .invoke_with_args(ScalarFunctionArgs {
1219                args: vec![
1220                    ColumnarValue::Array(Arc::clone(haystack)),
1221                    ColumnarValue::Array(needle),
1222                ],
1223                arg_fields: vec![
1224                    Arc::new(Field::new("haystack", list_type.clone(), false)),
1225                    Arc::new(Field::new("needle", list_type.clone(), false)),
1226                ],
1227                number_rows: num_rows,
1228                return_field: Arc::new(Field::new("return", DataType::Boolean, true)),
1229                config_options: Arc::new(ConfigOptions::default()),
1230            })
1231            .unwrap();
1232        let output = result.into_array(num_rows).unwrap();
1233        assert_eq!(output.as_boolean().iter().collect::<Vec<_>>(), expected);
1234    }
1235
1236    #[test]
1237    fn test_sliced_list_offsets() {
1238        // Full rows:
1239        //   row 0: [1, 2]   (not visible after slicing)
1240        //   row 1: [11, 12] (visible row 0)
1241        //   row 2: [21, 22] (visible row 1)
1242        //   row 3: [31, 32] (not visible after slicing)
1243        let field: Arc<Field> = Arc::new(Field::new("item", DataType::Int32, false));
1244        let full_values = Arc::new(Int32Array::from(vec![1, 2, 11, 12, 21, 22, 31, 32]));
1245        let full_offsets = OffsetBuffer::new(vec![0, 2, 4, 6, 8].into());
1246        let full = ListArray::new(Arc::clone(&field), full_offsets, full_values, None);
1247        let sliced_haystack: ArrayRef = Arc::new(full.slice(1, 2));
1248
1249        // array_has_all: needle row 0 = [11], row 1 = [21]
1250        let needle_all: ArrayRef = Arc::new(ListArray::new(
1251            Arc::clone(&field),
1252            OffsetBuffer::new(vec![0, 1, 2].into()),
1253            Arc::new(Int32Array::from(vec![11, 21])),
1254            None,
1255        ));
1256        invoke_and_assert(
1257            &ArrayHasAll::new(),
1258            &sliced_haystack,
1259            needle_all,
1260            &[Some(true), Some(true)],
1261        );
1262
1263        // array_has_any: needle row 0 = [99, 11], row 1 = [99, 21]
1264        let needle_any: ArrayRef = Arc::new(ListArray::new(
1265            field,
1266            OffsetBuffer::new(vec![0, 2, 4].into()),
1267            Arc::new(Int32Array::from(vec![99, 11, 99, 21])),
1268            None,
1269        ));
1270        invoke_and_assert(
1271            &ArrayHasAny::new(),
1272            &sliced_haystack,
1273            needle_any,
1274            &[Some(true), Some(true)],
1275        );
1276    }
1277
1278    #[test]
1279    fn test_sliced_fixed_size_list_offsets() {
1280        // Same logical data as test_sliced_list_offsets, but using FixedSizeListArray.
1281        let field = Arc::new(Field::new("item", DataType::Int32, false));
1282        let full_values = Arc::new(Int32Array::from(vec![1, 2, 11, 12, 21, 22, 31, 32]));
1283        let full = FixedSizeListArray::new(Arc::clone(&field), 2, full_values, None);
1284        let sliced_haystack: ArrayRef = Arc::new(full.slice(1, 2));
1285
1286        // array_has_all: needle row 0 = [11, 12], row 1 = [21, 22]
1287        let needle_all: ArrayRef = Arc::new(FixedSizeListArray::new(
1288            Arc::clone(&field),
1289            2,
1290            Arc::new(Int32Array::from(vec![11, 12, 21, 22])),
1291            None,
1292        ));
1293        invoke_and_assert(
1294            &ArrayHasAll::new(),
1295            &sliced_haystack,
1296            needle_all,
1297            &[Some(true), Some(true)],
1298        );
1299
1300        // array_has_any: needle row 0 = [99, 12], row 1 = [99, 22]
1301        let needle_any: ArrayRef = Arc::new(FixedSizeListArray::new(
1302            field,
1303            2,
1304            Arc::new(Int32Array::from(vec![99, 12, 99, 22])),
1305            None,
1306        ));
1307        invoke_and_assert(
1308            &ArrayHasAny::new(),
1309            &sliced_haystack,
1310            needle_any,
1311            &[Some(true), Some(true)],
1312        );
1313    }
1314}