1use std::any::Any;
21use std::sync::Arc;
22
23use crate::make_array::make_array_inner;
24use crate::utils::{align_array_dimensions, check_datatypes, make_scalar_function};
25use arrow::array::{
26 Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData,
27 NullBufferBuilder, OffsetSizeTrait,
28};
29use arrow::buffer::OffsetBuffer;
30use arrow::datatypes::{DataType, Field};
31use datafusion_common::utils::{
32 base_type, coerced_type_with_base_type_only, ListCoercion,
33};
34use datafusion_common::Result;
35use datafusion_common::{
36 cast::as_generic_list_array,
37 exec_err, plan_err,
38 utils::{list_ndims, take_function_args},
39};
40use datafusion_expr::binary::type_union_resolution;
41use datafusion_expr::{
42 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
43};
44use datafusion_macros::user_doc;
45use itertools::Itertools;
46
47make_udf_expr_and_func!(
48 ArrayAppend,
49 array_append,
50 array element, "appends an element to the end of an array.", array_append_udf );
54
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Appends an element to the end of an array.",
    syntax_example = "array_append(array, element)",
    sql_example = r#"```sql
> select array_append([1, 2, 3], 4);
+--------------------------------------+
| array_append(List([1,2,3]),Int64(4)) |
+--------------------------------------+
| [1, 2, 3, 4] |
+--------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(name = "element", description = "Element to append to the array.")
)]
// Scalar UDF implementing `array_append(array, element)`. Its user-facing
// documentation is generated from the `#[user_doc]` attribute on this struct
// and returned by `ScalarUDFImpl::documentation`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayAppend {
    // Accepted argument types; built in `new` with `Signature::array_and_element`.
    signature: Signature,
    // Alternative SQL names for this function, returned by `aliases()`.
    aliases: Vec<String>,
}
78
79impl Default for ArrayAppend {
80 fn default() -> Self {
81 Self::new()
82 }
83}
84
85impl ArrayAppend {
86 pub fn new() -> Self {
87 Self {
88 signature: Signature::array_and_element(Volatility::Immutable),
89 aliases: vec![
90 String::from("list_append"),
91 String::from("array_push_back"),
92 String::from("list_push_back"),
93 ],
94 }
95 }
96}
97
98impl ScalarUDFImpl for ArrayAppend {
99 fn as_any(&self) -> &dyn Any {
100 self
101 }
102
103 fn name(&self) -> &str {
104 "array_append"
105 }
106
107 fn signature(&self) -> &Signature {
108 &self.signature
109 }
110
111 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
112 let [array_type, element_type] = take_function_args(self.name(), arg_types)?;
113 if array_type.is_null() {
114 Ok(DataType::new_list(element_type.clone(), true))
115 } else {
116 Ok(array_type.clone())
117 }
118 }
119
120 fn invoke_with_args(
121 &self,
122 args: datafusion_expr::ScalarFunctionArgs,
123 ) -> Result<ColumnarValue> {
124 make_scalar_function(array_append_inner)(&args.args)
125 }
126
127 fn aliases(&self) -> &[String] {
128 &self.aliases
129 }
130
131 fn documentation(&self) -> Option<&Documentation> {
132 self.doc()
133 }
134}
135
// Presumably expands to the `array_prepend(element, array)` expression helper
// and the `array_prepend_udf()` accessor for `ArrayPrepend` — see the crate's
// `make_udf_expr_and_func!` definition (not shown here) to confirm.
make_udf_expr_and_func!(
    ArrayPrepend,
    array_prepend,
    element array,
    "Prepends an element to the beginning of an array.",
    array_prepend_udf
);
143
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Prepends an element to the beginning of an array.",
    syntax_example = "array_prepend(element, array)",
    sql_example = r#"```sql
> select array_prepend(1, [2, 3, 4]);
+---------------------------------------+
| array_prepend(Int64(1),List([2,3,4])) |
+---------------------------------------+
| [1, 2, 3, 4] |
+---------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(name = "element", description = "Element to prepend to the array.")
)]
// Scalar UDF implementing `array_prepend(element, array)`. Note the argument
// order: the element comes first. User-facing documentation is generated from
// the `#[user_doc]` attribute on this struct.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayPrepend {
    // Accepted argument types; built in `new` with `Signature::element_and_array`.
    signature: Signature,
    // Alternative SQL names for this function, returned by `aliases()`.
    aliases: Vec<String>,
}
167
168impl Default for ArrayPrepend {
169 fn default() -> Self {
170 Self::new()
171 }
172}
173
174impl ArrayPrepend {
175 pub fn new() -> Self {
176 Self {
177 signature: Signature::element_and_array(Volatility::Immutable),
178 aliases: vec![
179 String::from("list_prepend"),
180 String::from("array_push_front"),
181 String::from("list_push_front"),
182 ],
183 }
184 }
185}
186
187impl ScalarUDFImpl for ArrayPrepend {
188 fn as_any(&self) -> &dyn Any {
189 self
190 }
191
192 fn name(&self) -> &str {
193 "array_prepend"
194 }
195
196 fn signature(&self) -> &Signature {
197 &self.signature
198 }
199
200 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
201 let [element_type, array_type] = take_function_args(self.name(), arg_types)?;
202 if array_type.is_null() {
203 Ok(DataType::new_list(element_type.clone(), true))
204 } else {
205 Ok(array_type.clone())
206 }
207 }
208
209 fn invoke_with_args(
210 &self,
211 args: datafusion_expr::ScalarFunctionArgs,
212 ) -> Result<ColumnarValue> {
213 make_scalar_function(array_prepend_inner)(&args.args)
214 }
215
216 fn aliases(&self) -> &[String] {
217 &self.aliases
218 }
219
220 fn documentation(&self) -> Option<&Documentation> {
221 self.doc()
222 }
223}
224
// Presumably expands to the `array_concat` expression helper and the
// `array_concat_udf()` accessor for `ArrayConcat` — see the crate's
// `make_udf_expr_and_func!` definition (not shown here) to confirm. Unlike
// the append/prepend invocations, no argument names are listed.
make_udf_expr_and_func!(
    ArrayConcat,
    array_concat,
    "Concatenates arrays.",
    array_concat_udf
);
231
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Concatenates arrays.",
    syntax_example = "array_concat(array[, ..., array_n])",
    sql_example = r#"```sql
> select array_concat([1, 2], [3, 4], [5, 6]);
+---------------------------------------------------+
| array_concat(List([1,2]),List([3,4]),List([5,6])) |
+---------------------------------------------------+
| [1, 2, 3, 4, 5, 6] |
+---------------------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array_n",
        description = "Subsequent array column or literal array to concatenate."
    )
)]
// Scalar UDF implementing variadic `array_concat(array, ..., array_n)`.
// User-facing documentation is generated from the `#[user_doc]` attribute.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayConcat {
    // User-defined signature built in `new`; argument types are coerced by
    // this type's `ScalarUDFImpl::coerce_types`.
    signature: Signature,
    // Alternative SQL names for this function, returned by `aliases()`.
    aliases: Vec<String>,
}
258
259impl Default for ArrayConcat {
260 fn default() -> Self {
261 Self::new()
262 }
263}
264
265impl ArrayConcat {
266 pub fn new() -> Self {
267 Self {
268 signature: Signature::user_defined(Volatility::Immutable),
269 aliases: vec![
270 String::from("array_cat"),
271 String::from("list_concat"),
272 String::from("list_cat"),
273 ],
274 }
275 }
276}
277
278impl ScalarUDFImpl for ArrayConcat {
279 fn as_any(&self) -> &dyn Any {
280 self
281 }
282
283 fn name(&self) -> &str {
284 "array_concat"
285 }
286
287 fn signature(&self) -> &Signature {
288 &self.signature
289 }
290
291 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
292 let mut max_dims = 0;
293 let mut large_list = false;
294 let mut element_types = Vec::with_capacity(arg_types.len());
295 for arg_type in arg_types {
296 match arg_type {
297 DataType::Null | DataType::List(_) | DataType::FixedSizeList(..) => (),
298 DataType::LargeList(_) => large_list = true,
299 arg_type => {
300 return plan_err!("{} does not support type {arg_type}", self.name())
301 }
302 }
303
304 max_dims = max_dims.max(list_ndims(arg_type));
305 element_types.push(base_type(arg_type))
306 }
307
308 if max_dims == 0 {
309 Ok(DataType::Null)
310 } else if let Some(mut return_type) = type_union_resolution(&element_types) {
311 for _ in 1..max_dims {
312 return_type = DataType::new_list(return_type, true)
313 }
314
315 if large_list {
316 Ok(DataType::new_large_list(return_type, true))
317 } else {
318 Ok(DataType::new_list(return_type, true))
319 }
320 } else {
321 plan_err!(
322 "Failed to unify argument types of {}: [{}]",
323 self.name(),
324 arg_types.iter().join(", ")
325 )
326 }
327 }
328
329 fn invoke_with_args(
330 &self,
331 args: datafusion_expr::ScalarFunctionArgs,
332 ) -> Result<ColumnarValue> {
333 make_scalar_function(array_concat_inner)(&args.args)
334 }
335
336 fn aliases(&self) -> &[String] {
337 &self.aliases
338 }
339
340 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
341 let base_type = base_type(&self.return_type(arg_types)?);
342 let coercion = Some(&ListCoercion::FixedSizedListToList);
343 let arg_types = arg_types.iter().map(|arg_type| {
344 coerced_type_with_base_type_only(arg_type, &base_type, coercion)
345 });
346
347 Ok(arg_types.collect())
348 }
349
350 fn documentation(&self) -> Option<&Documentation> {
351 self.doc()
352 }
353}
354
355pub(crate) fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
357 if args.is_empty() {
358 return exec_err!("array_concat expects at least one argument");
359 }
360
361 let mut all_null = true;
362 let mut large_list = false;
363 for arg in args {
364 match arg.data_type() {
365 DataType::Null => continue,
366 DataType::LargeList(_) => large_list = true,
367 _ => (),
368 }
369 if arg.null_count() < arg.len() {
370 all_null = false;
371 }
372 }
373
374 if all_null {
375 let return_type = args
377 .iter()
378 .map(|arg| arg.data_type())
379 .find_or_first(|d| !d.is_null())
380 .unwrap(); Ok(arrow::array::make_array(ArrayData::new_null(
383 return_type,
384 args[0].len(),
385 )))
386 } else if large_list {
387 concat_internal::<i64>(args)
388 } else {
389 concat_internal::<i32>(args)
390 }
391}
392
393fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
394 let args = align_array_dimensions::<O>(args.to_vec())?;
395
396 let list_arrays = args
397 .iter()
398 .map(|arg| as_generic_list_array::<O>(arg))
399 .collect::<Result<Vec<_>>>()?;
400 let row_count = list_arrays[0].len();
402
403 let mut array_lengths = vec![];
404 let mut arrays = vec![];
405 let mut valid = NullBufferBuilder::new(row_count);
406 for i in 0..row_count {
407 let nulls = list_arrays
408 .iter()
409 .map(|arr| arr.is_null(i))
410 .collect::<Vec<_>>();
411
412 let is_null = nulls.iter().all(|&x| x);
414 if is_null {
415 array_lengths.push(0);
416 valid.append_null();
417 } else {
418 let values = list_arrays
420 .iter()
421 .map(|arr| arr.value(i))
422 .collect::<Vec<_>>();
423
424 let elements = values
425 .iter()
426 .map(|a| a.as_ref())
427 .collect::<Vec<&dyn Array>>();
428
429 let concatenated_array = arrow::compute::concat(elements.as_slice())?;
431 array_lengths.push(concatenated_array.len());
432 arrays.push(concatenated_array);
433 valid.append_non_null();
434 }
435 }
436 let data_type = list_arrays[0].value_type();
438
439 let elements = arrays
440 .iter()
441 .map(|a| a.as_ref())
442 .collect::<Vec<&dyn Array>>();
443
444 let list_arr = GenericListArray::<O>::new(
445 Arc::new(Field::new_list_field(data_type, true)),
446 OffsetBuffer::from_lengths(array_lengths),
447 Arc::new(arrow::compute::concat(elements.as_slice())?),
448 valid.finish(),
449 );
450
451 Ok(Arc::new(list_arr))
452}
453
454pub(crate) fn array_append_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
458 let [array, values] = take_function_args("array_append", args)?;
459 match array.data_type() {
460 DataType::Null => make_array_inner(&[Arc::clone(values)]),
461 DataType::List(_) => general_append_and_prepend::<i32>(args, true),
462 DataType::LargeList(_) => general_append_and_prepend::<i64>(args, true),
463 arg_type => exec_err!("array_append does not support type {arg_type}"),
464 }
465}
466
467pub(crate) fn array_prepend_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
469 let [values, array] = take_function_args("array_prepend", args)?;
470 match array.data_type() {
471 DataType::Null => make_array_inner(&[Arc::clone(values)]),
472 DataType::List(_) => general_append_and_prepend::<i32>(args, false),
473 DataType::LargeList(_) => general_append_and_prepend::<i64>(args, false),
474 arg_type => exec_err!("array_prepend does not support type {arg_type}"),
475 }
476}
477
478fn general_append_and_prepend<O: OffsetSizeTrait>(
479 args: &[ArrayRef],
480 is_append: bool,
481) -> Result<ArrayRef>
482where
483 i64: TryInto<O>,
484{
485 let (list_array, element_array) = if is_append {
486 let list_array = as_generic_list_array::<O>(&args[0])?;
487 let element_array = &args[1];
488 check_datatypes("array_append", &[element_array, list_array.values()])?;
489 (list_array, element_array)
490 } else {
491 let list_array = as_generic_list_array::<O>(&args[1])?;
492 let element_array = &args[0];
493 check_datatypes("array_prepend", &[list_array.values(), element_array])?;
494 (list_array, element_array)
495 };
496
497 let res = match list_array.value_type() {
498 DataType::List(_) => concat_internal::<O>(args)?,
499 DataType::LargeList(_) => concat_internal::<O>(args)?,
500 data_type => {
501 return generic_append_and_prepend::<O>(
502 list_array,
503 element_array,
504 &data_type,
505 is_append,
506 );
507 }
508 };
509
510 Ok(res)
511}
512
513fn generic_append_and_prepend<O: OffsetSizeTrait>(
533 list_array: &GenericListArray<O>,
534 element_array: &ArrayRef,
535 data_type: &DataType,
536 is_append: bool,
537) -> Result<ArrayRef>
538where
539 i64: TryInto<O>,
540{
541 let mut offsets = vec![O::usize_as(0)];
542 let values = list_array.values();
543 let original_data = values.to_data();
544 let element_data = element_array.to_data();
545 let capacity = Capacities::Array(original_data.len() + element_data.len());
546
547 let mut mutable = MutableArrayData::with_capacities(
548 vec![&original_data, &element_data],
549 false,
550 capacity,
551 );
552
553 let values_index = 0;
554 let element_index = 1;
555
556 for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
557 let start = offset_window[0].to_usize().unwrap();
558 let end = offset_window[1].to_usize().unwrap();
559 if is_append {
560 mutable.extend(values_index, start, end);
561 mutable.extend(element_index, row_index, row_index + 1);
562 } else {
563 mutable.extend(element_index, row_index, row_index + 1);
564 mutable.extend(values_index, start, end);
565 }
566 offsets.push(offsets[row_index] + O::usize_as(end - start + 1));
567 }
568
569 let data = mutable.freeze();
570
571 Ok(Arc::new(GenericListArray::<O>::try_new(
572 Arc::new(Field::new_list_field(data_type.to_owned(), true)),
573 OffsetBuffer::new(offsets.into()),
574 arrow::array::make_array(data),
575 None,
576 )?))
577}