Skip to main content

datafusion_functions_nested/
except.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definition for array_except function.
19
20use crate::utils::{check_datatypes, make_scalar_function};
21use arrow::array::new_null_array;
22use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait, cast::AsArray};
23use arrow::buffer::{NullBuffer, OffsetBuffer};
24use arrow::datatypes::{DataType, FieldRef};
25use arrow::row::{RowConverter, SortField};
26use datafusion_common::utils::{ListCoercion, take_function_args};
27use datafusion_common::{HashSet, Result, internal_err};
28use datafusion_expr::{
29    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
30};
31use datafusion_macros::user_doc;
32use itertools::Itertools;
33use std::any::Any;
34use std::sync::Arc;
35
// Registers `ArrayExcept` through the crate-level `make_udf_expr_and_func!` macro.
// NOTE(review): the macro is defined outside this file; presumably it generates the
// `array_except(first_array, second_array)` expression helper and the
// `array_except_udf` accessor, using the string below as their documentation — confirm.
make_udf_expr_and_func!(
    ArrayExcept,
    array_except,
    first_array second_array,
    "returns an array of the elements that appear in the first array but not in the second.",
    array_except_udf
);
43
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array of the elements that appear in the first array but not in the second.",
    syntax_example = "array_except(array1, array2)",
    sql_example = r#"```sql
> select array_except([1, 2, 3, 4], [5, 6, 3, 4]);
+----------------------------------------------------+
| array_except([1, 2, 3, 4], [5, 6, 3, 4]);           |
+----------------------------------------------------+
| [1, 2]                                              |
+----------------------------------------------------+
> select array_except([1, 2, 3, 4], [3, 4, 5, 6]);
+----------------------------------------------------+
| array_except([1, 2, 3, 4], [3, 4, 5, 6]);           |
+----------------------------------------------------+
| [1, 2]                                              |
+----------------------------------------------------+
```"#,
    argument(
        name = "array1",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array2",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
/// Scalar UDF behind `array_except` (alias `list_except`): for each row it yields
/// the elements of the first list that do not occur in the second list.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayExcept {
    /// Argument signature, built in [`ArrayExcept::new`] and exposed through
    /// `ScalarUDFImpl::signature`.
    signature: Signature,
    /// Alternative function names, exposed through `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
76
77impl Default for ArrayExcept {
78    fn default() -> Self {
79        Self::new()
80    }
81}
82
83impl ArrayExcept {
84    pub fn new() -> Self {
85        Self {
86            signature: Signature::arrays(
87                2,
88                Some(ListCoercion::FixedSizedListToList),
89                Volatility::Immutable,
90            ),
91            aliases: vec!["list_except".to_string()],
92        }
93    }
94}
95
96impl ScalarUDFImpl for ArrayExcept {
97    fn as_any(&self) -> &dyn Any {
98        self
99    }
100    fn name(&self) -> &str {
101        "array_except"
102    }
103
104    fn signature(&self) -> &Signature {
105        &self.signature
106    }
107
108    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
109        match (&arg_types[0], &arg_types[1]) {
110            (DataType::Null, DataType::Null) => {
111                Ok(DataType::new_list(DataType::Null, true))
112            }
113            (DataType::Null, dt) | (dt, DataType::Null) => Ok(dt.clone()),
114            (dt, _) => Ok(dt.clone()),
115        }
116    }
117
118    fn invoke_with_args(
119        &self,
120        args: datafusion_expr::ScalarFunctionArgs,
121    ) -> Result<ColumnarValue> {
122        make_scalar_function(array_except_inner)(&args.args)
123    }
124
125    fn aliases(&self) -> &[String] {
126        &self.aliases
127    }
128
129    fn documentation(&self) -> Option<&Documentation> {
130        self.doc()
131    }
132}
133
134fn array_except_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
135    let [array1, array2] = take_function_args("array_except", args)?;
136
137    let len = array1.len();
138    match (array1.data_type(), array2.data_type()) {
139        (DataType::Null, DataType::Null) => Ok(new_null_array(
140            &DataType::new_list(DataType::Null, true),
141            len,
142        )),
143        (DataType::Null, dt @ DataType::List(_))
144        | (DataType::Null, dt @ DataType::LargeList(_))
145        | (dt @ DataType::List(_), DataType::Null)
146        | (dt @ DataType::LargeList(_), DataType::Null) => Ok(new_null_array(dt, len)),
147        (DataType::List(field), DataType::List(_)) => {
148            check_datatypes("array_except", &[array1, array2])?;
149            let list1 = array1.as_list::<i32>();
150            let list2 = array2.as_list::<i32>();
151            let result = general_except::<i32>(list1, list2, field)?;
152            Ok(Arc::new(result))
153        }
154        (DataType::LargeList(field), DataType::LargeList(_)) => {
155            check_datatypes("array_except", &[array1, array2])?;
156            let list1 = array1.as_list::<i64>();
157            let list2 = array2.as_list::<i64>();
158            let result = general_except::<i64>(list1, list2, field)?;
159            Ok(Arc::new(result))
160        }
161        (dt1, dt2) => {
162            internal_err!("array_except got unexpected types: {dt1:?} and {dt2:?}")
163        }
164    }
165}
166
167fn general_except<OffsetSize: OffsetSizeTrait>(
168    l: &GenericListArray<OffsetSize>,
169    r: &GenericListArray<OffsetSize>,
170    field: &FieldRef,
171) -> Result<GenericListArray<OffsetSize>> {
172    let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
173
174    let l_values = l.values().to_owned();
175    let r_values = r.values().to_owned();
176    let l_values = converter.convert_columns(&[l_values])?;
177    let r_values = converter.convert_columns(&[r_values])?;
178
179    let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
180    offsets.push(OffsetSize::usize_as(0));
181
182    let mut rows = Vec::with_capacity(l_values.num_rows());
183    let mut dedup = HashSet::new();
184
185    let nulls = NullBuffer::union(l.nulls(), r.nulls());
186
187    let l_offsets_iter = l.offsets().iter().tuple_windows();
188    let r_offsets_iter = r.offsets().iter().tuple_windows();
189    for (list_index, ((l_start, l_end), (r_start, r_end))) in
190        l_offsets_iter.zip(r_offsets_iter).enumerate()
191    {
192        if nulls
193            .as_ref()
194            .is_some_and(|nulls| nulls.is_null(list_index))
195        {
196            offsets.push(OffsetSize::usize_as(rows.len()));
197            continue;
198        }
199
200        for element_index in r_start.as_usize()..r_end.as_usize() {
201            let right_row = r_values.row(element_index);
202            dedup.insert(right_row);
203        }
204        for element_index in l_start.as_usize()..l_end.as_usize() {
205            let left_row = l_values.row(element_index);
206            if dedup.insert(left_row) {
207                rows.push(left_row);
208            }
209        }
210
211        offsets.push(OffsetSize::usize_as(rows.len()));
212        dedup.clear();
213    }
214
215    if let Some(values) = converter.convert_rows(rows)?.first() {
216        Ok(GenericListArray::<OffsetSize>::new(
217            field.to_owned(),
218            OffsetBuffer::new(offsets.into()),
219            values.to_owned(),
220            nulls,
221        ))
222    } else {
223        internal_err!("array_except failed to convert rows")
224    }
225}