datafusion_functions_nested/
except.rs1use crate::utils::{check_datatypes, make_scalar_function};
21use arrow::array::{cast::AsArray, Array, ArrayRef, GenericListArray, OffsetSizeTrait};
22use arrow::buffer::OffsetBuffer;
23use arrow::datatypes::{DataType, FieldRef};
24use arrow::row::{RowConverter, SortField};
25use datafusion_common::utils::take_function_args;
26use datafusion_common::{internal_err, HashSet, Result};
27use datafusion_expr::{
28 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
29};
30use datafusion_macros::user_doc;
31use std::any::Any;
32use std::sync::Arc;
33
34make_udf_expr_and_func!(
35 ArrayExcept,
36 array_except,
37 first_array second_array,
38 "returns an array of the elements that appear in the first array but not in the second.",
39 array_except_udf
40);
41
42#[user_doc(
43 doc_section(label = "Array Functions"),
44 description = "Returns an array of the elements that appear in the first array but not in the second.",
45 syntax_example = "array_except(array1, array2)",
46 sql_example = r#"```sql
47> select array_except([1, 2, 3, 4], [5, 6, 3, 4]);
48+----------------------------------------------------+
49| array_except([1, 2, 3, 4], [5, 6, 3, 4]); |
50+----------------------------------------------------+
51| [1, 2] |
52+----------------------------------------------------+
53> select array_except([1, 2, 3, 4], [3, 4, 5, 6]);
54+----------------------------------------------------+
55| array_except([1, 2, 3, 4], [3, 4, 5, 6]); |
56+----------------------------------------------------+
57| [1, 2] |
58+----------------------------------------------------+
59```"#,
60 argument(
61 name = "array1",
62 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
63 ),
64 argument(
65 name = "array2",
66 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
67 )
68)]
69#[derive(Debug)]
70pub struct ArrayExcept {
71 signature: Signature,
72 aliases: Vec<String>,
73}
74
75impl Default for ArrayExcept {
76 fn default() -> Self {
77 Self::new()
78 }
79}
80
81impl ArrayExcept {
82 pub fn new() -> Self {
83 Self {
84 signature: Signature::any(2, Volatility::Immutable),
85 aliases: vec!["list_except".to_string()],
86 }
87 }
88}
89
90impl ScalarUDFImpl for ArrayExcept {
91 fn as_any(&self) -> &dyn Any {
92 self
93 }
94 fn name(&self) -> &str {
95 "array_except"
96 }
97
98 fn signature(&self) -> &Signature {
99 &self.signature
100 }
101
102 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
103 match (&arg_types[0].clone(), &arg_types[1].clone()) {
104 (DataType::Null, _) | (_, DataType::Null) => Ok(arg_types[0].clone()),
105 (dt, _) => Ok(dt.clone()),
106 }
107 }
108
109 fn invoke_with_args(
110 &self,
111 args: datafusion_expr::ScalarFunctionArgs,
112 ) -> Result<ColumnarValue> {
113 make_scalar_function(array_except_inner)(&args.args)
114 }
115
116 fn aliases(&self) -> &[String] {
117 &self.aliases
118 }
119
120 fn documentation(&self) -> Option<&Documentation> {
121 self.doc()
122 }
123}
124
125pub fn array_except_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
127 let [array1, array2] = take_function_args("array_except", args)?;
128
129 match (array1.data_type(), array2.data_type()) {
130 (DataType::Null, _) | (_, DataType::Null) => Ok(array1.to_owned()),
131 (DataType::List(field), DataType::List(_)) => {
132 check_datatypes("array_except", &[array1, array2])?;
133 let list1 = array1.as_list::<i32>();
134 let list2 = array2.as_list::<i32>();
135 let result = general_except::<i32>(list1, list2, field)?;
136 Ok(Arc::new(result))
137 }
138 (DataType::LargeList(field), DataType::LargeList(_)) => {
139 check_datatypes("array_except", &[array1, array2])?;
140 let list1 = array1.as_list::<i64>();
141 let list2 = array2.as_list::<i64>();
142 let result = general_except::<i64>(list1, list2, field)?;
143 Ok(Arc::new(result))
144 }
145 (dt1, dt2) => {
146 internal_err!("array_except got unexpected types: {dt1:?} and {dt2:?}")
147 }
148 }
149}
150
151fn general_except<OffsetSize: OffsetSizeTrait>(
152 l: &GenericListArray<OffsetSize>,
153 r: &GenericListArray<OffsetSize>,
154 field: &FieldRef,
155) -> Result<GenericListArray<OffsetSize>> {
156 let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
157
158 let l_values = l.values().to_owned();
159 let r_values = r.values().to_owned();
160 let l_values = converter.convert_columns(&[l_values])?;
161 let r_values = converter.convert_columns(&[r_values])?;
162
163 let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
164 offsets.push(OffsetSize::usize_as(0));
165
166 let mut rows = Vec::with_capacity(l_values.num_rows());
167 let mut dedup = HashSet::new();
168
169 for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
170 let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
171 let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
172 for i in r_slice {
173 let right_row = r_values.row(i);
174 dedup.insert(right_row);
175 }
176 for i in l_slice {
177 let left_row = l_values.row(i);
178 if dedup.insert(left_row) {
179 rows.push(left_row);
180 }
181 }
182
183 offsets.push(OffsetSize::usize_as(rows.len()));
184 dedup.clear();
185 }
186
187 if let Some(values) = converter.convert_rows(rows)?.first() {
188 Ok(GenericListArray::<OffsetSize>::new(
189 field.to_owned(),
190 OffsetBuffer::new(offsets.into()),
191 values.to_owned(),
192 l.nulls().cloned(),
193 ))
194 } else {
195 internal_err!("array_except failed to convert rows")
196 }
197}