datafusion_functions_nested/
except.rs1use crate::utils::{check_datatypes, make_scalar_function};
21use arrow::array::{cast::AsArray, Array, ArrayRef, GenericListArray, OffsetSizeTrait};
22use arrow::buffer::OffsetBuffer;
23use arrow::datatypes::{DataType, FieldRef};
24use arrow::row::{RowConverter, SortField};
25use datafusion_common::utils::{take_function_args, ListCoercion};
26use datafusion_common::{internal_err, HashSet, Result};
27use datafusion_expr::{
28 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
29};
30use datafusion_macros::user_doc;
31use std::any::Any;
32use std::sync::Arc;
33
34make_udf_expr_and_func!(
35 ArrayExcept,
36 array_except,
37 first_array second_array,
38 "returns an array of the elements that appear in the first array but not in the second.",
39 array_except_udf
40);
41
42#[user_doc(
43 doc_section(label = "Array Functions"),
44 description = "Returns an array of the elements that appear in the first array but not in the second.",
45 syntax_example = "array_except(array1, array2)",
46 sql_example = r#"```sql
47> select array_except([1, 2, 3, 4], [5, 6, 3, 4]);
48+----------------------------------------------------+
49| array_except([1, 2, 3, 4], [5, 6, 3, 4]); |
50+----------------------------------------------------+
51| [1, 2] |
52+----------------------------------------------------+
53> select array_except([1, 2, 3, 4], [3, 4, 5, 6]);
54+----------------------------------------------------+
55| array_except([1, 2, 3, 4], [3, 4, 5, 6]); |
56+----------------------------------------------------+
57| [1, 2] |
58+----------------------------------------------------+
59```"#,
60 argument(
61 name = "array1",
62 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
63 ),
64 argument(
65 name = "array2",
66 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
67 )
68)]
69#[derive(Debug, PartialEq, Eq, Hash)]
70pub struct ArrayExcept {
71 signature: Signature,
72 aliases: Vec<String>,
73}
74
75impl Default for ArrayExcept {
76 fn default() -> Self {
77 Self::new()
78 }
79}
80
81impl ArrayExcept {
82 pub fn new() -> Self {
83 Self {
84 signature: Signature::arrays(
85 2,
86 Some(ListCoercion::FixedSizedListToList),
87 Volatility::Immutable,
88 ),
89 aliases: vec!["list_except".to_string()],
90 }
91 }
92}
93
94impl ScalarUDFImpl for ArrayExcept {
95 fn as_any(&self) -> &dyn Any {
96 self
97 }
98 fn name(&self) -> &str {
99 "array_except"
100 }
101
102 fn signature(&self) -> &Signature {
103 &self.signature
104 }
105
106 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
107 match (&arg_types[0].clone(), &arg_types[1].clone()) {
108 (DataType::Null, _) | (_, DataType::Null) => Ok(arg_types[0].clone()),
109 (dt, _) => Ok(dt.clone()),
110 }
111 }
112
113 fn invoke_with_args(
114 &self,
115 args: datafusion_expr::ScalarFunctionArgs,
116 ) -> Result<ColumnarValue> {
117 make_scalar_function(array_except_inner)(&args.args)
118 }
119
120 fn aliases(&self) -> &[String] {
121 &self.aliases
122 }
123
124 fn documentation(&self) -> Option<&Documentation> {
125 self.doc()
126 }
127}
128
129pub fn array_except_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
131 let [array1, array2] = take_function_args("array_except", args)?;
132
133 match (array1.data_type(), array2.data_type()) {
134 (DataType::Null, _) | (_, DataType::Null) => Ok(array1.to_owned()),
135 (DataType::List(field), DataType::List(_)) => {
136 check_datatypes("array_except", &[array1, array2])?;
137 let list1 = array1.as_list::<i32>();
138 let list2 = array2.as_list::<i32>();
139 let result = general_except::<i32>(list1, list2, field)?;
140 Ok(Arc::new(result))
141 }
142 (DataType::LargeList(field), DataType::LargeList(_)) => {
143 check_datatypes("array_except", &[array1, array2])?;
144 let list1 = array1.as_list::<i64>();
145 let list2 = array2.as_list::<i64>();
146 let result = general_except::<i64>(list1, list2, field)?;
147 Ok(Arc::new(result))
148 }
149 (dt1, dt2) => {
150 internal_err!("array_except got unexpected types: {dt1:?} and {dt2:?}")
151 }
152 }
153}
154
155fn general_except<OffsetSize: OffsetSizeTrait>(
156 l: &GenericListArray<OffsetSize>,
157 r: &GenericListArray<OffsetSize>,
158 field: &FieldRef,
159) -> Result<GenericListArray<OffsetSize>> {
160 let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
161
162 let l_values = l.values().to_owned();
163 let r_values = r.values().to_owned();
164 let l_values = converter.convert_columns(&[l_values])?;
165 let r_values = converter.convert_columns(&[r_values])?;
166
167 let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
168 offsets.push(OffsetSize::usize_as(0));
169
170 let mut rows = Vec::with_capacity(l_values.num_rows());
171 let mut dedup = HashSet::new();
172
173 for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
174 let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
175 let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
176 for i in r_slice {
177 let right_row = r_values.row(i);
178 dedup.insert(right_row);
179 }
180 for i in l_slice {
181 let left_row = l_values.row(i);
182 if dedup.insert(left_row) {
183 rows.push(left_row);
184 }
185 }
186
187 offsets.push(OffsetSize::usize_as(rows.len()));
188 dedup.clear();
189 }
190
191 if let Some(values) = converter.convert_rows(rows)?.first() {
192 Ok(GenericListArray::<OffsetSize>::new(
193 field.to_owned(),
194 OffsetBuffer::new(offsets.into()),
195 values.to_owned(),
196 l.nulls().cloned(),
197 ))
198 } else {
199 internal_err!("array_except failed to convert rows")
200 }
201}