Skip to main content

datafusion_functions_nested/
set_ops.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for array_union, array_intersect and array_distinct functions.
19
20use crate::utils::make_scalar_function;
21use arrow::array::{
22    Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, UInt64Array,
23    new_empty_array, new_null_array,
24};
25use arrow::buffer::{NullBuffer, OffsetBuffer};
26use arrow::compute::{concat, take};
27use arrow::datatypes::DataType::{LargeList, List, Null};
28use arrow::datatypes::{DataType, Field, FieldRef};
29use arrow::row::{RowConverter, SortField};
30use datafusion_common::cast::{as_large_list_array, as_list_array};
31use datafusion_common::utils::ListCoercion;
32use datafusion_common::{
33    Result, assert_eq_or_internal_err, exec_err, internal_err, utils::take_function_args,
34};
35use datafusion_expr::{
36    ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
37    Volatility,
38};
39use datafusion_macros::user_doc;
40use hashbrown::HashSet;
41use std::fmt::{Display, Formatter};
42use std::sync::Arc;
43
// Instantiate each UDF below via `make_udf_expr_and_func!` (defined elsewhere in
// this crate). Judging by the macro name and the `array_distinct_udf()` accessor
// used by the tests in this file, each call presumably generates an expression
// helper (e.g. `array_union(array1, array2)`) plus a UDF accessor
// (e.g. `array_union_udf()`) — confirm against the macro definition.
make_udf_expr_and_func!(
    ArrayUnion,
    array_union,
    array1 array2,
    "returns an array of the elements in the union of array1 and array2 without duplicates.",
    array_union_udf
);

// NOTE(review): the helper parameters are named `first_array second_array`, while
// the doc string refers to `array1 and array2`.
make_udf_expr_and_func!(
    ArrayIntersect,
    array_intersect,
    first_array second_array,
    "returns an array of the elements in the intersection of array1 and array2.",
    array_intersect_udf
);

make_udf_expr_and_func!(
    ArrayDistinct,
    array_distinct,
    array,
    "returns distinct values from the array after removing duplicates.",
    array_distinct_udf
);
68
/// Scalar UDF implementing `array_union` (alias `list_union`): for each row, the
/// distinct elements of the first array followed by the distinct elements of the
/// second array that were not already emitted.
///
/// NOTE(review): the `description` below says "present in both arrays", which
/// reads like an intersection; the SQL examples (e.g. `[1, 2, 3, 4, 5, 6]`) show
/// that every element of either array is returned.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array of elements that are present in both arrays (all elements from both arrays) without duplicates.",
    syntax_example = "array_union(array1, array2)",
    sql_example = r#"```sql
> select array_union([1, 2, 3, 4], [5, 6, 3, 4]);
+----------------------------------------------------+
| array_union([1, 2, 3, 4], [5, 6, 3, 4]);           |
+----------------------------------------------------+
| [1, 2, 3, 4, 5, 6]                                 |
+----------------------------------------------------+
> select array_union([1, 2, 3, 4], [5, 6, 7, 8]);
+----------------------------------------------------+
| array_union([1, 2, 3, 4], [5, 6, 7, 8]);           |
+----------------------------------------------------+
| [1, 2, 3, 4, 5, 6, 7, 8]                           |
+----------------------------------------------------+
```"#,
    argument(
        name = "array1",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array2",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayUnion {
    // Two-argument array signature built in `ArrayUnion::new`.
    signature: Signature,
    // Alternative SQL names for this function (`list_union`).
    aliases: Vec<String>,
}
101
102impl Default for ArrayUnion {
103    fn default() -> Self {
104        Self::new()
105    }
106}
107
108impl ArrayUnion {
109    pub fn new() -> Self {
110        Self {
111            signature: Signature::arrays(
112                2,
113                Some(ListCoercion::FixedSizedListToList),
114                Volatility::Immutable,
115            ),
116            aliases: vec![String::from("list_union")],
117        }
118    }
119}
120
121impl ScalarUDFImpl for ArrayUnion {
122    fn name(&self) -> &str {
123        "array_union"
124    }
125
126    fn signature(&self) -> &Signature {
127        &self.signature
128    }
129
130    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
131        let [array1, array2] = take_function_args(self.name(), arg_types)?;
132        match (array1, array2) {
133            (Null, Null) => Ok(DataType::new_list(Null, true)),
134            (Null, dt) | (dt, Null) => Ok(dt.clone()),
135            (dt, _) => Ok(dt.clone()),
136        }
137    }
138
139    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
140        make_scalar_function(array_union_inner)(&args.args)
141    }
142
143    fn aliases(&self) -> &[String] {
144        &self.aliases
145    }
146
147    fn documentation(&self) -> Option<&Documentation> {
148        self.doc()
149    }
150}
151
/// Scalar UDF implementing `array_intersect` (alias `list_intersect`): for each
/// row, the distinct elements that occur in both input arrays.
///
/// The element order of the result follows whichever list is probed during
/// evaluation (the longer one, or the first argument on ties).
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array of elements in the intersection of array1 and array2.",
    syntax_example = "array_intersect(array1, array2)",
    sql_example = r#"```sql
> select array_intersect([1, 2, 3, 4], [5, 6, 3, 4]);
+----------------------------------------------------+
| array_intersect([1, 2, 3, 4], [5, 6, 3, 4]);       |
+----------------------------------------------------+
| [3, 4]                                             |
+----------------------------------------------------+
> select array_intersect([1, 2, 3, 4], [5, 6, 7, 8]);
+----------------------------------------------------+
| array_intersect([1, 2, 3, 4], [5, 6, 7, 8]);       |
+----------------------------------------------------+
| []                                                 |
+----------------------------------------------------+
```"#,
    argument(
        name = "array1",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array2",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayIntersect {
    // Two-argument array signature built in `ArrayIntersect::new`.
    signature: Signature,
    // Alternative SQL names for this function (`list_intersect`).
    aliases: Vec<String>,
}
184
185impl Default for ArrayIntersect {
186    fn default() -> Self {
187        Self::new()
188    }
189}
190
191impl ArrayIntersect {
192    pub fn new() -> Self {
193        Self {
194            signature: Signature::arrays(
195                2,
196                Some(ListCoercion::FixedSizedListToList),
197                Volatility::Immutable,
198            ),
199            aliases: vec![String::from("list_intersect")],
200        }
201    }
202}
203
204impl ScalarUDFImpl for ArrayIntersect {
205    fn name(&self) -> &str {
206        "array_intersect"
207    }
208
209    fn signature(&self) -> &Signature {
210        &self.signature
211    }
212
213    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
214        let [array1, array2] = take_function_args(self.name(), arg_types)?;
215        match (array1, array2) {
216            (Null, Null) => Ok(DataType::new_list(Null, true)),
217            (Null, dt) | (dt, Null) => Ok(dt.clone()),
218            (dt, _) => Ok(dt.clone()),
219        }
220    }
221
222    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
223        make_scalar_function(array_intersect_inner)(&args.args)
224    }
225
226    fn aliases(&self) -> &[String] {
227        &self.aliases
228    }
229
230    fn documentation(&self) -> Option<&Documentation> {
231        self.doc()
232    }
233}
234
/// Scalar UDF implementing `array_distinct` (alias `list_distinct`): removes
/// duplicate elements from each list, keeping the first occurrence of every value
/// in its original order.
///
/// NOTE(review): the `sql_example` below shows `[1, 2, 3, 4]` for input
/// `[1, 3, 2, 3, 1, 2, 4]`. `general_array_distinct` keeps first occurrences in
/// input order, which would yield `[1, 3, 2, 4]`. The example (and the generated
/// docs) likely need updating — confirm.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns distinct values from the array after removing duplicates.",
    syntax_example = "array_distinct(array)",
    sql_example = r#"```sql
> select array_distinct([1, 3, 2, 3, 1, 2, 4]);
+---------------------------------+
| array_distinct(List([1,2,3,4])) |
+---------------------------------+
| [1, 2, 3, 4]                    |
+---------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayDistinct {
    // Single-array signature built in `ArrayDistinct::new`.
    signature: Signature,
    // Alternative SQL names for this function (`list_distinct`).
    aliases: Vec<String>,
}
257
258impl ArrayDistinct {
259    pub fn new() -> Self {
260        Self {
261            signature: Signature::array(Volatility::Immutable),
262            aliases: vec!["list_distinct".to_string()],
263        }
264    }
265}
266
267impl Default for ArrayDistinct {
268    fn default() -> Self {
269        Self::new()
270    }
271}
272
273impl ScalarUDFImpl for ArrayDistinct {
274    fn name(&self) -> &str {
275        "array_distinct"
276    }
277
278    fn signature(&self) -> &Signature {
279        &self.signature
280    }
281
282    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
283        Ok(arg_types[0].clone())
284    }
285
286    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
287        make_scalar_function(array_distinct_inner)(&args.args)
288    }
289
290    fn aliases(&self) -> &[String] {
291        &self.aliases
292    }
293
294    fn documentation(&self) -> Option<&Documentation> {
295        self.doc()
296    }
297}
298
299/// array_distinct SQL function
300/// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4]
301fn array_distinct_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
302    let [array] = take_function_args("array_distinct", args)?;
303    match array.data_type() {
304        Null => Ok(Arc::clone(array)),
305        List(field) => {
306            let array = as_list_array(&array)?;
307            general_array_distinct(array, field)
308        }
309        LargeList(field) => {
310            let array = as_large_list_array(&array)?;
311            general_array_distinct(array, field)
312        }
313        arg_type => exec_err!("array_distinct does not support type {arg_type}"),
314    }
315}
316
/// The binary set operation applied by `general_set_op`.
#[derive(Debug, PartialEq, Copy, Clone)]
enum SetOp {
    /// Distinct elements of either array (`array_union`).
    Union,
    /// Distinct elements found in both arrays (`array_intersect`).
    Intersect,
}

impl Display for SetOp {
    /// Writes the SQL function name that corresponds to the operation.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            SetOp::Union => "array_union",
            SetOp::Intersect => "array_intersect",
        };
        f.write_str(name)
    }
}
331
332fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
333    l: &GenericListArray<OffsetSize>,
334    r: &GenericListArray<OffsetSize>,
335    field: Arc<Field>,
336    set_op: SetOp,
337) -> Result<ArrayRef> {
338    if l.is_empty() || l.value_type().is_null() {
339        let field = Arc::new(Field::new_list_field(r.value_type(), true));
340        return general_array_distinct::<OffsetSize>(r, &field);
341    } else if r.is_empty() || r.value_type().is_null() {
342        let field = Arc::new(Field::new_list_field(l.value_type(), true));
343        return general_array_distinct::<OffsetSize>(l, &field);
344    }
345
346    assert_eq_or_internal_err!(
347        l.value_type(),
348        r.value_type(),
349        "{set_op:?} is not implemented for '{l:?}' and '{r:?}'"
350    );
351
352    let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
353
354    // Only convert the visible portion of the values array. For sliced
355    // ListArrays, values() returns the full underlying array but only
356    // elements between the first and last offset are referenced.
357    let l_first = l.offsets()[0].as_usize();
358    let l_len = l.offsets()[l.len()].as_usize() - l_first;
359    let rows_l = converter.convert_columns(&[l.values().slice(l_first, l_len)])?;
360
361    let r_first = r.offsets()[0].as_usize();
362    let r_len = r.offsets()[r.len()].as_usize() - r_first;
363    let rows_r = converter.convert_columns(&[r.values().slice(r_first, r_len)])?;
364
365    // Combine the *sliced* value arrays so 0-based indices from the row
366    // converter map directly into the concatenated array.
367    let l_values = l.values().slice(l_first, l_len);
368    let r_values = r.values().slice(r_first, r_len);
369    let combined_values = concat(&[l_values.as_ref(), r_values.as_ref()])?;
370    let r_offset = l_len;
371
372    match set_op {
373        SetOp::Union => generic_set_loop::<OffsetSize, true>(
374            l,
375            r,
376            &rows_l,
377            &rows_r,
378            field,
379            &combined_values,
380            r_offset,
381        ),
382        SetOp::Intersect => generic_set_loop::<OffsetSize, false>(
383            l,
384            r,
385            &rows_l,
386            &rows_r,
387            field,
388            &combined_values,
389            r_offset,
390        ),
391    }
392}
393
394/// Inner loop for set operations, parameterized by const generic to
395/// avoid branching inside the hot loop.
396fn generic_set_loop<OffsetSize: OffsetSizeTrait, const IS_UNION: bool>(
397    l: &GenericListArray<OffsetSize>,
398    r: &GenericListArray<OffsetSize>,
399    rows_l: &arrow::row::Rows,
400    rows_r: &arrow::row::Rows,
401    field: Arc<Field>,
402    combined_values: &ArrayRef,
403    r_offset: usize,
404) -> Result<ArrayRef> {
405    let l_offsets = l.value_offsets();
406    let r_offsets = r.value_offsets();
407    let l_first = l.offsets()[0].as_usize();
408    let r_first = r.offsets()[0].as_usize();
409
410    let mut result_offsets = Vec::with_capacity(l.len() + 1);
411    result_offsets.push(OffsetSize::usize_as(0));
412    let initial_capacity = if IS_UNION {
413        // Union can include all elements from both sides
414        rows_l.num_rows()
415    } else {
416        // Intersect result is bounded by the smaller side
417        rows_l.num_rows().min(rows_r.num_rows())
418    };
419
420    let mut indices: Vec<usize> = Vec::with_capacity(initial_capacity);
421
422    // Reuse hash sets across iterations
423    let mut seen = HashSet::new();
424    let mut lookup_set = HashSet::new();
425    for i in 0..l.len() {
426        let last_offset = *result_offsets.last().unwrap();
427
428        if l.is_null(i) || r.is_null(i) {
429            result_offsets.push(last_offset);
430            continue;
431        }
432
433        let l_start = l_offsets[i].as_usize() - l_first;
434        let l_end = l_offsets[i + 1].as_usize() - l_first;
435        let r_start = r_offsets[i].as_usize() - r_first;
436        let r_end = r_offsets[i + 1].as_usize() - r_first;
437
438        seen.clear();
439
440        if IS_UNION {
441            for idx in l_start..l_end {
442                let row = rows_l.row(idx);
443                if seen.insert(row) {
444                    indices.push(idx);
445                }
446            }
447            for idx in r_start..r_end {
448                let row = rows_r.row(idx);
449                if seen.insert(row) {
450                    indices.push(idx + r_offset);
451                }
452            }
453        } else {
454            let l_len = l_end - l_start;
455            let r_len = r_end - r_start;
456
457            // Select shorter side for lookup, longer side for probing.
458            // Track the probe side's offset into the combined values array.
459            let (lookup_rows, lookup_range, probe_rows, probe_range, probe_offset) =
460                if l_len < r_len {
461                    (rows_l, l_start..l_end, rows_r, r_start..r_end, r_offset)
462                } else {
463                    (rows_r, r_start..r_end, rows_l, l_start..l_end, 0)
464                };
465            lookup_set.clear();
466            lookup_set.reserve(lookup_range.len());
467
468            // Build lookup table
469            for idx in lookup_range {
470                lookup_set.insert(lookup_rows.row(idx));
471            }
472
473            // Probe and emit distinct intersected rows
474            for idx in probe_range {
475                let row = probe_rows.row(idx);
476                if lookup_set.contains(&row) && seen.insert(row) {
477                    indices.push(idx + probe_offset);
478                }
479            }
480        }
481        result_offsets.push(last_offset + OffsetSize::usize_as(seen.len()));
482    }
483
484    // Gather distinct values by index from the combined values array.
485    // Use UInt64Array for LargeList to support values arrays exceeding u32::MAX.
486    let final_values = if indices.is_empty() {
487        new_empty_array(&l.value_type())
488    } else if OffsetSize::IS_LARGE {
489        let indices =
490            UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>());
491        take(combined_values.as_ref(), &indices, None)?
492    } else {
493        let indices =
494            UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>());
495        take(combined_values.as_ref(), &indices, None)?
496    };
497
498    let arr = GenericListArray::<OffsetSize>::try_new(
499        field,
500        OffsetBuffer::new(result_offsets.into()),
501        final_values,
502        NullBuffer::union(l.nulls(), r.nulls()),
503    )?;
504    Ok(Arc::new(arr))
505}
506
507fn general_set_op(
508    array1: &ArrayRef,
509    array2: &ArrayRef,
510    set_op: SetOp,
511) -> Result<ArrayRef> {
512    let len = array1.len();
513    match (array1.data_type(), array2.data_type()) {
514        (Null, Null) => Ok(new_null_array(&DataType::new_list(Null, true), len)),
515        (Null, dt @ List(_))
516        | (Null, dt @ LargeList(_))
517        | (dt @ List(_), Null)
518        | (dt @ LargeList(_), Null) => Ok(new_null_array(dt, len)),
519        (List(field), List(_)) => {
520            let array1 = as_list_array(&array1)?;
521            let array2 = as_list_array(&array2)?;
522            generic_set_lists::<i32>(array1, array2, Arc::clone(field), set_op)
523        }
524        (LargeList(field), LargeList(_)) => {
525            let array1 = as_large_list_array(&array1)?;
526            let array2 = as_large_list_array(&array2)?;
527            generic_set_lists::<i64>(array1, array2, Arc::clone(field), set_op)
528        }
529        (data_type1, data_type2) => {
530            internal_err!(
531                "{set_op} does not support types '{data_type1:?}' and '{data_type2:?}'"
532            )
533        }
534    }
535}
536
537fn array_union_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
538    let [array1, array2] = take_function_args("array_union", args)?;
539    general_set_op(array1, array2, SetOp::Union)
540}
541
542fn array_intersect_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
543    let [array1, array2] = take_function_args("array_intersect", args)?;
544    general_set_op(array1, array2, SetOp::Intersect)
545}
546
547fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
548    array: &GenericListArray<OffsetSize>,
549    field: &FieldRef,
550) -> Result<ArrayRef> {
551    if array.is_empty() {
552        return Ok(Arc::new(array.clone()) as ArrayRef);
553    }
554    let value_offsets = array.value_offsets();
555    let dt = array.value_type();
556    let mut offsets = Vec::with_capacity(array.len() + 1);
557    offsets.push(OffsetSize::usize_as(0));
558
559    let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
560
561    // Only convert the visible portion of the values array. For sliced
562    // ListArrays, values() returns the full underlying array but only
563    // elements between the first and last offset are referenced.
564    let first_offset = value_offsets[0].as_usize();
565    let visible_len = value_offsets[array.len()].as_usize() - first_offset;
566    let rows =
567        converter.convert_columns(&[array.values().slice(first_offset, visible_len)])?;
568
569    let mut indices: Vec<usize> = Vec::with_capacity(rows.num_rows());
570    let mut seen = HashSet::new();
571    for i in 0..array.len() {
572        let last_offset = *offsets.last().unwrap();
573
574        // Null list entries produce no output; just carry forward the offset.
575        if array.is_null(i) {
576            offsets.push(last_offset);
577            continue;
578        }
579
580        let start = value_offsets[i].as_usize() - first_offset;
581        let end = value_offsets[i + 1].as_usize() - first_offset;
582        seen.clear();
583        seen.reserve(end - start);
584
585        // Walk the sub-array and keep only the first occurrence of each value.
586        for idx in start..end {
587            let row = rows.row(idx);
588            if seen.insert(row) {
589                indices.push(idx + first_offset);
590            }
591        }
592        offsets.push(last_offset + OffsetSize::usize_as(seen.len()));
593    }
594
595    // Gather distinct values in a single pass, using the computed `indices`.
596    // Indices are absolute positions in array.values() (first_offset was added
597    // back when collecting them), so we can take directly from the full values.
598    // Use UInt64Array for LargeList to support values arrays exceeding u32::MAX.
599    let final_values = if indices.is_empty() {
600        new_empty_array(&dt)
601    } else if OffsetSize::IS_LARGE {
602        let indices =
603            UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>());
604        take(array.values().as_ref(), &indices, None)?
605    } else {
606        let indices =
607            UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>());
608        take(array.values().as_ref(), &indices, None)?
609    };
610
611    Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(
612        Arc::clone(field),
613        OffsetBuffer::new(offsets.into()),
614        final_values,
615        // Keep the list nulls
616        array.nulls().cloned(),
617    )?))
618}
619
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::{
        array::{Array, ArrayRef, AsArray, Int32Array, ListArray},
        buffer::OffsetBuffer,
        datatypes::{DataType, Field, FieldRef, Int32Type},
    };
    use datafusion_common::{DataFusionError, Result, config::ConfigOptions};
    use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};

    use crate::set_ops::{ArrayDistinct, ArrayIntersect, ArrayUnion, array_distinct_udf};

    /// Builds an `Int32` `ListArray` with one non-null list per entry of `rows`.
    fn int_lists(rows: &[&[i32]]) -> ListArray {
        ListArray::from_iter_primitive::<Int32Type, _, _>(
            rows.iter()
                .map(|row| Some(row.iter().map(|&v| Some(v)).collect::<Vec<_>>())),
        )
    }

    /// Calls `udf` on `inputs` and materializes `number_rows` output rows.
    /// Every input is typed by `field`, which is also used as the return field.
    fn invoke_on_arrays(
        udf: &impl ScalarUDFImpl,
        inputs: Vec<ArrayRef>,
        field: &FieldRef,
        number_rows: usize,
    ) -> Result<ArrayRef> {
        let arg_fields = vec![Arc::clone(field); inputs.len()];
        let args = inputs.into_iter().map(ColumnarValue::Array).collect();
        udf.invoke_with_args(ScalarFunctionArgs {
            args,
            arg_fields,
            number_rows,
            return_field: Arc::clone(field),
            config_options: Arc::new(ConfigOptions::default()),
        })?
        .into_array(number_rows)
    }

    /// Two sliced ListArrays plus the list field they share.
    ///
    /// l: [[1,2], [3,4], [5,6], [7,8]]  →  slice(1,2)  →  [[3,4], [5,6]]
    /// r: [[1,3], [3,5], [5,7], [7,1]]  →  slice(1,2)  →  [[3,5], [5,7]]
    fn make_sliced_pair() -> (ListArray, ListArray, Arc<Field>) {
        let l = int_lists(&[&[1, 2], &[3, 4], &[5, 6], &[7, 8]]);
        let r = int_lists(&[&[1, 3], &[3, 5], &[5, 7], &[7, 1]]);
        let field = Arc::new(Field::new("item", l.data_type().clone(), true));
        (l.slice(1, 2), r.slice(1, 2), field)
    }

    /// Reads every row of an `Int32` list array into nested vectors.
    fn collect_i32_list(list: &ListArray) -> Vec<Vec<i32>> {
        (0..list.len())
            .map(|i| list.value(i).as_primitive::<Int32Type>().values().to_vec())
            .collect()
    }

    #[test]
    fn test_array_union_sliced_lists() -> Result<()> {
        let (l, r, field) = make_sliced_pair();
        let output = invoke_on_arrays(
            &ArrayUnion::new(),
            vec![Arc::new(l) as ArrayRef, Arc::new(r) as ArrayRef],
            &field,
            2,
        )?;
        let rows = collect_i32_list(output.as_list::<i32>());

        // Row 0: union([3,4], [3,5]) = [3,4,5]
        assert_eq!(rows[0], vec![3, 4, 5]);
        // Row 1: union([5,6], [5,7]) = [5,6,7]
        assert_eq!(rows[1], vec![5, 6, 7]);
        Ok(())
    }

    #[test]
    fn test_array_intersect_sliced_lists() -> Result<()> {
        let (l, r, field) = make_sliced_pair();
        let output = invoke_on_arrays(
            &ArrayIntersect::new(),
            vec![Arc::new(l) as ArrayRef, Arc::new(r) as ArrayRef],
            &field,
            2,
        )?;
        let rows = collect_i32_list(output.as_list::<i32>());

        // Row 0: intersect([3,4], [3,5]) = [3]
        assert_eq!(rows[0], vec![3]);
        // Row 1: intersect([5,6], [5,7]) = [5]
        assert_eq!(rows[1], vec![5]);
        Ok(())
    }

    #[test]
    fn test_array_distinct_sliced_list() -> Result<()> {
        // [[1,1], [3,3,4], [5,5,6], [7,7]]  →  slice(1,2)  →  [[3,3,4], [5,5,6]]
        let sliced = int_lists(&[&[1, 1], &[3, 3, 4], &[5, 5, 6], &[7, 7]]).slice(1, 2);
        let field = Arc::new(Field::new("item", sliced.data_type().clone(), true));

        let output = invoke_on_arrays(
            &ArrayDistinct::new(),
            vec![Arc::new(sliced) as ArrayRef],
            &field,
            2,
        )?;
        let rows = collect_i32_list(output.as_list::<i32>());

        // Row 0: distinct([3,3,4]) = [3,4]
        assert_eq!(rows[0], vec![3, 4]);
        // Row 1: distinct([5,5,6]) = [5,6]
        assert_eq!(rows[1], vec![5, 6]);
        Ok(())
    }

    #[test]
    fn test_array_distinct_inner_nullability_result_type_match_return_type()
    -> Result<(), DataFusionError> {
        let udf = array_distinct_udf();

        for inner_nullable in [true, false] {
            let inner_field = Field::new_list_field(DataType::Int32, inner_nullable);
            let input_field =
                Field::new_list("input", Arc::new(inner_field.clone()), true);

            // [[1, 1, 2]]
            let input_array: ArrayRef = Arc::new(ListArray::new(
                inner_field.into(),
                OffsetBuffer::new(vec![0, 3].into()),
                Arc::new(Int32Array::new(vec![1, 1, 2].into(), None)),
                None,
            ));

            let result = udf.invoke_with_args(ScalarFunctionArgs {
                args: vec![ColumnarValue::Array(input_array)],
                arg_fields: vec![input_field.clone().into()],
                number_rows: 1,
                return_field: input_field.clone().into(),
                config_options: Arc::new(ConfigOptions::default()),
            })?;

            let expected = udf.return_type(&[input_field.data_type().clone()])?;
            assert_eq!(result.data_type(), expected);
        }
        Ok(())
    }
}