1use std::sync::Arc;
21use std::{any::Any, cmp::Ordering};
22
23use arrow::array::{
24 Array, ArrayRef, Capacities, GenericListArray, MutableArrayData, NullBufferBuilder,
25 OffsetSizeTrait,
26};
27use arrow::buffer::OffsetBuffer;
28use arrow::datatypes::{DataType, Field};
29use datafusion_common::utils::ListCoercion;
30use datafusion_common::Result;
31use datafusion_common::{
32 cast::as_generic_list_array,
33 exec_err, not_impl_err, plan_err,
34 utils::{list_ndims, take_function_args},
35};
36use datafusion_expr::{
37 ArrayFunctionArgument, ArrayFunctionSignature, ColumnarValue, Documentation,
38 ScalarUDFImpl, Signature, TypeSignature, Volatility,
39};
40use datafusion_macros::user_doc;
41
42use crate::utils::{align_array_dimensions, check_datatypes, make_scalar_function};
43
44make_udf_expr_and_func!(
45 ArrayAppend,
46 array_append,
47 array element, "appends an element to the end of an array.", array_append_udf );
51
// Scalar UDF implementing `array_append(array, element)`.
// The `user_doc` attribute supplies the user-facing documentation; the
// `ScalarUDFImpl::documentation` method returns it through `self.doc()`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Appends an element to the end of an array.",
    syntax_example = "array_append(array, element)",
    sql_example = r#"```sql
> select array_append([1, 2, 3], 4);
+--------------------------------------+
| array_append(List([1,2,3]),Int64(4)) |
+--------------------------------------+
| [1, 2, 3, 4] |
+--------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(name = "element", description = "Element to append to the array.")
)]
#[derive(Debug)]
pub struct ArrayAppend {
    // Accepted argument types; built in `ArrayAppend::new` as (array, element).
    signature: Signature,
    // Alternative SQL names: list_append, array_push_back, list_push_back.
    aliases: Vec<String>,
}
75
76impl Default for ArrayAppend {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
82impl ArrayAppend {
83 pub fn new() -> Self {
84 Self {
85 signature: Signature::array_and_element(Volatility::Immutable),
86 aliases: vec![
87 String::from("list_append"),
88 String::from("array_push_back"),
89 String::from("list_push_back"),
90 ],
91 }
92 }
93}
94
95impl ScalarUDFImpl for ArrayAppend {
96 fn as_any(&self) -> &dyn Any {
97 self
98 }
99
100 fn name(&self) -> &str {
101 "array_append"
102 }
103
104 fn signature(&self) -> &Signature {
105 &self.signature
106 }
107
108 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
109 Ok(arg_types[0].clone())
110 }
111
112 fn invoke_with_args(
113 &self,
114 args: datafusion_expr::ScalarFunctionArgs,
115 ) -> Result<ColumnarValue> {
116 make_scalar_function(array_append_inner)(&args.args)
117 }
118
119 fn aliases(&self) -> &[String] {
120 &self.aliases
121 }
122
123 fn documentation(&self) -> Option<&Documentation> {
124 self.doc()
125 }
126}
127
// Expression-function / UDF-accessor generation for `ArrayPrepend`.
// Arguments are listed as (element, array), matching the SQL call order.
// NOTE(review): the macro presumably emits an `array_prepend` expr helper and
// an `array_prepend_udf` accessor — confirm against the macro definition.
make_udf_expr_and_func!(
    ArrayPrepend,
    array_prepend,
    element array,
    "Prepends an element to the beginning of an array.",
    array_prepend_udf
);
135
// Scalar UDF implementing `array_prepend(element, array)`.
// The `user_doc` attribute supplies the user-facing documentation; the
// `ScalarUDFImpl::documentation` method returns it through `self.doc()`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Prepends an element to the beginning of an array.",
    syntax_example = "array_prepend(element, array)",
    sql_example = r#"```sql
> select array_prepend(1, [2, 3, 4]);
+---------------------------------------+
| array_prepend(Int64(1),List([2,3,4])) |
+---------------------------------------+
| [1, 2, 3, 4] |
+---------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(name = "element", description = "Element to prepend to the array.")
)]
#[derive(Debug)]
pub struct ArrayPrepend {
    // Accepted argument types; built in `ArrayPrepend::new` as (element, array).
    signature: Signature,
    // Alternative SQL names: list_prepend, array_push_front, list_push_front.
    aliases: Vec<String>,
}
159
160impl Default for ArrayPrepend {
161 fn default() -> Self {
162 Self::new()
163 }
164}
165
166impl ArrayPrepend {
167 pub fn new() -> Self {
168 Self {
169 signature: Signature {
170 type_signature: TypeSignature::ArraySignature(
171 ArrayFunctionSignature::Array {
172 arguments: vec![
173 ArrayFunctionArgument::Element,
174 ArrayFunctionArgument::Array,
175 ],
176 array_coercion: Some(ListCoercion::FixedSizedListToList),
177 },
178 ),
179 volatility: Volatility::Immutable,
180 },
181 aliases: vec![
182 String::from("list_prepend"),
183 String::from("array_push_front"),
184 String::from("list_push_front"),
185 ],
186 }
187 }
188}
189
190impl ScalarUDFImpl for ArrayPrepend {
191 fn as_any(&self) -> &dyn Any {
192 self
193 }
194
195 fn name(&self) -> &str {
196 "array_prepend"
197 }
198
199 fn signature(&self) -> &Signature {
200 &self.signature
201 }
202
203 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
204 Ok(arg_types[1].clone())
205 }
206
207 fn invoke_with_args(
208 &self,
209 args: datafusion_expr::ScalarFunctionArgs,
210 ) -> Result<ColumnarValue> {
211 make_scalar_function(array_prepend_inner)(&args.args)
212 }
213
214 fn aliases(&self) -> &[String] {
215 &self.aliases
216 }
217
218 fn documentation(&self) -> Option<&Documentation> {
219 self.doc()
220 }
221}
222
// Expression-function / UDF-accessor generation for `ArrayConcat`.
// No argument names are listed because `array_concat` is variadic.
// NOTE(review): the macro presumably emits an `array_concat` expr helper taking
// the argument list and an `array_concat_udf` accessor — confirm against the
// macro definition.
make_udf_expr_and_func!(
    ArrayConcat,
    array_concat,
    "Concatenates arrays.",
    array_concat_udf
);
229
// Scalar UDF implementing the variadic `array_concat(array[, ..., array_n])`.
// The `user_doc` attribute supplies the user-facing documentation; the
// `ScalarUDFImpl::documentation` method returns it through `self.doc()`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Concatenates arrays.",
    syntax_example = "array_concat(array[, ..., array_n])",
    sql_example = r#"```sql
> select array_concat([1, 2], [3, 4], [5, 6]);
+---------------------------------------------------+
| array_concat(List([1,2]),List([3,4]),List([5,6])) |
+---------------------------------------------------+
| [1, 2, 3, 4, 5, 6] |
+---------------------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array_n",
        description = "Subsequent array column or literal array to concatenate."
    )
)]
#[derive(Debug)]
pub struct ArrayConcat {
    // Variadic signature built in `ArrayConcat::new`; argument types are
    // validated in `return_type`, not by the signature.
    signature: Signature,
    // Alternative SQL names: array_cat, list_concat, list_cat.
    aliases: Vec<String>,
}
256
257impl Default for ArrayConcat {
258 fn default() -> Self {
259 Self::new()
260 }
261}
262
263impl ArrayConcat {
264 pub fn new() -> Self {
265 Self {
266 signature: Signature::variadic_any(Volatility::Immutable),
267 aliases: vec![
268 String::from("array_cat"),
269 String::from("list_concat"),
270 String::from("list_cat"),
271 ],
272 }
273 }
274}
275
276impl ScalarUDFImpl for ArrayConcat {
277 fn as_any(&self) -> &dyn Any {
278 self
279 }
280
281 fn name(&self) -> &str {
282 "array_concat"
283 }
284
285 fn signature(&self) -> &Signature {
286 &self.signature
287 }
288
289 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
290 let mut expr_type = DataType::Null;
291 let mut max_dims = 0;
292 for arg_type in arg_types {
293 let DataType::List(field) = arg_type else {
294 return plan_err!(
295 "The array_concat function can only accept list as the args."
296 );
297 };
298 if !field.data_type().equals_datatype(&DataType::Null) {
299 let dims = list_ndims(arg_type);
300 expr_type = match max_dims.cmp(&dims) {
301 Ordering::Greater => expr_type,
302 Ordering::Equal => {
303 if expr_type == DataType::Null {
304 arg_type.clone()
305 } else if !expr_type.equals_datatype(arg_type) {
306 return plan_err!(
307 "It is not possible to concatenate arrays of different types. Expected: {}, got: {}", expr_type, arg_type
308 );
309 } else {
310 expr_type
311 }
312 }
313
314 Ordering::Less => {
315 max_dims = dims;
316 arg_type.clone()
317 }
318 };
319 }
320 }
321
322 Ok(expr_type)
323 }
324
325 fn invoke_with_args(
326 &self,
327 args: datafusion_expr::ScalarFunctionArgs,
328 ) -> Result<ColumnarValue> {
329 make_scalar_function(array_concat_inner)(&args.args)
330 }
331
332 fn aliases(&self) -> &[String] {
333 &self.aliases
334 }
335
336 fn documentation(&self) -> Option<&Documentation> {
337 self.doc()
338 }
339}
340
341pub(crate) fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
343 if args.is_empty() {
344 return exec_err!("array_concat expects at least one arguments");
345 }
346
347 let mut new_args = vec![];
348 for arg in args {
349 let ndim = list_ndims(arg.data_type());
350 let base_type = datafusion_common::utils::base_type(arg.data_type());
351 if ndim == 0 {
352 return not_impl_err!("Array is not type '{base_type:?}'.");
353 }
354 if !base_type.eq(&DataType::Null) {
355 new_args.push(Arc::clone(arg));
356 }
357 }
358
359 match &args[0].data_type() {
360 DataType::LargeList(_) => concat_internal::<i64>(new_args.as_slice()),
361 _ => concat_internal::<i32>(new_args.as_slice()),
362 }
363}
364
365fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
366 let args = align_array_dimensions::<O>(args.to_vec())?;
367
368 let list_arrays = args
369 .iter()
370 .map(|arg| as_generic_list_array::<O>(arg))
371 .collect::<Result<Vec<_>>>()?;
372 let row_count = list_arrays[0].len();
374
375 let mut array_lengths = vec![];
376 let mut arrays = vec![];
377 let mut valid = NullBufferBuilder::new(row_count);
378 for i in 0..row_count {
379 let nulls = list_arrays
380 .iter()
381 .map(|arr| arr.is_null(i))
382 .collect::<Vec<_>>();
383
384 let is_null = nulls.iter().all(|&x| x);
386 if is_null {
387 array_lengths.push(0);
388 valid.append_null();
389 } else {
390 let values = list_arrays
392 .iter()
393 .map(|arr| arr.value(i))
394 .collect::<Vec<_>>();
395
396 let elements = values
397 .iter()
398 .map(|a| a.as_ref())
399 .collect::<Vec<&dyn Array>>();
400
401 let concatenated_array = arrow::compute::concat(elements.as_slice())?;
403 array_lengths.push(concatenated_array.len());
404 arrays.push(concatenated_array);
405 valid.append_non_null();
406 }
407 }
408 let data_type = list_arrays[0].value_type();
410
411 let elements = arrays
412 .iter()
413 .map(|a| a.as_ref())
414 .collect::<Vec<&dyn Array>>();
415
416 let list_arr = GenericListArray::<O>::new(
417 Arc::new(Field::new_list_field(data_type, true)),
418 OffsetBuffer::from_lengths(array_lengths),
419 Arc::new(arrow::compute::concat(elements.as_slice())?),
420 valid.finish(),
421 );
422
423 Ok(Arc::new(list_arr))
424}
425
426pub(crate) fn array_append_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
430 let [array, _] = take_function_args("array_append", args)?;
431
432 match array.data_type() {
433 DataType::LargeList(_) => general_append_and_prepend::<i64>(args, true),
434 _ => general_append_and_prepend::<i32>(args, true),
435 }
436}
437
438pub(crate) fn array_prepend_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
440 let [_, array] = take_function_args("array_prepend", args)?;
441
442 match array.data_type() {
443 DataType::LargeList(_) => general_append_and_prepend::<i64>(args, false),
444 _ => general_append_and_prepend::<i32>(args, false),
445 }
446}
447
448fn general_append_and_prepend<O: OffsetSizeTrait>(
449 args: &[ArrayRef],
450 is_append: bool,
451) -> Result<ArrayRef>
452where
453 i64: TryInto<O>,
454{
455 let (list_array, element_array) = if is_append {
456 let list_array = as_generic_list_array::<O>(&args[0])?;
457 let element_array = &args[1];
458 check_datatypes("array_append", &[element_array, list_array.values()])?;
459 (list_array, element_array)
460 } else {
461 let list_array = as_generic_list_array::<O>(&args[1])?;
462 let element_array = &args[0];
463 check_datatypes("array_prepend", &[list_array.values(), element_array])?;
464 (list_array, element_array)
465 };
466
467 let res = match list_array.value_type() {
468 DataType::List(_) => concat_internal::<O>(args)?,
469 DataType::LargeList(_) => concat_internal::<O>(args)?,
470 data_type => {
471 return generic_append_and_prepend::<O>(
472 list_array,
473 element_array,
474 &data_type,
475 is_append,
476 );
477 }
478 };
479
480 Ok(res)
481}
482
483fn generic_append_and_prepend<O: OffsetSizeTrait>(
503 list_array: &GenericListArray<O>,
504 element_array: &ArrayRef,
505 data_type: &DataType,
506 is_append: bool,
507) -> Result<ArrayRef>
508where
509 i64: TryInto<O>,
510{
511 let mut offsets = vec![O::usize_as(0)];
512 let values = list_array.values();
513 let original_data = values.to_data();
514 let element_data = element_array.to_data();
515 let capacity = Capacities::Array(original_data.len() + element_data.len());
516
517 let mut mutable = MutableArrayData::with_capacities(
518 vec![&original_data, &element_data],
519 false,
520 capacity,
521 );
522
523 let values_index = 0;
524 let element_index = 1;
525
526 for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
527 let start = offset_window[0].to_usize().unwrap();
528 let end = offset_window[1].to_usize().unwrap();
529 if is_append {
530 mutable.extend(values_index, start, end);
531 mutable.extend(element_index, row_index, row_index + 1);
532 } else {
533 mutable.extend(element_index, row_index, row_index + 1);
534 mutable.extend(values_index, start, end);
535 }
536 offsets.push(offsets[row_index] + O::usize_as(end - start + 1));
537 }
538
539 let data = mutable.freeze();
540
541 Ok(Arc::new(GenericListArray::<O>::try_new(
542 Arc::new(Field::new_list_field(data_type.to_owned(), true)),
543 OffsetBuffer::new(offsets.into()),
544 arrow::array::make_array(data),
545 None,
546 )?))
547}