datafusion_functions_nested/
concat.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for `array_append`, `array_prepend` and `array_concat` functions.
19
20use std::any::Any;
21use std::sync::Arc;
22
23use crate::make_array::make_array_inner;
24use crate::utils::{align_array_dimensions, check_datatypes, make_scalar_function};
25use arrow::array::{
26    Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData,
27    NullBufferBuilder, OffsetSizeTrait,
28};
29use arrow::buffer::OffsetBuffer;
30use arrow::datatypes::{DataType, Field};
31use datafusion_common::Result;
32use datafusion_common::utils::{
33    ListCoercion, base_type, coerced_type_with_base_type_only,
34};
35use datafusion_common::{
36    cast::as_generic_list_array,
37    exec_err, plan_err,
38    utils::{list_ndims, take_function_args},
39};
40use datafusion_expr::binary::type_union_resolution;
41use datafusion_expr::{
42    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
43};
44use datafusion_macros::user_doc;
45use itertools::Itertools;
46
47make_udf_expr_and_func!(
48    ArrayAppend,
49    array_append,
50    array element,                                // arg name
51    "appends an element to the end of an array.", // doc
52    array_append_udf                              // internal function name
53);
54
55#[user_doc(
56    doc_section(label = "Array Functions"),
57    description = "Appends an element to the end of an array.",
58    syntax_example = "array_append(array, element)",
59    sql_example = r#"```sql
60> select array_append([1, 2, 3], 4);
61+--------------------------------------+
62| array_append(List([1,2,3]),Int64(4)) |
63+--------------------------------------+
64| [1, 2, 3, 4]                         |
65+--------------------------------------+
66```"#,
67    argument(
68        name = "array",
69        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
70    ),
71    argument(name = "element", description = "Element to append to the array.")
72)]
73#[derive(Debug, PartialEq, Eq, Hash)]
74pub struct ArrayAppend {
75    signature: Signature,
76    aliases: Vec<String>,
77}
78
79impl Default for ArrayAppend {
80    fn default() -> Self {
81        Self::new()
82    }
83}
84
85impl ArrayAppend {
86    pub fn new() -> Self {
87        Self {
88            signature: Signature::array_and_element(Volatility::Immutable),
89            aliases: vec![
90                String::from("list_append"),
91                String::from("array_push_back"),
92                String::from("list_push_back"),
93            ],
94        }
95    }
96}
97
98impl ScalarUDFImpl for ArrayAppend {
99    fn as_any(&self) -> &dyn Any {
100        self
101    }
102
103    fn name(&self) -> &str {
104        "array_append"
105    }
106
107    fn signature(&self) -> &Signature {
108        &self.signature
109    }
110
111    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
112        let [array_type, element_type] = take_function_args(self.name(), arg_types)?;
113        if array_type.is_null() {
114            Ok(DataType::new_list(element_type.clone(), true))
115        } else {
116            Ok(array_type.clone())
117        }
118    }
119
120    fn invoke_with_args(
121        &self,
122        args: datafusion_expr::ScalarFunctionArgs,
123    ) -> Result<ColumnarValue> {
124        make_scalar_function(array_append_inner)(&args.args)
125    }
126
127    fn aliases(&self) -> &[String] {
128        &self.aliases
129    }
130
131    fn documentation(&self) -> Option<&Documentation> {
132        self.doc()
133    }
134}
135
136make_udf_expr_and_func!(
137    ArrayPrepend,
138    array_prepend,
139    element array,
140    "Prepends an element to the beginning of an array.",
141    array_prepend_udf
142);
143
144#[user_doc(
145    doc_section(label = "Array Functions"),
146    description = "Prepends an element to the beginning of an array.",
147    syntax_example = "array_prepend(element, array)",
148    sql_example = r#"```sql
149> select array_prepend(1, [2, 3, 4]);
150+---------------------------------------+
151| array_prepend(Int64(1),List([2,3,4])) |
152+---------------------------------------+
153| [1, 2, 3, 4]                          |
154+---------------------------------------+
155```"#,
156    argument(
157        name = "array",
158        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
159    ),
160    argument(name = "element", description = "Element to prepend to the array.")
161)]
162#[derive(Debug, PartialEq, Eq, Hash)]
163pub struct ArrayPrepend {
164    signature: Signature,
165    aliases: Vec<String>,
166}
167
168impl Default for ArrayPrepend {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
174impl ArrayPrepend {
175    pub fn new() -> Self {
176        Self {
177            signature: Signature::element_and_array(Volatility::Immutable),
178            aliases: vec![
179                String::from("list_prepend"),
180                String::from("array_push_front"),
181                String::from("list_push_front"),
182            ],
183        }
184    }
185}
186
187impl ScalarUDFImpl for ArrayPrepend {
188    fn as_any(&self) -> &dyn Any {
189        self
190    }
191
192    fn name(&self) -> &str {
193        "array_prepend"
194    }
195
196    fn signature(&self) -> &Signature {
197        &self.signature
198    }
199
200    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
201        let [element_type, array_type] = take_function_args(self.name(), arg_types)?;
202        if array_type.is_null() {
203            Ok(DataType::new_list(element_type.clone(), true))
204        } else {
205            Ok(array_type.clone())
206        }
207    }
208
209    fn invoke_with_args(
210        &self,
211        args: datafusion_expr::ScalarFunctionArgs,
212    ) -> Result<ColumnarValue> {
213        make_scalar_function(array_prepend_inner)(&args.args)
214    }
215
216    fn aliases(&self) -> &[String] {
217        &self.aliases
218    }
219
220    fn documentation(&self) -> Option<&Documentation> {
221        self.doc()
222    }
223}
224
225make_udf_expr_and_func!(
226    ArrayConcat,
227    array_concat,
228    "Concatenates arrays.",
229    array_concat_udf
230);
231
232#[user_doc(
233    doc_section(label = "Array Functions"),
234    description = "Concatenates arrays.",
235    syntax_example = "array_concat(array[, ..., array_n])",
236    sql_example = r#"```sql
237> select array_concat([1, 2], [3, 4], [5, 6]);
238+---------------------------------------------------+
239| array_concat(List([1,2]),List([3,4]),List([5,6])) |
240+---------------------------------------------------+
241| [1, 2, 3, 4, 5, 6]                                |
242+---------------------------------------------------+
243```"#,
244    argument(
245        name = "array",
246        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
247    ),
248    argument(
249        name = "array_n",
250        description = "Subsequent array column or literal array to concatenate."
251    )
252)]
253#[derive(Debug, PartialEq, Eq, Hash)]
254pub struct ArrayConcat {
255    signature: Signature,
256    aliases: Vec<String>,
257}
258
259impl Default for ArrayConcat {
260    fn default() -> Self {
261        Self::new()
262    }
263}
264
265impl ArrayConcat {
266    pub fn new() -> Self {
267        Self {
268            signature: Signature::user_defined(Volatility::Immutable),
269            aliases: vec![
270                String::from("array_cat"),
271                String::from("list_concat"),
272                String::from("list_cat"),
273            ],
274        }
275    }
276}
277
278impl ScalarUDFImpl for ArrayConcat {
279    fn as_any(&self) -> &dyn Any {
280        self
281    }
282
283    fn name(&self) -> &str {
284        "array_concat"
285    }
286
287    fn signature(&self) -> &Signature {
288        &self.signature
289    }
290
291    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
292        let mut max_dims = 0;
293        let mut large_list = false;
294        let mut element_types = Vec::with_capacity(arg_types.len());
295        for arg_type in arg_types {
296            match arg_type {
297                DataType::Null | DataType::List(_) | DataType::FixedSizeList(..) => (),
298                DataType::LargeList(_) => large_list = true,
299                arg_type => {
300                    return plan_err!("{} does not support type {arg_type}", self.name());
301                }
302            }
303
304            max_dims = max_dims.max(list_ndims(arg_type));
305            element_types.push(base_type(arg_type))
306        }
307
308        if max_dims == 0 {
309            Ok(DataType::Null)
310        } else if let Some(mut return_type) = type_union_resolution(&element_types) {
311            for _ in 1..max_dims {
312                return_type = DataType::new_list(return_type, true)
313            }
314
315            if large_list {
316                Ok(DataType::new_large_list(return_type, true))
317            } else {
318                Ok(DataType::new_list(return_type, true))
319            }
320        } else {
321            plan_err!(
322                "Failed to unify argument types of {}: [{}]",
323                self.name(),
324                arg_types.iter().join(", ")
325            )
326        }
327    }
328
329    fn invoke_with_args(
330        &self,
331        args: datafusion_expr::ScalarFunctionArgs,
332    ) -> Result<ColumnarValue> {
333        make_scalar_function(array_concat_inner)(&args.args)
334    }
335
336    fn aliases(&self) -> &[String] {
337        &self.aliases
338    }
339
340    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
341        let base_type = base_type(&self.return_type(arg_types)?);
342        let coercion = Some(&ListCoercion::FixedSizedListToList);
343        let arg_types = arg_types.iter().map(|arg_type| {
344            coerced_type_with_base_type_only(arg_type, &base_type, coercion)
345        });
346
347        Ok(arg_types.collect())
348    }
349
350    fn documentation(&self) -> Option<&Documentation> {
351        self.doc()
352    }
353}
354
355fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
356    if args.is_empty() {
357        return exec_err!("array_concat expects at least one argument");
358    }
359
360    let mut all_null = true;
361    let mut large_list = false;
362    for arg in args {
363        match arg.data_type() {
364            DataType::Null => continue,
365            DataType::LargeList(_) => large_list = true,
366            _ => (),
367        }
368        if arg.null_count() < arg.len() {
369            all_null = false;
370        }
371    }
372
373    if all_null {
374        // Return a null array with the same type as the first non-null-type argument
375        let return_type = args
376            .iter()
377            .map(|arg| arg.data_type())
378            .find_or_first(|d| !d.is_null())
379            .unwrap(); // Safe because args is non-empty
380
381        Ok(arrow::array::make_array(ArrayData::new_null(
382            return_type,
383            args[0].len(),
384        )))
385    } else if large_list {
386        concat_internal::<i64>(args)
387    } else {
388        concat_internal::<i32>(args)
389    }
390}
391
392fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
393    let args = align_array_dimensions::<O>(args.to_vec())?;
394
395    let list_arrays = args
396        .iter()
397        .map(|arg| as_generic_list_array::<O>(arg))
398        .collect::<Result<Vec<_>>>()?;
399    // Assume number of rows is the same for all arrays
400    let row_count = list_arrays[0].len();
401
402    let mut array_lengths = vec![];
403    let mut arrays = vec![];
404    let mut valid = NullBufferBuilder::new(row_count);
405    for i in 0..row_count {
406        let nulls = list_arrays
407            .iter()
408            .map(|arr| arr.is_null(i))
409            .collect::<Vec<_>>();
410
411        // If all the arrays are null, the concatenated array is null
412        let is_null = nulls.iter().all(|&x| x);
413        if is_null {
414            array_lengths.push(0);
415            valid.append_null();
416        } else {
417            // Get all the arrays on i-th row
418            let values = list_arrays
419                .iter()
420                .map(|arr| arr.value(i))
421                .collect::<Vec<_>>();
422
423            let elements = values
424                .iter()
425                .map(|a| a.as_ref())
426                .collect::<Vec<&dyn Array>>();
427
428            // Concatenated array on i-th row
429            let concatenated_array = arrow::compute::concat(elements.as_slice())?;
430            array_lengths.push(concatenated_array.len());
431            arrays.push(concatenated_array);
432            valid.append_non_null();
433        }
434    }
435    // Assume all arrays have the same data type
436    let data_type = list_arrays[0].value_type();
437
438    let elements = arrays
439        .iter()
440        .map(|a| a.as_ref())
441        .collect::<Vec<&dyn Array>>();
442
443    let list_arr = GenericListArray::<O>::new(
444        Arc::new(Field::new_list_field(data_type, true)),
445        OffsetBuffer::from_lengths(array_lengths),
446        Arc::new(arrow::compute::concat(elements.as_slice())?),
447        valid.finish(),
448    );
449
450    Ok(Arc::new(list_arr))
451}
452
453// Kernel functions
454
455fn array_append_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
456    let [array, values] = take_function_args("array_append", args)?;
457    match array.data_type() {
458        DataType::Null => make_array_inner(&[Arc::clone(values)]),
459        DataType::List(_) => general_append_and_prepend::<i32>(args, true),
460        DataType::LargeList(_) => general_append_and_prepend::<i64>(args, true),
461        arg_type => exec_err!("array_append does not support type {arg_type}"),
462    }
463}
464
465fn array_prepend_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
466    let [values, array] = take_function_args("array_prepend", args)?;
467    match array.data_type() {
468        DataType::Null => make_array_inner(&[Arc::clone(values)]),
469        DataType::List(_) => general_append_and_prepend::<i32>(args, false),
470        DataType::LargeList(_) => general_append_and_prepend::<i64>(args, false),
471        arg_type => exec_err!("array_prepend does not support type {arg_type}"),
472    }
473}
474
475fn general_append_and_prepend<O: OffsetSizeTrait>(
476    args: &[ArrayRef],
477    is_append: bool,
478) -> Result<ArrayRef>
479where
480    i64: TryInto<O>,
481{
482    let (list_array, element_array) = if is_append {
483        let list_array = as_generic_list_array::<O>(&args[0])?;
484        let element_array = &args[1];
485        check_datatypes("array_append", &[element_array, list_array.values()])?;
486        (list_array, element_array)
487    } else {
488        let list_array = as_generic_list_array::<O>(&args[1])?;
489        let element_array = &args[0];
490        check_datatypes("array_prepend", &[list_array.values(), element_array])?;
491        (list_array, element_array)
492    };
493
494    let res = match list_array.value_type() {
495        DataType::List(_) => concat_internal::<O>(args)?,
496        DataType::LargeList(_) => concat_internal::<O>(args)?,
497        data_type => {
498            return generic_append_and_prepend::<O>(
499                list_array,
500                element_array,
501                &data_type,
502                is_append,
503            );
504        }
505    };
506
507    Ok(res)
508}
509
510/// Appends or prepends elements to a ListArray.
511///
512/// This function takes a ListArray, an ArrayRef, a FieldRef, and a boolean flag
513/// indicating whether to append or prepend the elements. It returns a `Result<ArrayRef>`
514/// representing the resulting ListArray after the operation.
515///
516/// # Arguments
517///
518/// * `list_array` - A reference to the ListArray to which elements will be appended/prepended.
519/// * `element_array` - A reference to the Array containing elements to be appended/prepended.
520/// * `field` - A reference to the Field describing the data type of the arrays.
521/// * `is_append` - A boolean flag indicating whether to append (`true`) or prepend (`false`) elements.
522///
523/// # Examples
524///
525/// generic_append_and_prepend(
526///     [1, 2, 3], 4, append => [1, 2, 3, 4]
527///     5, [6, 7, 8], prepend => [5, 6, 7, 8]
528/// )
529fn generic_append_and_prepend<O: OffsetSizeTrait>(
530    list_array: &GenericListArray<O>,
531    element_array: &ArrayRef,
532    data_type: &DataType,
533    is_append: bool,
534) -> Result<ArrayRef>
535where
536    i64: TryInto<O>,
537{
538    let mut offsets = vec![O::usize_as(0)];
539    let values = list_array.values();
540    let original_data = values.to_data();
541    let element_data = element_array.to_data();
542    let capacity = Capacities::Array(original_data.len() + element_data.len());
543
544    let mut mutable = MutableArrayData::with_capacities(
545        vec![&original_data, &element_data],
546        false,
547        capacity,
548    );
549
550    let values_index = 0;
551    let element_index = 1;
552
553    for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
554        let start = offset_window[0].to_usize().unwrap();
555        let end = offset_window[1].to_usize().unwrap();
556        if is_append {
557            mutable.extend(values_index, start, end);
558            mutable.extend(element_index, row_index, row_index + 1);
559        } else {
560            mutable.extend(element_index, row_index, row_index + 1);
561            mutable.extend(values_index, start, end);
562        }
563        offsets.push(offsets[row_index] + O::usize_as(end - start + 1));
564    }
565
566    let data = mutable.freeze();
567
568    Ok(Arc::new(GenericListArray::<O>::try_new(
569        Arc::new(Field::new_list_field(data_type.to_owned(), true)),
570        OffsetBuffer::new(offsets.into()),
571        arrow::array::make_array(data),
572        None,
573    )?))
574}