datafusion_functions_nested/
except.rs1use crate::utils::{check_datatypes, make_scalar_function};
21use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait, cast::AsArray};
22use arrow::buffer::OffsetBuffer;
23use arrow::datatypes::{DataType, FieldRef};
24use arrow::row::{RowConverter, SortField};
25use datafusion_common::utils::{ListCoercion, take_function_args};
26use datafusion_common::{HashSet, Result, internal_err};
27use datafusion_expr::{
28 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
29};
30use datafusion_macros::user_doc;
31use std::any::Any;
32use std::sync::Arc;
33
// Invokes the crate-level `make_udf_expr_and_func!` macro (defined outside this
// file) for `ArrayExcept`. Judging by the arguments, it presumably generates an
// `array_except(first_array, second_array)` expression helper carrying the doc
// string below, plus an `array_except_udf` accessor for the UDF -- confirm
// against the macro definition.
34make_udf_expr_and_func!(
35 ArrayExcept,
36 array_except,
37 first_array second_array,
38 "returns an array of the elements that appear in the first array but not in the second.",
39 array_except_udf
40);
41
42#[user_doc(
43 doc_section(label = "Array Functions"),
44 description = "Returns an array of the elements that appear in the first array but not in the second.",
45 syntax_example = "array_except(array1, array2)",
46 sql_example = r#"```sql
47> select array_except([1, 2, 3, 4], [5, 6, 3, 4]);
48+----------------------------------------------------+
49| array_except([1, 2, 3, 4], [5, 6, 3, 4]); |
50+----------------------------------------------------+
51| [1, 2] |
52+----------------------------------------------------+
53> select array_except([1, 2, 3, 4], [3, 4, 5, 6]);
54+----------------------------------------------------+
55| array_except([1, 2, 3, 4], [3, 4, 5, 6]); |
56+----------------------------------------------------+
57| [1, 2] |
58+----------------------------------------------------+
59```"#,
60 argument(
61 name = "array1",
62 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
63 ),
64 argument(
65 name = "array2",
66 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
67 )
68)]
/// Scalar UDF implementation of `array_except` (also registered under the
/// alias `list_except`).
///
/// The function-level documentation shown to users comes from the
/// `#[user_doc]` attribute above.
69#[derive(Debug, PartialEq, Eq, Hash)]
70pub struct ArrayExcept {
/// Argument signature reported through `ScalarUDFImpl::signature`.
71 signature: Signature,
/// Alternative names reported through `ScalarUDFImpl::aliases`.
72 aliases: Vec<String>,
73}
74
75impl Default for ArrayExcept {
76 fn default() -> Self {
77 Self::new()
78 }
79}
80
81impl ArrayExcept {
82 pub fn new() -> Self {
83 Self {
84 signature: Signature::arrays(
85 2,
86 Some(ListCoercion::FixedSizedListToList),
87 Volatility::Immutable,
88 ),
89 aliases: vec!["list_except".to_string()],
90 }
91 }
92}
93
94impl ScalarUDFImpl for ArrayExcept {
95 fn as_any(&self) -> &dyn Any {
96 self
97 }
98 fn name(&self) -> &str {
99 "array_except"
100 }
101
102 fn signature(&self) -> &Signature {
103 &self.signature
104 }
105
106 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
107 match (&arg_types[0].clone(), &arg_types[1].clone()) {
108 (DataType::Null, _) | (_, DataType::Null) => Ok(arg_types[0].clone()),
109 (dt, _) => Ok(dt.clone()),
110 }
111 }
112
113 fn invoke_with_args(
114 &self,
115 args: datafusion_expr::ScalarFunctionArgs,
116 ) -> Result<ColumnarValue> {
117 make_scalar_function(array_except_inner)(&args.args)
118 }
119
120 fn aliases(&self) -> &[String] {
121 &self.aliases
122 }
123
124 fn documentation(&self) -> Option<&Documentation> {
125 self.doc()
126 }
127}
128
129fn array_except_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
130 let [array1, array2] = take_function_args("array_except", args)?;
131
132 match (array1.data_type(), array2.data_type()) {
133 (DataType::Null, _) | (_, DataType::Null) => Ok(array1.to_owned()),
134 (DataType::List(field), DataType::List(_)) => {
135 check_datatypes("array_except", &[array1, array2])?;
136 let list1 = array1.as_list::<i32>();
137 let list2 = array2.as_list::<i32>();
138 let result = general_except::<i32>(list1, list2, field)?;
139 Ok(Arc::new(result))
140 }
141 (DataType::LargeList(field), DataType::LargeList(_)) => {
142 check_datatypes("array_except", &[array1, array2])?;
143 let list1 = array1.as_list::<i64>();
144 let list2 = array2.as_list::<i64>();
145 let result = general_except::<i64>(list1, list2, field)?;
146 Ok(Arc::new(result))
147 }
148 (dt1, dt2) => {
149 internal_err!("array_except got unexpected types: {dt1:?} and {dt2:?}")
150 }
151 }
152}
153
154fn general_except<OffsetSize: OffsetSizeTrait>(
155 l: &GenericListArray<OffsetSize>,
156 r: &GenericListArray<OffsetSize>,
157 field: &FieldRef,
158) -> Result<GenericListArray<OffsetSize>> {
159 let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
160
161 let l_values = l.values().to_owned();
162 let r_values = r.values().to_owned();
163 let l_values = converter.convert_columns(&[l_values])?;
164 let r_values = converter.convert_columns(&[r_values])?;
165
166 let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
167 offsets.push(OffsetSize::usize_as(0));
168
169 let mut rows = Vec::with_capacity(l_values.num_rows());
170 let mut dedup = HashSet::new();
171
172 for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
173 let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
174 let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
175 for i in r_slice {
176 let right_row = r_values.row(i);
177 dedup.insert(right_row);
178 }
179 for i in l_slice {
180 let left_row = l_values.row(i);
181 if dedup.insert(left_row) {
182 rows.push(left_row);
183 }
184 }
185
186 offsets.push(OffsetSize::usize_as(rows.len()));
187 dedup.clear();
188 }
189
190 if let Some(values) = converter.convert_rows(rows)?.first() {
191 Ok(GenericListArray::<OffsetSize>::new(
192 field.to_owned(),
193 OffsetBuffer::new(offsets.into()),
194 values.to_owned(),
195 l.nulls().cloned(),
196 ))
197 } else {
198 internal_err!("array_except failed to convert rows")
199 }
200}