datafusion_functions_nested/
except.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for array_except function.
19
20use crate::utils::{check_datatypes, make_scalar_function};
21use arrow::array::{cast::AsArray, Array, ArrayRef, GenericListArray, OffsetSizeTrait};
22use arrow::buffer::OffsetBuffer;
23use arrow::datatypes::{DataType, FieldRef};
24use arrow::row::{RowConverter, SortField};
25use datafusion_common::utils::take_function_args;
26use datafusion_common::{internal_err, HashSet, Result};
27use datafusion_expr::{
28    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
29};
30use datafusion_macros::user_doc;
31use std::any::Any;
32use std::sync::Arc;
33
// NOTE(review): `make_udf_expr_and_func!` is defined elsewhere in the crate.
// Judging by its arguments, it wires `ArrayExcept` up to an
// `array_except(first_array, second_array)` expression helper (documented by
// the string below) and an `array_except_udf` accessor — confirm against the
// macro definition.
make_udf_expr_and_func!(
    ArrayExcept,
    array_except,
    first_array second_array,
    "returns an array of the elements that appear in the first array but not in the second.",
    array_except_udf
);
41
// User-facing documentation for `array_except`. Presumably this is what
// `self.doc()` hands back from `documentation()` — confirm against the
// `user_doc` macro.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array of the elements that appear in the first array but not in the second.",
    syntax_example = "array_except(array1, array2)",
    sql_example = r#"```sql
> select array_except([1, 2, 3, 4], [5, 6, 3, 4]);
+----------------------------------------------------+
| array_except([1, 2, 3, 4], [5, 6, 3, 4]);           |
+----------------------------------------------------+
| [1, 2]                                              |
+----------------------------------------------------+
> select array_except([1, 2, 3, 4], [3, 4, 5, 6]);
+----------------------------------------------------+
| array_except([1, 2, 3, 4], [3, 4, 5, 6]);           |
+----------------------------------------------------+
| [1, 2]                                              |
+----------------------------------------------------+
```"#,
    argument(
        name = "array1",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array2",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
#[derive(Debug)]
/// Scalar UDF type backing the `array_except` SQL function (alias
/// `list_except`).
pub struct ArrayExcept {
    /// Argument signature returned by `signature()`.
    signature: Signature,
    /// Alternative function names returned by `aliases()`.
    aliases: Vec<String>,
}
74
75impl Default for ArrayExcept {
76    fn default() -> Self {
77        Self::new()
78    }
79}
80
81impl ArrayExcept {
82    pub fn new() -> Self {
83        Self {
84            signature: Signature::any(2, Volatility::Immutable),
85            aliases: vec!["list_except".to_string()],
86        }
87    }
88}
89
90impl ScalarUDFImpl for ArrayExcept {
91    fn as_any(&self) -> &dyn Any {
92        self
93    }
94    fn name(&self) -> &str {
95        "array_except"
96    }
97
98    fn signature(&self) -> &Signature {
99        &self.signature
100    }
101
102    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
103        match (&arg_types[0].clone(), &arg_types[1].clone()) {
104            (DataType::Null, _) | (_, DataType::Null) => Ok(arg_types[0].clone()),
105            (dt, _) => Ok(dt.clone()),
106        }
107    }
108
109    fn invoke_with_args(
110        &self,
111        args: datafusion_expr::ScalarFunctionArgs,
112    ) -> Result<ColumnarValue> {
113        make_scalar_function(array_except_inner)(&args.args)
114    }
115
116    fn aliases(&self) -> &[String] {
117        &self.aliases
118    }
119
120    fn documentation(&self) -> Option<&Documentation> {
121        self.doc()
122    }
123}
124
125/// Array_except SQL function
126pub fn array_except_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
127    let [array1, array2] = take_function_args("array_except", args)?;
128
129    match (array1.data_type(), array2.data_type()) {
130        (DataType::Null, _) | (_, DataType::Null) => Ok(array1.to_owned()),
131        (DataType::List(field), DataType::List(_)) => {
132            check_datatypes("array_except", &[array1, array2])?;
133            let list1 = array1.as_list::<i32>();
134            let list2 = array2.as_list::<i32>();
135            let result = general_except::<i32>(list1, list2, field)?;
136            Ok(Arc::new(result))
137        }
138        (DataType::LargeList(field), DataType::LargeList(_)) => {
139            check_datatypes("array_except", &[array1, array2])?;
140            let list1 = array1.as_list::<i64>();
141            let list2 = array2.as_list::<i64>();
142            let result = general_except::<i64>(list1, list2, field)?;
143            Ok(Arc::new(result))
144        }
145        (dt1, dt2) => {
146            internal_err!("array_except got unexpected types: {dt1:?} and {dt2:?}")
147        }
148    }
149}
150
151fn general_except<OffsetSize: OffsetSizeTrait>(
152    l: &GenericListArray<OffsetSize>,
153    r: &GenericListArray<OffsetSize>,
154    field: &FieldRef,
155) -> Result<GenericListArray<OffsetSize>> {
156    let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
157
158    let l_values = l.values().to_owned();
159    let r_values = r.values().to_owned();
160    let l_values = converter.convert_columns(&[l_values])?;
161    let r_values = converter.convert_columns(&[r_values])?;
162
163    let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
164    offsets.push(OffsetSize::usize_as(0));
165
166    let mut rows = Vec::with_capacity(l_values.num_rows());
167    let mut dedup = HashSet::new();
168
169    for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
170        let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
171        let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
172        for i in r_slice {
173            let right_row = r_values.row(i);
174            dedup.insert(right_row);
175        }
176        for i in l_slice {
177            let left_row = l_values.row(i);
178            if dedup.insert(left_row) {
179                rows.push(left_row);
180            }
181        }
182
183        offsets.push(OffsetSize::usize_as(rows.len()));
184        dedup.clear();
185    }
186
187    if let Some(values) = converter.convert_rows(rows)?.first() {
188        Ok(GenericListArray::<OffsetSize>::new(
189            field.to_owned(),
190            OffsetBuffer::new(offsets.into()),
191            values.to_owned(),
192            l.nulls().cloned(),
193        ))
194    } else {
195        internal_err!("array_except failed to convert rows")
196    }
197}