1use std::any::Any;
21use std::sync::Arc;
22
23use crate::make_array::make_array_inner;
24use crate::utils::{align_array_dimensions, check_datatypes, make_scalar_function};
25use arrow::array::{
26 Array, ArrayRef, Capacities, GenericListArray, MutableArrayData, NullArray,
27 NullBufferBuilder, OffsetSizeTrait,
28};
29use arrow::buffer::OffsetBuffer;
30use arrow::datatypes::{DataType, Field};
31use datafusion_common::utils::{
32 base_type, coerced_type_with_base_type_only, ListCoercion,
33};
34use datafusion_common::Result;
35use datafusion_common::{
36 cast::as_generic_list_array,
37 exec_err, plan_err,
38 utils::{list_ndims, take_function_args},
39};
40use datafusion_expr::binary::type_union_resolution;
41use datafusion_expr::{
42 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
43};
44use datafusion_macros::user_doc;
45
46make_udf_expr_and_func!(
47 ArrayAppend,
48 array_append,
49 array element, "appends an element to the end of an array.", array_append_udf );
53
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Appends an element to the end of an array.",
    syntax_example = "array_append(array, element)",
    sql_example = r#"```sql
> select array_append([1, 2, 3], 4);
+--------------------------------------+
| array_append(List([1,2,3]),Int64(4)) |
+--------------------------------------+
| [1, 2, 3, 4]                         |
+--------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(name = "element", description = "Element to append to the array.")
)]
#[derive(Debug)]
pub struct ArrayAppend {
    /// Argument signature (array first, element second), built in `new`.
    signature: Signature,
    /// Alternative names returned from `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
77
78impl Default for ArrayAppend {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84impl ArrayAppend {
85 pub fn new() -> Self {
86 Self {
87 signature: Signature::array_and_element(Volatility::Immutable),
88 aliases: vec![
89 String::from("list_append"),
90 String::from("array_push_back"),
91 String::from("list_push_back"),
92 ],
93 }
94 }
95}
96
97impl ScalarUDFImpl for ArrayAppend {
98 fn as_any(&self) -> &dyn Any {
99 self
100 }
101
102 fn name(&self) -> &str {
103 "array_append"
104 }
105
106 fn signature(&self) -> &Signature {
107 &self.signature
108 }
109
110 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
111 let [array_type, element_type] = take_function_args(self.name(), arg_types)?;
112 if array_type.is_null() {
113 Ok(DataType::new_list(element_type.clone(), true))
114 } else {
115 Ok(array_type.clone())
116 }
117 }
118
119 fn invoke_with_args(
120 &self,
121 args: datafusion_expr::ScalarFunctionArgs,
122 ) -> Result<ColumnarValue> {
123 make_scalar_function(array_append_inner)(&args.args)
124 }
125
126 fn aliases(&self) -> &[String] {
127 &self.aliases
128 }
129
130 fn documentation(&self) -> Option<&Documentation> {
131 self.doc()
132 }
133}
134
// Expression-builder helper (arguments `element`, `array` — note the element
// comes first) and UDF accessor `array_prepend_udf` for `ArrayPrepend`.
// NOTE(review): the exact expansion lives in the `make_udf_expr_and_func!`
// macro definition.
make_udf_expr_and_func!(
    ArrayPrepend,
    array_prepend,
    element array,
    "Prepends an element to the beginning of an array.",
    array_prepend_udf
);
142
143#[user_doc(
144 doc_section(label = "Array Functions"),
145 description = "Prepends an element to the beginning of an array.",
146 syntax_example = "array_prepend(element, array)",
147 sql_example = r#"```sql
148> select array_prepend(1, [2, 3, 4]);
149+---------------------------------------+
150| array_prepend(Int64(1),List([2,3,4])) |
151+---------------------------------------+
152| [1, 2, 3, 4] |
153+---------------------------------------+
154```"#,
155 argument(
156 name = "array",
157 description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
158 ),
159 argument(name = "element", description = "Element to prepend to the array.")
160)]
161#[derive(Debug)]
162pub struct ArrayPrepend {
163 signature: Signature,
164 aliases: Vec<String>,
165}
166
167impl Default for ArrayPrepend {
168 fn default() -> Self {
169 Self::new()
170 }
171}
172
173impl ArrayPrepend {
174 pub fn new() -> Self {
175 Self {
176 signature: Signature::element_and_array(Volatility::Immutable),
177 aliases: vec![
178 String::from("list_prepend"),
179 String::from("array_push_front"),
180 String::from("list_push_front"),
181 ],
182 }
183 }
184}
185
186impl ScalarUDFImpl for ArrayPrepend {
187 fn as_any(&self) -> &dyn Any {
188 self
189 }
190
191 fn name(&self) -> &str {
192 "array_prepend"
193 }
194
195 fn signature(&self) -> &Signature {
196 &self.signature
197 }
198
199 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
200 let [element_type, array_type] = take_function_args(self.name(), arg_types)?;
201 if array_type.is_null() {
202 Ok(DataType::new_list(element_type.clone(), true))
203 } else {
204 Ok(array_type.clone())
205 }
206 }
207
208 fn invoke_with_args(
209 &self,
210 args: datafusion_expr::ScalarFunctionArgs,
211 ) -> Result<ColumnarValue> {
212 make_scalar_function(array_prepend_inner)(&args.args)
213 }
214
215 fn aliases(&self) -> &[String] {
216 &self.aliases
217 }
218
219 fn documentation(&self) -> Option<&Documentation> {
220 self.doc()
221 }
222}
223
// Expression-builder helper and UDF accessor `array_concat_udf` for
// `ArrayConcat`. No argument names are listed here, unlike the
// append/prepend invocations — presumably this selects a variadic form of
// the macro; NOTE(review): confirm against `make_udf_expr_and_func!`.
make_udf_expr_and_func!(
    ArrayConcat,
    array_concat,
    "Concatenates arrays.",
    array_concat_udf
);
230
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Concatenates arrays.",
    syntax_example = "array_concat(array[, ..., array_n])",
    sql_example = r#"```sql
> select array_concat([1, 2], [3, 4], [5, 6]);
+---------------------------------------------------+
| array_concat(List([1,2]),List([3,4]),List([5,6])) |
+---------------------------------------------------+
| [1, 2, 3, 4, 5, 6]                                |
+---------------------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array_n",
        description = "Subsequent array column or literal array to concatenate."
    )
)]
#[derive(Debug)]
pub struct ArrayConcat {
    /// User-defined signature; argument types are resolved by
    /// `ScalarUDFImpl::coerce_types`.
    signature: Signature,
    /// Alternative names returned from `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
257
258impl Default for ArrayConcat {
259 fn default() -> Self {
260 Self::new()
261 }
262}
263
264impl ArrayConcat {
265 pub fn new() -> Self {
266 Self {
267 signature: Signature::user_defined(Volatility::Immutable),
268 aliases: vec![
269 String::from("array_cat"),
270 String::from("list_concat"),
271 String::from("list_cat"),
272 ],
273 }
274 }
275}
276
277impl ScalarUDFImpl for ArrayConcat {
278 fn as_any(&self) -> &dyn Any {
279 self
280 }
281
282 fn name(&self) -> &str {
283 "array_concat"
284 }
285
286 fn signature(&self) -> &Signature {
287 &self.signature
288 }
289
290 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
291 let mut max_dims = 0;
292 let mut large_list = false;
293 let mut element_types = Vec::with_capacity(arg_types.len());
294 for arg_type in arg_types {
295 match arg_type {
296 DataType::Null | DataType::List(_) | DataType::FixedSizeList(..) => (),
297 DataType::LargeList(_) => large_list = true,
298 arg_type => {
299 return plan_err!("{} does not support type {arg_type}", self.name())
300 }
301 }
302
303 max_dims = max_dims.max(list_ndims(arg_type));
304 element_types.push(base_type(arg_type))
305 }
306
307 if max_dims == 0 {
308 Ok(DataType::Null)
309 } else if let Some(mut return_type) = type_union_resolution(&element_types) {
310 for _ in 1..max_dims {
311 return_type = DataType::new_list(return_type, true)
312 }
313
314 if large_list {
315 Ok(DataType::new_large_list(return_type, true))
316 } else {
317 Ok(DataType::new_list(return_type, true))
318 }
319 } else {
320 plan_err!(
321 "Failed to unify argument types of {}: {arg_types:?}",
322 self.name()
323 )
324 }
325 }
326
327 fn invoke_with_args(
328 &self,
329 args: datafusion_expr::ScalarFunctionArgs,
330 ) -> Result<ColumnarValue> {
331 make_scalar_function(array_concat_inner)(&args.args)
332 }
333
334 fn aliases(&self) -> &[String] {
335 &self.aliases
336 }
337
338 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
339 let base_type = base_type(&self.return_type(arg_types)?);
340 let coercion = Some(&ListCoercion::FixedSizedListToList);
341 let arg_types = arg_types.iter().map(|arg_type| {
342 coerced_type_with_base_type_only(arg_type, &base_type, coercion)
343 });
344
345 Ok(arg_types.collect())
346 }
347
348 fn documentation(&self) -> Option<&Documentation> {
349 self.doc()
350 }
351}
352
353pub(crate) fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
355 if args.is_empty() {
356 return exec_err!("array_concat expects at least one argument");
357 }
358
359 let mut all_null = true;
360 let mut large_list = false;
361 for arg in args {
362 match arg.data_type() {
363 DataType::Null => continue,
364 DataType::LargeList(_) => large_list = true,
365 _ => (),
366 }
367
368 all_null = false
369 }
370
371 if all_null {
372 Ok(Arc::new(NullArray::new(args[0].len())))
373 } else if large_list {
374 concat_internal::<i64>(args)
375 } else {
376 concat_internal::<i32>(args)
377 }
378}
379
380fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
381 let args = align_array_dimensions::<O>(args.to_vec())?;
382
383 let list_arrays = args
384 .iter()
385 .map(|arg| as_generic_list_array::<O>(arg))
386 .collect::<Result<Vec<_>>>()?;
387 let row_count = list_arrays[0].len();
389
390 let mut array_lengths = vec![];
391 let mut arrays = vec![];
392 let mut valid = NullBufferBuilder::new(row_count);
393 for i in 0..row_count {
394 let nulls = list_arrays
395 .iter()
396 .map(|arr| arr.is_null(i))
397 .collect::<Vec<_>>();
398
399 let is_null = nulls.iter().all(|&x| x);
401 if is_null {
402 array_lengths.push(0);
403 valid.append_null();
404 } else {
405 let values = list_arrays
407 .iter()
408 .map(|arr| arr.value(i))
409 .collect::<Vec<_>>();
410
411 let elements = values
412 .iter()
413 .map(|a| a.as_ref())
414 .collect::<Vec<&dyn Array>>();
415
416 let concatenated_array = arrow::compute::concat(elements.as_slice())?;
418 array_lengths.push(concatenated_array.len());
419 arrays.push(concatenated_array);
420 valid.append_non_null();
421 }
422 }
423 let data_type = list_arrays[0].value_type();
425
426 let elements = arrays
427 .iter()
428 .map(|a| a.as_ref())
429 .collect::<Vec<&dyn Array>>();
430
431 let list_arr = GenericListArray::<O>::new(
432 Arc::new(Field::new_list_field(data_type, true)),
433 OffsetBuffer::from_lengths(array_lengths),
434 Arc::new(arrow::compute::concat(elements.as_slice())?),
435 valid.finish(),
436 );
437
438 Ok(Arc::new(list_arr))
439}
440
441pub(crate) fn array_append_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
445 let [array, values] = take_function_args("array_append", args)?;
446 match array.data_type() {
447 DataType::Null => make_array_inner(&[Arc::clone(values)]),
448 DataType::List(_) => general_append_and_prepend::<i32>(args, true),
449 DataType::LargeList(_) => general_append_and_prepend::<i64>(args, true),
450 arg_type => exec_err!("array_append does not support type {arg_type}"),
451 }
452}
453
454pub(crate) fn array_prepend_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
456 let [values, array] = take_function_args("array_prepend", args)?;
457 match array.data_type() {
458 DataType::Null => make_array_inner(&[Arc::clone(values)]),
459 DataType::List(_) => general_append_and_prepend::<i32>(args, false),
460 DataType::LargeList(_) => general_append_and_prepend::<i64>(args, false),
461 arg_type => exec_err!("array_prepend does not support type {arg_type}"),
462 }
463}
464
465fn general_append_and_prepend<O: OffsetSizeTrait>(
466 args: &[ArrayRef],
467 is_append: bool,
468) -> Result<ArrayRef>
469where
470 i64: TryInto<O>,
471{
472 let (list_array, element_array) = if is_append {
473 let list_array = as_generic_list_array::<O>(&args[0])?;
474 let element_array = &args[1];
475 check_datatypes("array_append", &[element_array, list_array.values()])?;
476 (list_array, element_array)
477 } else {
478 let list_array = as_generic_list_array::<O>(&args[1])?;
479 let element_array = &args[0];
480 check_datatypes("array_prepend", &[list_array.values(), element_array])?;
481 (list_array, element_array)
482 };
483
484 let res = match list_array.value_type() {
485 DataType::List(_) => concat_internal::<O>(args)?,
486 DataType::LargeList(_) => concat_internal::<O>(args)?,
487 data_type => {
488 return generic_append_and_prepend::<O>(
489 list_array,
490 element_array,
491 &data_type,
492 is_append,
493 );
494 }
495 };
496
497 Ok(res)
498}
499
500fn generic_append_and_prepend<O: OffsetSizeTrait>(
520 list_array: &GenericListArray<O>,
521 element_array: &ArrayRef,
522 data_type: &DataType,
523 is_append: bool,
524) -> Result<ArrayRef>
525where
526 i64: TryInto<O>,
527{
528 let mut offsets = vec![O::usize_as(0)];
529 let values = list_array.values();
530 let original_data = values.to_data();
531 let element_data = element_array.to_data();
532 let capacity = Capacities::Array(original_data.len() + element_data.len());
533
534 let mut mutable = MutableArrayData::with_capacities(
535 vec![&original_data, &element_data],
536 false,
537 capacity,
538 );
539
540 let values_index = 0;
541 let element_index = 1;
542
543 for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
544 let start = offset_window[0].to_usize().unwrap();
545 let end = offset_window[1].to_usize().unwrap();
546 if is_append {
547 mutable.extend(values_index, start, end);
548 mutable.extend(element_index, row_index, row_index + 1);
549 } else {
550 mutable.extend(element_index, row_index, row_index + 1);
551 mutable.extend(values_index, start, end);
552 }
553 offsets.push(offsets[row_index] + O::usize_as(end - start + 1));
554 }
555
556 let data = mutable.freeze();
557
558 Ok(Arc::new(GenericListArray::<O>::try_new(
559 Arc::new(Field::new_list_field(data_type.to_owned(), true)),
560 OffsetBuffer::new(offsets.into()),
561 arrow::array::make_array(data),
562 None,
563 )?))
564}