datafusion_functions_nested/
except.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for array_except function.
19
20use crate::utils::{check_datatypes, make_scalar_function};
21use arrow::array::{cast::AsArray, Array, ArrayRef, GenericListArray, OffsetSizeTrait};
22use arrow::buffer::OffsetBuffer;
23use arrow::datatypes::{DataType, FieldRef};
24use arrow::row::{RowConverter, SortField};
25use datafusion_common::utils::{take_function_args, ListCoercion};
26use datafusion_common::{internal_err, HashSet, Result};
27use datafusion_expr::{
28    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
29};
30use datafusion_macros::user_doc;
31use std::any::Any;
32use std::sync::Arc;
33
// Registers the `array_except` UDF through the crate's `make_udf_expr_and_func!`
// macro. The arguments are, in order: the implementing struct (`ArrayExcept`),
// the function name (`array_except`), the argument names (`first_array`,
// `second_array`), a one-line description, and the identifier `array_except_udf`.
// NOTE(review): the macro definition is not visible in this file; presumably it
// generates an expression-builder `array_except(..)` and an accessor named
// `array_except_udf` — confirm against the macro definition.
make_udf_expr_and_func!(
    ArrayExcept,
    array_except,
    first_array second_array,
    "returns an array of the elements that appear in the first array but not in the second.",
    array_except_udf
);
41
// User-facing documentation for `array_except`, supplied to the `user_doc`
// attribute macro: section label, description, syntax, a SQL example and the
// two argument descriptions. `ScalarUDFImpl::documentation` below returns
// `self.doc()`, which is presumably generated from this attribute — confirm
// against the `datafusion_macros::user_doc` implementation.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array of the elements that appear in the first array but not in the second.",
    syntax_example = "array_except(array1, array2)",
    sql_example = r#"```sql
> select array_except([1, 2, 3, 4], [5, 6, 3, 4]);
+----------------------------------------------------+
| array_except([1, 2, 3, 4], [5, 6, 3, 4]);           |
+----------------------------------------------------+
| [1, 2]                                              |
+----------------------------------------------------+
> select array_except([1, 2, 3, 4], [3, 4, 5, 6]);
+----------------------------------------------------+
| array_except([1, 2, 3, 4], [3, 4, 5, 6]);           |
+----------------------------------------------------+
| [1, 2]                                              |
+----------------------------------------------------+
```"#,
    argument(
        name = "array1",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array2",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
// Scalar UDF implementation backing the `array_except` SQL function
// (also reachable through the `list_except` alias set in `new`).
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayExcept {
    // Accepted argument signature; built in `new` as a two-array signature.
    signature: Signature,
    // Alternative names under which the function is exposed.
    aliases: Vec<String>,
}
74
75impl Default for ArrayExcept {
76    fn default() -> Self {
77        Self::new()
78    }
79}
80
81impl ArrayExcept {
82    pub fn new() -> Self {
83        Self {
84            signature: Signature::arrays(
85                2,
86                Some(ListCoercion::FixedSizedListToList),
87                Volatility::Immutable,
88            ),
89            aliases: vec!["list_except".to_string()],
90        }
91    }
92}
93
94impl ScalarUDFImpl for ArrayExcept {
95    fn as_any(&self) -> &dyn Any {
96        self
97    }
98    fn name(&self) -> &str {
99        "array_except"
100    }
101
102    fn signature(&self) -> &Signature {
103        &self.signature
104    }
105
106    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
107        match (&arg_types[0].clone(), &arg_types[1].clone()) {
108            (DataType::Null, _) | (_, DataType::Null) => Ok(arg_types[0].clone()),
109            (dt, _) => Ok(dt.clone()),
110        }
111    }
112
113    fn invoke_with_args(
114        &self,
115        args: datafusion_expr::ScalarFunctionArgs,
116    ) -> Result<ColumnarValue> {
117        make_scalar_function(array_except_inner)(&args.args)
118    }
119
120    fn aliases(&self) -> &[String] {
121        &self.aliases
122    }
123
124    fn documentation(&self) -> Option<&Documentation> {
125        self.doc()
126    }
127}
128
129/// Array_except SQL function
130pub fn array_except_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
131    let [array1, array2] = take_function_args("array_except", args)?;
132
133    match (array1.data_type(), array2.data_type()) {
134        (DataType::Null, _) | (_, DataType::Null) => Ok(array1.to_owned()),
135        (DataType::List(field), DataType::List(_)) => {
136            check_datatypes("array_except", &[array1, array2])?;
137            let list1 = array1.as_list::<i32>();
138            let list2 = array2.as_list::<i32>();
139            let result = general_except::<i32>(list1, list2, field)?;
140            Ok(Arc::new(result))
141        }
142        (DataType::LargeList(field), DataType::LargeList(_)) => {
143            check_datatypes("array_except", &[array1, array2])?;
144            let list1 = array1.as_list::<i64>();
145            let list2 = array2.as_list::<i64>();
146            let result = general_except::<i64>(list1, list2, field)?;
147            Ok(Arc::new(result))
148        }
149        (dt1, dt2) => {
150            internal_err!("array_except got unexpected types: {dt1:?} and {dt2:?}")
151        }
152    }
153}
154
155fn general_except<OffsetSize: OffsetSizeTrait>(
156    l: &GenericListArray<OffsetSize>,
157    r: &GenericListArray<OffsetSize>,
158    field: &FieldRef,
159) -> Result<GenericListArray<OffsetSize>> {
160    let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
161
162    let l_values = l.values().to_owned();
163    let r_values = r.values().to_owned();
164    let l_values = converter.convert_columns(&[l_values])?;
165    let r_values = converter.convert_columns(&[r_values])?;
166
167    let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
168    offsets.push(OffsetSize::usize_as(0));
169
170    let mut rows = Vec::with_capacity(l_values.num_rows());
171    let mut dedup = HashSet::new();
172
173    for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
174        let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
175        let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
176        for i in r_slice {
177            let right_row = r_values.row(i);
178            dedup.insert(right_row);
179        }
180        for i in l_slice {
181            let left_row = l_values.row(i);
182            if dedup.insert(left_row) {
183                rows.push(left_row);
184            }
185        }
186
187        offsets.push(OffsetSize::usize_as(rows.len()));
188        dedup.clear();
189    }
190
191    if let Some(values) = converter.convert_rows(rows)?.first() {
192        Ok(GenericListArray::<OffsetSize>::new(
193            field.to_owned(),
194            OffsetBuffer::new(offsets.into()),
195            values.to_owned(),
196            l.nulls().cloned(),
197        ))
198    } else {
199        internal_err!("array_except failed to convert rows")
200    }
201}