Skip to main content

datafusion_functions_nested/
except.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definition for array_except function.
19
20use crate::utils::{check_datatypes, make_scalar_function};
21use arrow::array::new_null_array;
22use arrow::array::{
23    Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, UInt64Array,
24    cast::AsArray,
25};
26use arrow::buffer::{NullBuffer, OffsetBuffer};
27use arrow::compute::take;
28use arrow::datatypes::{DataType, FieldRef};
29use arrow::row::{RowConverter, SortField};
30use datafusion_common::utils::{ListCoercion, take_function_args};
31use datafusion_common::{HashSet, Result, internal_err};
32use datafusion_expr::{
33    ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
34    Volatility,
35};
36use datafusion_macros::user_doc;
37use itertools::Itertools;
38use std::sync::Arc;
39
// Registers `ArrayExcept` through the crate-local `make_udf_expr_and_func!` macro.
// NOTE(review): the macro definition is not visible in this file; presumably it
// emits an `array_except(first_array, second_array)` expression helper and an
// `array_except_udf()` accessor — confirm against the macro definition.
make_udf_expr_and_func!(
    ArrayExcept,
    array_except,
    first_array second_array,
    "returns an array of the elements that appear in the first array but not in the second.",
    array_except_udf
);
47
// User-facing documentation for `array_except`; `documentation()` in the
// `ScalarUDFImpl` impl returns it via `self.doc()`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array of the elements that appear in the first array but not in the second.",
    syntax_example = "array_except(array1, array2)",
    sql_example = r#"```sql
> select array_except([1, 2, 3, 4], [5, 6, 3, 4]);
+----------------------------------------------------+
| array_except([1, 2, 3, 4], [5, 6, 3, 4]);           |
+----------------------------------------------------+
| [1, 2]                                              |
+----------------------------------------------------+
> select array_except([1, 2, 3, 4], [3, 4, 5, 6]);
+----------------------------------------------------+
| array_except([1, 2, 3, 4], [3, 4, 5, 6]);           |
+----------------------------------------------------+
| [1, 2]                                              |
+----------------------------------------------------+
```"#,
    argument(
        name = "array1",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array2",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
// Scalar UDF implementing `array_except` (alias `list_except`).
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayExcept {
    // Argument signature handed out by `ScalarUDFImpl::signature`.
    signature: Signature,
    // Alternative function names handed out by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
80
81impl Default for ArrayExcept {
82    fn default() -> Self {
83        Self::new()
84    }
85}
86
87impl ArrayExcept {
88    pub fn new() -> Self {
89        Self {
90            signature: Signature::arrays(
91                2,
92                Some(ListCoercion::FixedSizedListToList),
93                Volatility::Immutable,
94            ),
95            aliases: vec!["list_except".to_string()],
96        }
97    }
98}
99
100impl ScalarUDFImpl for ArrayExcept {
101    fn name(&self) -> &str {
102        "array_except"
103    }
104
105    fn signature(&self) -> &Signature {
106        &self.signature
107    }
108
109    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
110        match (&arg_types[0], &arg_types[1]) {
111            (DataType::Null, DataType::Null) => {
112                Ok(DataType::new_list(DataType::Null, true))
113            }
114            (DataType::Null, dt) | (dt, DataType::Null) => Ok(dt.clone()),
115            (dt, _) => Ok(dt.clone()),
116        }
117    }
118
119    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
120        make_scalar_function(array_except_inner)(&args.args)
121    }
122
123    fn aliases(&self) -> &[String] {
124        &self.aliases
125    }
126
127    fn documentation(&self) -> Option<&Documentation> {
128        self.doc()
129    }
130}
131
132fn array_except_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
133    let [array1, array2] = take_function_args("array_except", args)?;
134
135    let len = array1.len();
136    match (array1.data_type(), array2.data_type()) {
137        (DataType::Null, DataType::Null) => Ok(new_null_array(
138            &DataType::new_list(DataType::Null, true),
139            len,
140        )),
141        (DataType::Null, dt @ DataType::List(_))
142        | (DataType::Null, dt @ DataType::LargeList(_))
143        | (dt @ DataType::List(_), DataType::Null)
144        | (dt @ DataType::LargeList(_), DataType::Null) => Ok(new_null_array(dt, len)),
145        (DataType::List(field), DataType::List(_)) => {
146            check_datatypes("array_except", &[array1, array2])?;
147            let list1 = array1.as_list::<i32>();
148            let list2 = array2.as_list::<i32>();
149            let result = general_except::<i32>(list1, list2, field)?;
150            Ok(Arc::new(result))
151        }
152        (DataType::LargeList(field), DataType::LargeList(_)) => {
153            check_datatypes("array_except", &[array1, array2])?;
154            let list1 = array1.as_list::<i64>();
155            let list2 = array2.as_list::<i64>();
156            let result = general_except::<i64>(list1, list2, field)?;
157            Ok(Arc::new(result))
158        }
159        (dt1, dt2) => {
160            internal_err!("array_except got unexpected types: {dt1:?} and {dt2:?}")
161        }
162    }
163}
164
165fn general_except<OffsetSize: OffsetSizeTrait>(
166    l: &GenericListArray<OffsetSize>,
167    r: &GenericListArray<OffsetSize>,
168    field: &FieldRef,
169) -> Result<GenericListArray<OffsetSize>> {
170    let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
171
172    // Only convert the visible portion of the values array. For sliced
173    // ListArrays, values() returns the full underlying array but only
174    // elements between the first and last offset are referenced.
175    let l_first = l.offsets()[0].as_usize();
176    let l_len = l.offsets()[l.len()].as_usize() - l_first;
177    let l_values = converter.convert_columns(&[l.values().slice(l_first, l_len)])?;
178
179    let r_first = r.offsets()[0].as_usize();
180    let r_len = r.offsets()[r.len()].as_usize() - r_first;
181    let r_values = converter.convert_columns(&[r.values().slice(r_first, r_len)])?;
182
183    let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
184    offsets.push(OffsetSize::usize_as(0));
185
186    let mut indices: Vec<usize> = Vec::with_capacity(l_values.num_rows());
187    let mut dedup = HashSet::new();
188
189    let nulls = NullBuffer::union(l.nulls(), r.nulls());
190
191    let l_offsets_iter = l.offsets().iter().tuple_windows();
192    let r_offsets_iter = r.offsets().iter().tuple_windows();
193    for (list_index, ((l_start, l_end), (r_start, r_end))) in
194        l_offsets_iter.zip(r_offsets_iter).enumerate()
195    {
196        if nulls
197            .as_ref()
198            .is_some_and(|nulls| nulls.is_null(list_index))
199        {
200            offsets.push(OffsetSize::usize_as(indices.len()));
201            continue;
202        }
203
204        for element_index in r_start.as_usize() - r_first..r_end.as_usize() - r_first {
205            let right_row = r_values.row(element_index);
206            dedup.insert(right_row);
207        }
208        for element_index in l_start.as_usize() - l_first..l_end.as_usize() - l_first {
209            let left_row = l_values.row(element_index);
210            if dedup.insert(left_row) {
211                indices.push(element_index + l_first);
212            }
213        }
214
215        offsets.push(OffsetSize::usize_as(indices.len()));
216        dedup.clear();
217    }
218
219    // Gather distinct left-side values by index.
220    // Use UInt64Array for LargeList to support values arrays exceeding u32::MAX.
221    let values = if indices.is_empty() {
222        arrow::array::new_empty_array(&l.value_type())
223    } else if OffsetSize::IS_LARGE {
224        let indices =
225            UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>());
226        take(l.values().as_ref(), &indices, None)?
227    } else {
228        let indices =
229            UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>());
230        take(l.values().as_ref(), &indices, None)?
231    };
232
233    Ok(GenericListArray::<OffsetSize>::new(
234        field.to_owned(),
235        OffsetBuffer::new(offsets.into()),
236        values,
237        nulls,
238    ))
239}
240
#[cfg(test)]
mod tests {
    use super::ArrayExcept;
    use arrow::array::{Array, AsArray, Int32Array, ListArray};
    use arrow::datatypes::{Field, Int32Type};
    use datafusion_common::{Result, config::ConfigOptions};
    use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
    use std::sync::Arc;

    /// Both inputs are slices of longer list arrays, so their first offset is
    /// non-zero; the result must be computed from the visible rows only.
    #[test]
    fn test_array_except_sliced_lists() -> Result<()> {
        // Builds a ListArray of non-null Int32 lists from plain vectors.
        let int_lists = |rows: Vec<Vec<i32>>| {
            ListArray::from_iter_primitive::<Int32Type, _, _>(
                rows.into_iter()
                    .map(|row| Some(row.into_iter().map(Some).collect::<Vec<_>>())),
            )
        };

        // left:  [[1,2], [3,4], [5,6], [7,8]]  ->  slice(1,2)  ->  [[3,4], [5,6]]
        // right: [[3],   [5],   [6],   [8]]    ->  slice(1,2)  ->  [[5],   [6]]
        let left =
            int_lists(vec![vec![1, 2], vec![3, 4], vec![5, 6], vec![7, 8]]).slice(1, 2);
        let right = int_lists(vec![vec![3], vec![5], vec![6], vec![8]]).slice(1, 2);

        let list_type = left.data_type().clone();
        let arg_field = Arc::new(Field::new("item", list_type.clone(), true));
        let return_field = Arc::new(Field::new("return", list_type, true));

        let udf_args = ScalarFunctionArgs {
            args: vec![
                ColumnarValue::Array(Arc::new(left)),
                ColumnarValue::Array(Arc::new(right)),
            ],
            arg_fields: vec![Arc::clone(&arg_field), Arc::clone(&arg_field)],
            number_rows: 2,
            return_field,
            config_options: Arc::new(ConfigOptions::default()),
        };

        let output = ArrayExcept::new()
            .invoke_with_args(udf_args)?
            .into_array(2)?;
        let output = output.as_list::<i32>();

        // Row 0: [3,4] except [5] = [3,4]
        // Row 1: [5,6] except [6] = [5]
        let expected: [&[i32]; 2] = [&[3, 4], &[5]];
        for (row, want) in expected.into_iter().enumerate() {
            let elements = output.value(row);
            let elements = elements.as_any().downcast_ref::<Int32Array>().unwrap();
            assert_eq!(elements.values().as_ref(), want);
        }

        Ok(())
    }
}
300}