datafusion_functions_nested/
except.rs1use crate::utils::{check_datatypes, make_scalar_function};
21use arrow::array::new_null_array;
22use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait, cast::AsArray};
23use arrow::buffer::{NullBuffer, OffsetBuffer};
24use arrow::datatypes::{DataType, FieldRef};
25use arrow::row::{RowConverter, SortField};
26use datafusion_common::utils::{ListCoercion, take_function_args};
27use datafusion_common::{HashSet, Result, internal_err};
28use datafusion_expr::{
29 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
30};
31use datafusion_macros::user_doc;
32use itertools::Itertools;
33use std::any::Any;
34use std::sync::Arc;
35
// Registers `ArrayExcept` through the crate's `make_udf_expr_and_func!` macro.
// NOTE(review): the macro is defined outside this file; judging by the argument
// names it presumably emits an `array_except(first_array, second_array)`
// expression helper and an `array_except_udf()` accessor — confirm against the
// macro definition.
make_udf_expr_and_func!(
    ArrayExcept,
    array_except,
    first_array second_array,
    "returns an array of the elements that appear in the first array but not in the second.",
    array_except_udf
);
43
/// Scalar UDF `array_except` (also reachable as `list_except`).
///
/// Given two list arguments, produces the elements of the first list that do
/// not occur in the second one. The user-facing documentation is supplied by
/// the `#[user_doc]` attribute below.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array of the elements that appear in the first array but not in the second.",
    syntax_example = "array_except(array1, array2)",
    sql_example = r#"```sql
> select array_except([1, 2, 3, 4], [5, 6, 3, 4]);
+----------------------------------------------------+
| array_except([1, 2, 3, 4], [5, 6, 3, 4]); |
+----------------------------------------------------+
| [1, 2] |
+----------------------------------------------------+
> select array_except([1, 2, 3, 4], [3, 4, 5, 6]);
+----------------------------------------------------+
| array_except([1, 2, 3, 4], [3, 4, 5, 6]); |
+----------------------------------------------------+
| [1, 2] |
+----------------------------------------------------+
```"#,
    argument(
        name = "array1",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array2",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayExcept {
    // Accepted argument shapes; built in `ArrayExcept::new`.
    signature: Signature,
    // Alternative SQL names for the function; returned by `aliases()`.
    aliases: Vec<String>,
}
76
77impl Default for ArrayExcept {
78 fn default() -> Self {
79 Self::new()
80 }
81}
82
83impl ArrayExcept {
84 pub fn new() -> Self {
85 Self {
86 signature: Signature::arrays(
87 2,
88 Some(ListCoercion::FixedSizedListToList),
89 Volatility::Immutable,
90 ),
91 aliases: vec!["list_except".to_string()],
92 }
93 }
94}
95
96impl ScalarUDFImpl for ArrayExcept {
97 fn as_any(&self) -> &dyn Any {
98 self
99 }
100 fn name(&self) -> &str {
101 "array_except"
102 }
103
104 fn signature(&self) -> &Signature {
105 &self.signature
106 }
107
108 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
109 match (&arg_types[0], &arg_types[1]) {
110 (DataType::Null, DataType::Null) => {
111 Ok(DataType::new_list(DataType::Null, true))
112 }
113 (DataType::Null, dt) | (dt, DataType::Null) => Ok(dt.clone()),
114 (dt, _) => Ok(dt.clone()),
115 }
116 }
117
118 fn invoke_with_args(
119 &self,
120 args: datafusion_expr::ScalarFunctionArgs,
121 ) -> Result<ColumnarValue> {
122 make_scalar_function(array_except_inner)(&args.args)
123 }
124
125 fn aliases(&self) -> &[String] {
126 &self.aliases
127 }
128
129 fn documentation(&self) -> Option<&Documentation> {
130 self.doc()
131 }
132}
133
134fn array_except_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
135 let [array1, array2] = take_function_args("array_except", args)?;
136
137 let len = array1.len();
138 match (array1.data_type(), array2.data_type()) {
139 (DataType::Null, DataType::Null) => Ok(new_null_array(
140 &DataType::new_list(DataType::Null, true),
141 len,
142 )),
143 (DataType::Null, dt @ DataType::List(_))
144 | (DataType::Null, dt @ DataType::LargeList(_))
145 | (dt @ DataType::List(_), DataType::Null)
146 | (dt @ DataType::LargeList(_), DataType::Null) => Ok(new_null_array(dt, len)),
147 (DataType::List(field), DataType::List(_)) => {
148 check_datatypes("array_except", &[array1, array2])?;
149 let list1 = array1.as_list::<i32>();
150 let list2 = array2.as_list::<i32>();
151 let result = general_except::<i32>(list1, list2, field)?;
152 Ok(Arc::new(result))
153 }
154 (DataType::LargeList(field), DataType::LargeList(_)) => {
155 check_datatypes("array_except", &[array1, array2])?;
156 let list1 = array1.as_list::<i64>();
157 let list2 = array2.as_list::<i64>();
158 let result = general_except::<i64>(list1, list2, field)?;
159 Ok(Arc::new(result))
160 }
161 (dt1, dt2) => {
162 internal_err!("array_except got unexpected types: {dt1:?} and {dt2:?}")
163 }
164 }
165}
166
167fn general_except<OffsetSize: OffsetSizeTrait>(
168 l: &GenericListArray<OffsetSize>,
169 r: &GenericListArray<OffsetSize>,
170 field: &FieldRef,
171) -> Result<GenericListArray<OffsetSize>> {
172 let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
173
174 let l_values = l.values().to_owned();
175 let r_values = r.values().to_owned();
176 let l_values = converter.convert_columns(&[l_values])?;
177 let r_values = converter.convert_columns(&[r_values])?;
178
179 let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
180 offsets.push(OffsetSize::usize_as(0));
181
182 let mut rows = Vec::with_capacity(l_values.num_rows());
183 let mut dedup = HashSet::new();
184
185 let nulls = NullBuffer::union(l.nulls(), r.nulls());
186
187 let l_offsets_iter = l.offsets().iter().tuple_windows();
188 let r_offsets_iter = r.offsets().iter().tuple_windows();
189 for (list_index, ((l_start, l_end), (r_start, r_end))) in
190 l_offsets_iter.zip(r_offsets_iter).enumerate()
191 {
192 if nulls
193 .as_ref()
194 .is_some_and(|nulls| nulls.is_null(list_index))
195 {
196 offsets.push(OffsetSize::usize_as(rows.len()));
197 continue;
198 }
199
200 for element_index in r_start.as_usize()..r_end.as_usize() {
201 let right_row = r_values.row(element_index);
202 dedup.insert(right_row);
203 }
204 for element_index in l_start.as_usize()..l_end.as_usize() {
205 let left_row = l_values.row(element_index);
206 if dedup.insert(left_row) {
207 rows.push(left_row);
208 }
209 }
210
211 offsets.push(OffsetSize::usize_as(rows.len()));
212 dedup.clear();
213 }
214
215 if let Some(values) = converter.convert_rows(rows)?.first() {
216 Ok(GenericListArray::<OffsetSize>::new(
217 field.to_owned(),
218 OffsetBuffer::new(offsets.into()),
219 values.to_owned(),
220 nulls,
221 ))
222 } else {
223 internal_err!("array_except failed to convert rows")
224 }
225}