1use std::sync::Arc;
21
22use crate::make_array::make_array_inner;
23use crate::utils::{align_array_dimensions, check_datatypes, make_scalar_function};
24use arrow::array::{
25 Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData,
26 OffsetSizeTrait,
27};
28use arrow::buffer::{NullBuffer, OffsetBuffer};
29use arrow::datatypes::{DataType, Field};
30use datafusion_common::Result;
31use datafusion_common::utils::{
32 ListCoercion, base_type, coerced_type_with_base_type_only,
33};
34use datafusion_common::{
35 cast::as_generic_list_array,
36 exec_err, plan_err,
37 utils::{list_ndims, take_function_args},
38};
39use datafusion_expr::binary::type_union_resolution;
40use datafusion_expr::{
41 ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
42 Volatility,
43};
44use datafusion_macros::user_doc;
45use itertools::Itertools;
46
47make_udf_expr_and_func!(
48 ArrayAppend,
49 array_append,
50 array element, "appends an element to the end of an array.", array_append_udf );
54
/// Scalar UDF implementing `array_append(array, element)`.
///
/// The `user_doc` attribute below carries the user-facing description, syntax,
/// SQL example and argument descriptions for the function.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Appends an element to the end of an array.",
    syntax_example = "array_append(array, element)",
    sql_example = r#"```sql
> select array_append([1, 2, 3], 4);
+--------------------------------------+
| array_append(List([1,2,3]),Int64(4)) |
+--------------------------------------+
| [1, 2, 3, 4] |
+--------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(name = "element", description = "Element to append to the array.")
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayAppend {
    /// Signature handed out by `ScalarUDFImpl::signature`.
    signature: Signature,
    /// Alternative names handed out by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
78
79impl Default for ArrayAppend {
80 fn default() -> Self {
81 Self::new()
82 }
83}
84
85impl ArrayAppend {
86 pub fn new() -> Self {
87 Self {
88 signature: Signature::array_and_element(Volatility::Immutable),
89 aliases: vec![
90 String::from("list_append"),
91 String::from("array_push_back"),
92 String::from("list_push_back"),
93 ],
94 }
95 }
96}
97
98impl ScalarUDFImpl for ArrayAppend {
99 fn name(&self) -> &str {
100 "array_append"
101 }
102
103 fn signature(&self) -> &Signature {
104 &self.signature
105 }
106
107 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
108 let [array_type, element_type] = take_function_args(self.name(), arg_types)?;
109 if array_type.is_null() {
110 Ok(DataType::new_list(element_type.clone(), true))
111 } else {
112 Ok(array_type.clone())
113 }
114 }
115
116 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
117 make_scalar_function(array_append_inner)(&args.args)
118 }
119
120 fn aliases(&self) -> &[String] {
121 &self.aliases
122 }
123
124 fn documentation(&self) -> Option<&Documentation> {
125 self.doc()
126 }
127}
128
// Wires `ArrayPrepend` into `make_udf_expr_and_func!`. The macro itself is
// defined outside this section; the per-argument notes below are a reviewer's
// reading of the invocation, not of the macro body.
make_udf_expr_and_func!(
    ArrayPrepend,
    array_prepend,
    element array, // presumably the argument names of the generated function
    "Prepends an element to the beginning of an array.", // description text
    array_prepend_udf
);
136
/// Scalar UDF implementing `array_prepend(element, array)`.
///
/// The `user_doc` attribute below carries the user-facing description, syntax,
/// SQL example and argument descriptions for the function.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Prepends an element to the beginning of an array.",
    syntax_example = "array_prepend(element, array)",
    sql_example = r#"```sql
> select array_prepend(1, [2, 3, 4]);
+---------------------------------------+
| array_prepend(Int64(1),List([2,3,4])) |
+---------------------------------------+
| [1, 2, 3, 4] |
+---------------------------------------+
```"#,
    // NOTE(review): the arguments are documented as (array, element) while
    // `syntax_example` takes (element, array) -- confirm the intended order.
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(name = "element", description = "Element to prepend to the array.")
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayPrepend {
    /// Signature handed out by `ScalarUDFImpl::signature`.
    signature: Signature,
    /// Alternative names handed out by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
160
161impl Default for ArrayPrepend {
162 fn default() -> Self {
163 Self::new()
164 }
165}
166
167impl ArrayPrepend {
168 pub fn new() -> Self {
169 Self {
170 signature: Signature::element_and_array(Volatility::Immutable),
171 aliases: vec![
172 String::from("list_prepend"),
173 String::from("array_push_front"),
174 String::from("list_push_front"),
175 ],
176 }
177 }
178}
179
180impl ScalarUDFImpl for ArrayPrepend {
181 fn name(&self) -> &str {
182 "array_prepend"
183 }
184
185 fn signature(&self) -> &Signature {
186 &self.signature
187 }
188
189 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
190 let [element_type, array_type] = take_function_args(self.name(), arg_types)?;
191 if array_type.is_null() {
192 Ok(DataType::new_list(element_type.clone(), true))
193 } else {
194 Ok(array_type.clone())
195 }
196 }
197
198 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
199 make_scalar_function(array_prepend_inner)(&args.args)
200 }
201
202 fn aliases(&self) -> &[String] {
203 &self.aliases
204 }
205
206 fn documentation(&self) -> Option<&Documentation> {
207 self.doc()
208 }
209}
210
// Wires `ArrayConcat` into `make_udf_expr_and_func!`. Unlike the
// `array_append` / `array_prepend` invocations in this file, no argument
// names are passed here. The macro itself is defined outside this section.
make_udf_expr_and_func!(
    ArrayConcat,
    array_concat,
    "Concatenates arrays.", // description text
    array_concat_udf
);
217
/// Scalar UDF implementing `array_concat(array[, ..., array_n])`.
///
/// The `user_doc` attribute below carries the user-facing description, syntax,
/// SQL example and argument descriptions for the function.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Concatenates arrays.",
    syntax_example = "array_concat(array[, ..., array_n])",
    sql_example = r#"```sql
> select array_concat([1, 2], [3, 4], [5, 6]);
+---------------------------------------------------+
| array_concat(List([1,2]),List([3,4]),List([5,6])) |
+---------------------------------------------------+
| [1, 2, 3, 4, 5, 6] |
+---------------------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array_n",
        description = "Subsequent array column or literal array to concatenate."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayConcat {
    /// Signature handed out by `ScalarUDFImpl::signature`.
    signature: Signature,
    /// Alternative names handed out by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
244
245impl Default for ArrayConcat {
246 fn default() -> Self {
247 Self::new()
248 }
249}
250
251impl ArrayConcat {
252 pub fn new() -> Self {
253 Self {
254 signature: Signature::user_defined(Volatility::Immutable),
255 aliases: vec![
256 String::from("array_cat"),
257 String::from("list_concat"),
258 String::from("list_cat"),
259 ],
260 }
261 }
262}
263
264impl ScalarUDFImpl for ArrayConcat {
265 fn name(&self) -> &str {
266 "array_concat"
267 }
268
269 fn signature(&self) -> &Signature {
270 &self.signature
271 }
272
273 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
274 let mut max_dims = 0;
275 let mut large_list = false;
276 let mut element_types = Vec::with_capacity(arg_types.len());
277 for arg_type in arg_types {
278 match arg_type {
279 DataType::Null | DataType::List(_) | DataType::FixedSizeList(..) => (),
280 DataType::LargeList(_) => large_list = true,
281 arg_type => {
282 return plan_err!("{} does not support type {arg_type}", self.name());
283 }
284 }
285
286 max_dims = max_dims.max(list_ndims(arg_type));
287 element_types.push(base_type(arg_type))
288 }
289
290 if max_dims == 0 {
291 Ok(DataType::Null)
292 } else if let Some(mut return_type) = type_union_resolution(&element_types) {
293 for _ in 1..max_dims {
294 return_type = DataType::new_list(return_type, true)
295 }
296
297 if large_list {
298 Ok(DataType::new_large_list(return_type, true))
299 } else {
300 Ok(DataType::new_list(return_type, true))
301 }
302 } else {
303 plan_err!(
304 "Failed to unify argument types of {}: [{}]",
305 self.name(),
306 arg_types.iter().join(", ")
307 )
308 }
309 }
310
311 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
312 make_scalar_function(array_concat_inner)(&args.args)
313 }
314
315 fn aliases(&self) -> &[String] {
316 &self.aliases
317 }
318
319 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
320 let return_type = self.return_type(arg_types)?;
321 let base_type = base_type(&return_type);
322 let coercion = Some(&ListCoercion::FixedSizedListToList);
323 let promote_to_large_list = matches!(return_type, DataType::LargeList(_));
328 let arg_types = arg_types.iter().map(|arg_type| {
329 let coerced =
330 coerced_type_with_base_type_only(arg_type, &base_type, coercion);
331 match coerced {
332 DataType::List(field) if promote_to_large_list => {
333 DataType::LargeList(field)
334 }
335 other => other,
336 }
337 });
338
339 Ok(arg_types.collect())
340 }
341
342 fn documentation(&self) -> Option<&Documentation> {
343 self.doc()
344 }
345}
346
347pub fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
348 if args.is_empty() {
349 return exec_err!("array_concat expects at least one argument");
350 }
351
352 let mut all_null = true;
353 let mut large_list = false;
354 for arg in args {
355 match arg.data_type() {
356 DataType::Null => continue,
357 DataType::LargeList(_) => large_list = true,
358 _ => (),
359 }
360 if arg.null_count() < arg.len() {
361 all_null = false;
362 }
363 }
364
365 if all_null {
366 let return_type = args
368 .iter()
369 .map(|arg| arg.data_type())
370 .find_or_first(|d| !d.is_null())
371 .unwrap(); Ok(arrow::array::make_array(ArrayData::new_null(
374 return_type,
375 args[0].len(),
376 )))
377 } else if large_list {
378 concat_internal::<i64>(args)
379 } else {
380 concat_internal::<i32>(args)
381 }
382}
383
384fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
385 let args = align_array_dimensions::<O>(args.to_vec())?;
386
387 let list_arrays = args
388 .iter()
389 .map(|arg| as_generic_list_array::<O>(arg))
390 .collect::<Result<Vec<_>>>()?;
391 let row_count = list_arrays[0].len();
392
393 let values_data: Vec<ArrayData> =
395 list_arrays.iter().map(|la| la.values().to_data()).collect();
396 let values_data_refs: Vec<&ArrayData> = values_data.iter().collect();
397
398 let total_capacity: usize = values_data.iter().map(|d| d.len()).sum();
400
401 let mut mutable = MutableArrayData::with_capacities(
402 values_data_refs,
403 false,
404 Capacities::Array(total_capacity),
405 );
406 let mut offsets: Vec<O> = Vec::with_capacity(row_count + 1);
407 offsets.push(O::zero());
408
409 let nulls = list_arrays
414 .iter()
415 .filter_map(|la| la.nulls())
416 .collect::<Vec<_>>();
417 let valid = if nulls.len() == list_arrays.len() {
418 nulls
419 .iter()
420 .map(|n| n.inner().clone())
421 .reduce(|a, b| &a | &b)
422 .map(NullBuffer::new)
423 } else {
424 None
425 };
426
427 for row_idx in 0..row_count {
428 for (arr_idx, list_array) in list_arrays.iter().enumerate() {
429 if list_array.is_null(row_idx) {
430 continue;
431 }
432 let start = list_array.offsets()[row_idx].to_usize().unwrap();
433 let end = list_array.offsets()[row_idx + 1].to_usize().unwrap();
434 if start < end {
435 mutable.extend(arr_idx, start, end);
436 }
437 }
438 offsets.push(O::usize_as(mutable.len()));
439 }
440
441 let data_type = list_arrays[0].value_type();
442 let data = mutable.freeze();
443
444 Ok(Arc::new(GenericListArray::<O>::try_new(
445 Arc::new(Field::new_list_field(data_type, true)),
446 OffsetBuffer::new(offsets.into()),
447 arrow::array::make_array(data),
448 valid,
449 )?))
450}
451
452fn array_append_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
455 let [array, values] = take_function_args("array_append", args)?;
456 match array.data_type() {
457 DataType::Null => make_array_inner(&[Arc::clone(values)]),
458 DataType::List(_) => general_append_and_prepend::<i32>(args, true),
459 DataType::LargeList(_) => general_append_and_prepend::<i64>(args, true),
460 arg_type => exec_err!("array_append does not support type {arg_type}"),
461 }
462}
463
464fn array_prepend_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
465 let [values, array] = take_function_args("array_prepend", args)?;
466 match array.data_type() {
467 DataType::Null => make_array_inner(&[Arc::clone(values)]),
468 DataType::List(_) => general_append_and_prepend::<i32>(args, false),
469 DataType::LargeList(_) => general_append_and_prepend::<i64>(args, false),
470 arg_type => exec_err!("array_prepend does not support type {arg_type}"),
471 }
472}
473
474fn general_append_and_prepend<O: OffsetSizeTrait>(
475 args: &[ArrayRef],
476 is_append: bool,
477) -> Result<ArrayRef>
478where
479 i64: TryInto<O>,
480{
481 let (list_array, element_array) = if is_append {
482 let list_array = as_generic_list_array::<O>(&args[0])?;
483 let element_array = &args[1];
484 check_datatypes("array_append", &[element_array, list_array.values()])?;
485 (list_array, element_array)
486 } else {
487 let list_array = as_generic_list_array::<O>(&args[1])?;
488 let element_array = &args[0];
489 check_datatypes("array_prepend", &[list_array.values(), element_array])?;
490 (list_array, element_array)
491 };
492
493 let res = match list_array.value_type() {
494 DataType::List(_) => concat_internal::<O>(args)?,
495 DataType::LargeList(_) => concat_internal::<O>(args)?,
496 data_type => {
497 return generic_append_and_prepend::<O>(
498 list_array,
499 element_array,
500 &data_type,
501 is_append,
502 );
503 }
504 };
505
506 Ok(res)
507}
508
509fn generic_append_and_prepend<O: OffsetSizeTrait>(
529 list_array: &GenericListArray<O>,
530 element_array: &ArrayRef,
531 data_type: &DataType,
532 is_append: bool,
533) -> Result<ArrayRef>
534where
535 i64: TryInto<O>,
536{
537 let mut offsets = vec![O::usize_as(0)];
538 let values = list_array.values();
539 let original_data = values.to_data();
540 let element_data = element_array.to_data();
541 let capacity = Capacities::Array(original_data.len() + element_data.len());
542
543 let mut mutable = MutableArrayData::with_capacities(
544 vec![&original_data, &element_data],
545 false,
546 capacity,
547 );
548
549 let values_index = 0;
550 let element_index = 1;
551
552 for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
553 let start = offset_window[0].to_usize().unwrap();
554 let end = offset_window[1].to_usize().unwrap();
555 if is_append {
556 mutable.extend(values_index, start, end);
557 mutable.extend(element_index, row_index, row_index + 1);
558 } else {
559 mutable.extend(element_index, row_index, row_index + 1);
560 mutable.extend(values_index, start, end);
561 }
562 offsets.push(offsets[row_index] + O::usize_as(end - start + 1));
563 }
564
565 let data = mutable.freeze();
566
567 Ok(Arc::new(GenericListArray::<O>::try_new(
568 Arc::new(Field::new_list_field(data_type.to_owned(), true)),
569 OffsetBuffer::new(offsets.into()),
570 arrow::array::make_array(data),
571 None,
572 )?))
573}