1use crate::utils::{check_datatypes, make_scalar_function};
21use arrow::array::new_null_array;
22use arrow::array::{
23 Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, UInt64Array,
24 cast::AsArray,
25};
26use arrow::buffer::{NullBuffer, OffsetBuffer};
27use arrow::compute::take;
28use arrow::datatypes::{DataType, FieldRef};
29use arrow::row::{RowConverter, SortField};
30use datafusion_common::utils::{ListCoercion, take_function_args};
31use datafusion_common::{HashSet, Result, internal_err};
32use datafusion_expr::{
33 ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
34 Volatility,
35};
36use datafusion_macros::user_doc;
37use itertools::Itertools;
38use std::sync::Arc;
39
// Registers `ArrayExcept` through the crate's `make_udf_expr_and_func!` macro.
// NOTE(review): the macro is defined elsewhere; presumably it generates an
// `array_except(first_array, second_array)` expression helper and an
// `array_except_udf` accessor documented with the string below — confirm
// against the macro definition.
make_udf_expr_and_func!(
    ArrayExcept,
    array_except,
    first_array second_array,
    "returns an array of the elements that appear in the first array but not in the second.",
    array_except_udf
);
47
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array of the elements that appear in the first array but not in the second.",
    syntax_example = "array_except(array1, array2)",
    sql_example = r#"```sql
> select array_except([1, 2, 3, 4], [5, 6, 3, 4]);
+----------------------------------------------------+
| array_except([1, 2, 3, 4], [5, 6, 3, 4]); |
+----------------------------------------------------+
| [1, 2] |
+----------------------------------------------------+
> select array_except([1, 2, 3, 4], [3, 4, 5, 6]);
+----------------------------------------------------+
| array_except([1, 2, 3, 4], [3, 4, 5, 6]); |
+----------------------------------------------------+
| [1, 2] |
+----------------------------------------------------+
```"#,
    argument(
        name = "array1",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array2",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayExcept {
    /// Two-array signature (fixed-size lists coerced to lists), built in `new`.
    signature: Signature,
    /// Alternative SQL names for this function (`list_except`).
    aliases: Vec<String>,
}
80
81impl Default for ArrayExcept {
82 fn default() -> Self {
83 Self::new()
84 }
85}
86
87impl ArrayExcept {
88 pub fn new() -> Self {
89 Self {
90 signature: Signature::arrays(
91 2,
92 Some(ListCoercion::FixedSizedListToList),
93 Volatility::Immutable,
94 ),
95 aliases: vec!["list_except".to_string()],
96 }
97 }
98}
99
100impl ScalarUDFImpl for ArrayExcept {
101 fn name(&self) -> &str {
102 "array_except"
103 }
104
105 fn signature(&self) -> &Signature {
106 &self.signature
107 }
108
109 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
110 match (&arg_types[0], &arg_types[1]) {
111 (DataType::Null, DataType::Null) => {
112 Ok(DataType::new_list(DataType::Null, true))
113 }
114 (DataType::Null, dt) | (dt, DataType::Null) => Ok(dt.clone()),
115 (dt, _) => Ok(dt.clone()),
116 }
117 }
118
119 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
120 make_scalar_function(array_except_inner)(&args.args)
121 }
122
123 fn aliases(&self) -> &[String] {
124 &self.aliases
125 }
126
127 fn documentation(&self) -> Option<&Documentation> {
128 self.doc()
129 }
130}
131
132fn array_except_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
133 let [array1, array2] = take_function_args("array_except", args)?;
134
135 let len = array1.len();
136 match (array1.data_type(), array2.data_type()) {
137 (DataType::Null, DataType::Null) => Ok(new_null_array(
138 &DataType::new_list(DataType::Null, true),
139 len,
140 )),
141 (DataType::Null, dt @ DataType::List(_))
142 | (DataType::Null, dt @ DataType::LargeList(_))
143 | (dt @ DataType::List(_), DataType::Null)
144 | (dt @ DataType::LargeList(_), DataType::Null) => Ok(new_null_array(dt, len)),
145 (DataType::List(field), DataType::List(_)) => {
146 check_datatypes("array_except", &[array1, array2])?;
147 let list1 = array1.as_list::<i32>();
148 let list2 = array2.as_list::<i32>();
149 let result = general_except::<i32>(list1, list2, field)?;
150 Ok(Arc::new(result))
151 }
152 (DataType::LargeList(field), DataType::LargeList(_)) => {
153 check_datatypes("array_except", &[array1, array2])?;
154 let list1 = array1.as_list::<i64>();
155 let list2 = array2.as_list::<i64>();
156 let result = general_except::<i64>(list1, list2, field)?;
157 Ok(Arc::new(result))
158 }
159 (dt1, dt2) => {
160 internal_err!("array_except got unexpected types: {dt1:?} and {dt2:?}")
161 }
162 }
163}
164
165fn general_except<OffsetSize: OffsetSizeTrait>(
166 l: &GenericListArray<OffsetSize>,
167 r: &GenericListArray<OffsetSize>,
168 field: &FieldRef,
169) -> Result<GenericListArray<OffsetSize>> {
170 let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
171
172 let l_first = l.offsets()[0].as_usize();
176 let l_len = l.offsets()[l.len()].as_usize() - l_first;
177 let l_values = converter.convert_columns(&[l.values().slice(l_first, l_len)])?;
178
179 let r_first = r.offsets()[0].as_usize();
180 let r_len = r.offsets()[r.len()].as_usize() - r_first;
181 let r_values = converter.convert_columns(&[r.values().slice(r_first, r_len)])?;
182
183 let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
184 offsets.push(OffsetSize::usize_as(0));
185
186 let mut indices: Vec<usize> = Vec::with_capacity(l_values.num_rows());
187 let mut dedup = HashSet::new();
188
189 let nulls = NullBuffer::union(l.nulls(), r.nulls());
190
191 let l_offsets_iter = l.offsets().iter().tuple_windows();
192 let r_offsets_iter = r.offsets().iter().tuple_windows();
193 for (list_index, ((l_start, l_end), (r_start, r_end))) in
194 l_offsets_iter.zip(r_offsets_iter).enumerate()
195 {
196 if nulls
197 .as_ref()
198 .is_some_and(|nulls| nulls.is_null(list_index))
199 {
200 offsets.push(OffsetSize::usize_as(indices.len()));
201 continue;
202 }
203
204 for element_index in r_start.as_usize() - r_first..r_end.as_usize() - r_first {
205 let right_row = r_values.row(element_index);
206 dedup.insert(right_row);
207 }
208 for element_index in l_start.as_usize() - l_first..l_end.as_usize() - l_first {
209 let left_row = l_values.row(element_index);
210 if dedup.insert(left_row) {
211 indices.push(element_index + l_first);
212 }
213 }
214
215 offsets.push(OffsetSize::usize_as(indices.len()));
216 dedup.clear();
217 }
218
219 let values = if indices.is_empty() {
222 arrow::array::new_empty_array(&l.value_type())
223 } else if OffsetSize::IS_LARGE {
224 let indices =
225 UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>());
226 take(l.values().as_ref(), &indices, None)?
227 } else {
228 let indices =
229 UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>());
230 take(l.values().as_ref(), &indices, None)?
231 };
232
233 Ok(GenericListArray::<OffsetSize>::new(
234 field.to_owned(),
235 OffsetBuffer::new(offsets.into()),
236 values,
237 nulls,
238 ))
239}
240
#[cfg(test)]
mod tests {
    use super::ArrayExcept;
    use arrow::array::{Array, ArrayRef, AsArray, Int32Array, ListArray};
    use arrow::datatypes::{Field, Int32Type};
    use datafusion_common::{Result, config::ConfigOptions};
    use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
    use std::sync::Arc;

    /// Runs `array_except(l, r)` through the UDF entry point and materializes
    /// the result as an array with `l.len()` rows.
    fn invoke_except(l: ListArray, r: ListArray) -> Result<ArrayRef> {
        let number_rows = l.len();
        let list_field = Arc::new(Field::new("item", l.data_type().clone(), true));
        let return_field = Arc::new(Field::new("return", l.data_type().clone(), true));

        let result = ArrayExcept::new().invoke_with_args(ScalarFunctionArgs {
            args: vec![
                ColumnarValue::Array(Arc::new(l)),
                ColumnarValue::Array(Arc::new(r)),
            ],
            arg_fields: vec![Arc::clone(&list_field), Arc::clone(&list_field)],
            number_rows,
            return_field,
            config_options: Arc::new(ConfigOptions::default()),
        })?;
        result.into_array(number_rows)
    }

    /// Returns the `Int32` values stored in row `row` of a `List<Int32>` array.
    fn int32_row(list: &ListArray, row: usize) -> Vec<i32> {
        let values = list.value(row);
        values
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("child values should be Int32")
            .values()
            .to_vec()
    }

    #[test]
    fn test_array_except_sliced_lists() -> Result<()> {
        let l_full = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2)]),
            Some(vec![Some(3), Some(4)]),
            Some(vec![Some(5), Some(6)]),
            Some(vec![Some(7), Some(8)]),
        ]);
        let r_full = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(3)]),
            Some(vec![Some(5)]),
            Some(vec![Some(6)]),
            Some(vec![Some(8)]),
        ]);

        let output = invoke_except(l_full.slice(1, 2), r_full.slice(1, 2))?;
        let output = output.as_list::<i32>();

        assert_eq!(output.len(), 2);
        assert_eq!(int32_row(output, 0), vec![3, 4]);
        assert_eq!(int32_row(output, 1), vec![5]);

        Ok(())
    }

    #[test]
    fn test_array_except_dedups_and_propagates_nulls() -> Result<()> {
        let l = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(1), Some(2), Some(3)]),
            None,
            Some(vec![Some(4), Some(4)]),
        ]);
        let r = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(3)]),
            Some(vec![Some(1)]),
            None,
        ]);

        let output = invoke_except(l, r)?;
        let output = output.as_list::<i32>();

        assert_eq!(output.len(), 3);
        // Duplicates on the left are collapsed; elements present on the right
        // are removed.
        assert_eq!(int32_row(output, 0), vec![1, 2]);
        // A null row on either side yields a null output row.
        assert!(output.is_null(1));
        assert!(output.is_null(2));

        Ok(())
    }
}