1use std::any::Any;
21use std::sync::Arc;
22
23use crate::make_array::make_array_inner;
24use crate::utils::{align_array_dimensions, check_datatypes, make_scalar_function};
25use arrow::array::{
26 Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData,
27 NullBufferBuilder, OffsetSizeTrait,
28};
29use arrow::buffer::OffsetBuffer;
30use arrow::datatypes::{DataType, Field};
31use datafusion_common::utils::{
32 base_type, coerced_type_with_base_type_only, ListCoercion,
33};
34use datafusion_common::Result;
35use datafusion_common::{
36 cast::as_generic_list_array,
37 exec_err, plan_err,
38 utils::{list_ndims, take_function_args},
39};
40use datafusion_expr::binary::type_union_resolution;
41use datafusion_expr::{
42 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
43};
44use datafusion_macros::user_doc;
45use itertools::Itertools;
46
47make_udf_expr_and_func!(
48 ArrayAppend,
49 array_append,
50 array element, "appends an element to the end of an array.", array_append_udf );
54
55#[user_doc(
56 doc_section(label = "Array Functions"),
57 description = "Appends an element to the end of an array.",
58 syntax_example = "array_append(array, element)",
59 sql_example = r#"```sql
60> select array_append([1, 2, 3], 4);
61+--------------------------------------+
62| array_append(List([1,2,3]),Int64(4)) |
63+--------------------------------------+
64| [1, 2, 3, 4] |
65+--------------------------------------+
66```"#,
67 argument(
68 name = "array",
69 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
70 ),
71 argument(name = "element", description = "Element to append to the array.")
72)]
73#[derive(Debug)]
74pub struct ArrayAppend {
75 signature: Signature,
76 aliases: Vec<String>,
77}
78
79impl Default for ArrayAppend {
80 fn default() -> Self {
81 Self::new()
82 }
83}
84
85impl ArrayAppend {
86 pub fn new() -> Self {
87 Self {
88 signature: Signature::array_and_element(Volatility::Immutable),
89 aliases: vec![
90 String::from("list_append"),
91 String::from("array_push_back"),
92 String::from("list_push_back"),
93 ],
94 }
95 }
96}
97
98impl ScalarUDFImpl for ArrayAppend {
99 fn as_any(&self) -> &dyn Any {
100 self
101 }
102
103 fn name(&self) -> &str {
104 "array_append"
105 }
106
107 fn signature(&self) -> &Signature {
108 &self.signature
109 }
110
111 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
112 let [array_type, element_type] = take_function_args(self.name(), arg_types)?;
113 if array_type.is_null() {
114 Ok(DataType::new_list(element_type.clone(), true))
115 } else {
116 Ok(array_type.clone())
117 }
118 }
119
120 fn invoke_with_args(
121 &self,
122 args: datafusion_expr::ScalarFunctionArgs,
123 ) -> Result<ColumnarValue> {
124 make_scalar_function(array_append_inner)(&args.args)
125 }
126
127 fn aliases(&self) -> &[String] {
128 &self.aliases
129 }
130
131 fn documentation(&self) -> Option<&Documentation> {
132 self.doc()
133 }
134}
135
136make_udf_expr_and_func!(
137 ArrayPrepend,
138 array_prepend,
139 element array,
140 "Prepends an element to the beginning of an array.",
141 array_prepend_udf
142);
143
144#[user_doc(
145 doc_section(label = "Array Functions"),
146 description = "Prepends an element to the beginning of an array.",
147 syntax_example = "array_prepend(element, array)",
148 sql_example = r#"```sql
149> select array_prepend(1, [2, 3, 4]);
150+---------------------------------------+
151| array_prepend(Int64(1),List([2,3,4])) |
152+---------------------------------------+
153| [1, 2, 3, 4] |
154+---------------------------------------+
155```"#,
156 argument(
157 name = "array",
158 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
159 ),
160 argument(name = "element", description = "Element to prepend to the array.")
161)]
162#[derive(Debug)]
163pub struct ArrayPrepend {
164 signature: Signature,
165 aliases: Vec<String>,
166}
167
168impl Default for ArrayPrepend {
169 fn default() -> Self {
170 Self::new()
171 }
172}
173
174impl ArrayPrepend {
175 pub fn new() -> Self {
176 Self {
177 signature: Signature::element_and_array(Volatility::Immutable),
178 aliases: vec![
179 String::from("list_prepend"),
180 String::from("array_push_front"),
181 String::from("list_push_front"),
182 ],
183 }
184 }
185}
186
187impl ScalarUDFImpl for ArrayPrepend {
188 fn as_any(&self) -> &dyn Any {
189 self
190 }
191
192 fn name(&self) -> &str {
193 "array_prepend"
194 }
195
196 fn signature(&self) -> &Signature {
197 &self.signature
198 }
199
200 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
201 let [element_type, array_type] = take_function_args(self.name(), arg_types)?;
202 if array_type.is_null() {
203 Ok(DataType::new_list(element_type.clone(), true))
204 } else {
205 Ok(array_type.clone())
206 }
207 }
208
209 fn invoke_with_args(
210 &self,
211 args: datafusion_expr::ScalarFunctionArgs,
212 ) -> Result<ColumnarValue> {
213 make_scalar_function(array_prepend_inner)(&args.args)
214 }
215
216 fn aliases(&self) -> &[String] {
217 &self.aliases
218 }
219
220 fn documentation(&self) -> Option<&Documentation> {
221 self.doc()
222 }
223}
224
225make_udf_expr_and_func!(
226 ArrayConcat,
227 array_concat,
228 "Concatenates arrays.",
229 array_concat_udf
230);
231
232#[user_doc(
233 doc_section(label = "Array Functions"),
234 description = "Concatenates arrays.",
235 syntax_example = "array_concat(array[, ..., array_n])",
236 sql_example = r#"```sql
237> select array_concat([1, 2], [3, 4], [5, 6]);
238+---------------------------------------------------+
239| array_concat(List([1,2]),List([3,4]),List([5,6])) |
240+---------------------------------------------------+
241| [1, 2, 3, 4, 5, 6] |
242+---------------------------------------------------+
243```"#,
244 argument(
245 name = "array",
246 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
247 ),
248 argument(
249 name = "array_n",
250 description = "Subsequent array column or literal array to concatenate."
251 )
252)]
253#[derive(Debug)]
254pub struct ArrayConcat {
255 signature: Signature,
256 aliases: Vec<String>,
257}
258
259impl Default for ArrayConcat {
260 fn default() -> Self {
261 Self::new()
262 }
263}
264
265impl ArrayConcat {
266 pub fn new() -> Self {
267 Self {
268 signature: Signature::user_defined(Volatility::Immutable),
269 aliases: vec![
270 String::from("array_cat"),
271 String::from("list_concat"),
272 String::from("list_cat"),
273 ],
274 }
275 }
276}
277
278impl ScalarUDFImpl for ArrayConcat {
279 fn as_any(&self) -> &dyn Any {
280 self
281 }
282
283 fn name(&self) -> &str {
284 "array_concat"
285 }
286
287 fn signature(&self) -> &Signature {
288 &self.signature
289 }
290
291 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
292 let mut max_dims = 0;
293 let mut large_list = false;
294 let mut element_types = Vec::with_capacity(arg_types.len());
295 for arg_type in arg_types {
296 match arg_type {
297 DataType::Null | DataType::List(_) | DataType::FixedSizeList(..) => (),
298 DataType::LargeList(_) => large_list = true,
299 arg_type => {
300 return plan_err!("{} does not support type {arg_type}", self.name())
301 }
302 }
303
304 max_dims = max_dims.max(list_ndims(arg_type));
305 element_types.push(base_type(arg_type))
306 }
307
308 if max_dims == 0 {
309 Ok(DataType::Null)
310 } else if let Some(mut return_type) = type_union_resolution(&element_types) {
311 for _ in 1..max_dims {
312 return_type = DataType::new_list(return_type, true)
313 }
314
315 if large_list {
316 Ok(DataType::new_large_list(return_type, true))
317 } else {
318 Ok(DataType::new_list(return_type, true))
319 }
320 } else {
321 plan_err!(
322 "Failed to unify argument types of {}: {arg_types:?}",
323 self.name()
324 )
325 }
326 }
327
328 fn invoke_with_args(
329 &self,
330 args: datafusion_expr::ScalarFunctionArgs,
331 ) -> Result<ColumnarValue> {
332 make_scalar_function(array_concat_inner)(&args.args)
333 }
334
335 fn aliases(&self) -> &[String] {
336 &self.aliases
337 }
338
339 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
340 let base_type = base_type(&self.return_type(arg_types)?);
341 let coercion = Some(&ListCoercion::FixedSizedListToList);
342 let arg_types = arg_types.iter().map(|arg_type| {
343 coerced_type_with_base_type_only(arg_type, &base_type, coercion)
344 });
345
346 Ok(arg_types.collect())
347 }
348
349 fn documentation(&self) -> Option<&Documentation> {
350 self.doc()
351 }
352}
353
354pub(crate) fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
356 if args.is_empty() {
357 return exec_err!("array_concat expects at least one argument");
358 }
359
360 let mut all_null = true;
361 let mut large_list = false;
362 for arg in args {
363 match arg.data_type() {
364 DataType::Null => continue,
365 DataType::LargeList(_) => large_list = true,
366 _ => (),
367 }
368 if arg.null_count() < arg.len() {
369 all_null = false;
370 }
371 }
372
373 if all_null {
374 let return_type = args
376 .iter()
377 .map(|arg| arg.data_type())
378 .find_or_first(|d| !d.is_null())
379 .unwrap(); Ok(arrow::array::make_array(ArrayData::new_null(
382 return_type,
383 args[0].len(),
384 )))
385 } else if large_list {
386 concat_internal::<i64>(args)
387 } else {
388 concat_internal::<i32>(args)
389 }
390}
391
392fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
393 let args = align_array_dimensions::<O>(args.to_vec())?;
394
395 let list_arrays = args
396 .iter()
397 .map(|arg| as_generic_list_array::<O>(arg))
398 .collect::<Result<Vec<_>>>()?;
399 let row_count = list_arrays[0].len();
401
402 let mut array_lengths = vec![];
403 let mut arrays = vec![];
404 let mut valid = NullBufferBuilder::new(row_count);
405 for i in 0..row_count {
406 let nulls = list_arrays
407 .iter()
408 .map(|arr| arr.is_null(i))
409 .collect::<Vec<_>>();
410
411 let is_null = nulls.iter().all(|&x| x);
413 if is_null {
414 array_lengths.push(0);
415 valid.append_null();
416 } else {
417 let values = list_arrays
419 .iter()
420 .map(|arr| arr.value(i))
421 .collect::<Vec<_>>();
422
423 let elements = values
424 .iter()
425 .map(|a| a.as_ref())
426 .collect::<Vec<&dyn Array>>();
427
428 let concatenated_array = arrow::compute::concat(elements.as_slice())?;
430 array_lengths.push(concatenated_array.len());
431 arrays.push(concatenated_array);
432 valid.append_non_null();
433 }
434 }
435 let data_type = list_arrays[0].value_type();
437
438 let elements = arrays
439 .iter()
440 .map(|a| a.as_ref())
441 .collect::<Vec<&dyn Array>>();
442
443 let list_arr = GenericListArray::<O>::new(
444 Arc::new(Field::new_list_field(data_type, true)),
445 OffsetBuffer::from_lengths(array_lengths),
446 Arc::new(arrow::compute::concat(elements.as_slice())?),
447 valid.finish(),
448 );
449
450 Ok(Arc::new(list_arr))
451}
452
453pub(crate) fn array_append_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
457 let [array, values] = take_function_args("array_append", args)?;
458 match array.data_type() {
459 DataType::Null => make_array_inner(&[Arc::clone(values)]),
460 DataType::List(_) => general_append_and_prepend::<i32>(args, true),
461 DataType::LargeList(_) => general_append_and_prepend::<i64>(args, true),
462 arg_type => exec_err!("array_append does not support type {arg_type}"),
463 }
464}
465
466pub(crate) fn array_prepend_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
468 let [values, array] = take_function_args("array_prepend", args)?;
469 match array.data_type() {
470 DataType::Null => make_array_inner(&[Arc::clone(values)]),
471 DataType::List(_) => general_append_and_prepend::<i32>(args, false),
472 DataType::LargeList(_) => general_append_and_prepend::<i64>(args, false),
473 arg_type => exec_err!("array_prepend does not support type {arg_type}"),
474 }
475}
476
477fn general_append_and_prepend<O: OffsetSizeTrait>(
478 args: &[ArrayRef],
479 is_append: bool,
480) -> Result<ArrayRef>
481where
482 i64: TryInto<O>,
483{
484 let (list_array, element_array) = if is_append {
485 let list_array = as_generic_list_array::<O>(&args[0])?;
486 let element_array = &args[1];
487 check_datatypes("array_append", &[element_array, list_array.values()])?;
488 (list_array, element_array)
489 } else {
490 let list_array = as_generic_list_array::<O>(&args[1])?;
491 let element_array = &args[0];
492 check_datatypes("array_prepend", &[list_array.values(), element_array])?;
493 (list_array, element_array)
494 };
495
496 let res = match list_array.value_type() {
497 DataType::List(_) => concat_internal::<O>(args)?,
498 DataType::LargeList(_) => concat_internal::<O>(args)?,
499 data_type => {
500 return generic_append_and_prepend::<O>(
501 list_array,
502 element_array,
503 &data_type,
504 is_append,
505 );
506 }
507 };
508
509 Ok(res)
510}
511
512fn generic_append_and_prepend<O: OffsetSizeTrait>(
532 list_array: &GenericListArray<O>,
533 element_array: &ArrayRef,
534 data_type: &DataType,
535 is_append: bool,
536) -> Result<ArrayRef>
537where
538 i64: TryInto<O>,
539{
540 let mut offsets = vec![O::usize_as(0)];
541 let values = list_array.values();
542 let original_data = values.to_data();
543 let element_data = element_array.to_data();
544 let capacity = Capacities::Array(original_data.len() + element_data.len());
545
546 let mut mutable = MutableArrayData::with_capacities(
547 vec![&original_data, &element_data],
548 false,
549 capacity,
550 );
551
552 let values_index = 0;
553 let element_index = 1;
554
555 for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
556 let start = offset_window[0].to_usize().unwrap();
557 let end = offset_window[1].to_usize().unwrap();
558 if is_append {
559 mutable.extend(values_index, start, end);
560 mutable.extend(element_index, row_index, row_index + 1);
561 } else {
562 mutable.extend(element_index, row_index, row_index + 1);
563 mutable.extend(values_index, start, end);
564 }
565 offsets.push(offsets[row_index] + O::usize_as(end - start + 1));
566 }
567
568 let data = mutable.freeze();
569
570 Ok(Arc::new(GenericListArray::<O>::try_new(
571 Arc::new(Field::new_list_field(data_type.to_owned(), true)),
572 OffsetBuffer::new(offsets.into()),
573 arrow::array::make_array(data),
574 None,
575 )?))
576}