datafusion_functions_nested/
concat.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ScalarUDFImpl`] definitions for `array_append`, `array_prepend` and `array_concat` functions.

20use std::any::Any;
21use std::sync::Arc;
22
23use crate::make_array::make_array_inner;
24use crate::utils::{align_array_dimensions, check_datatypes, make_scalar_function};
25use arrow::array::{
26    Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData,
27    NullBufferBuilder, OffsetSizeTrait,
28};
29use arrow::buffer::OffsetBuffer;
30use arrow::datatypes::{DataType, Field};
31use datafusion_common::utils::{
32    base_type, coerced_type_with_base_type_only, ListCoercion,
33};
34use datafusion_common::Result;
35use datafusion_common::{
36    cast::as_generic_list_array,
37    exec_err, plan_err,
38    utils::{list_ndims, take_function_args},
39};
40use datafusion_expr::binary::type_union_resolution;
41use datafusion_expr::{
42    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
43};
44use datafusion_macros::user_doc;
45use itertools::Itertools;
46
// Registers the expression-level helpers for `ArrayAppend`: `array_append`
// (taking the arguments named `array element`) and the `array_append_udf`
// accessor. NOTE(review): the exact expansion lives in the
// `make_udf_expr_and_func!` macro definition — confirm there.
make_udf_expr_and_func!(
    ArrayAppend,
    array_append,
    array element,                                // arg name
    "appends an element to the end of an array.", // doc
    array_append_udf                              // internal function name
);
54
/// Scalar UDF implementing `array_append(array, element)`.
///
/// The user-facing SQL documentation is supplied by the `user_doc` attribute.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Appends an element to the end of an array.",
    syntax_example = "array_append(array, element)",
    sql_example = r#"```sql
> select array_append([1, 2, 3], 4);
+--------------------------------------+
| array_append(List([1,2,3]),Int64(4)) |
+--------------------------------------+
| [1, 2, 3, 4]                         |
+--------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(name = "element", description = "Element to append to the array.")
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayAppend {
    /// Accepted argument shapes: an array followed by an element.
    signature: Signature,
    /// Alternative SQL names for this function (e.g. `list_append`).
    aliases: Vec<String>,
}
78
79impl Default for ArrayAppend {
80    fn default() -> Self {
81        Self::new()
82    }
83}
84
85impl ArrayAppend {
86    pub fn new() -> Self {
87        Self {
88            signature: Signature::array_and_element(Volatility::Immutable),
89            aliases: vec![
90                String::from("list_append"),
91                String::from("array_push_back"),
92                String::from("list_push_back"),
93            ],
94        }
95    }
96}
97
98impl ScalarUDFImpl for ArrayAppend {
99    fn as_any(&self) -> &dyn Any {
100        self
101    }
102
103    fn name(&self) -> &str {
104        "array_append"
105    }
106
107    fn signature(&self) -> &Signature {
108        &self.signature
109    }
110
111    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
112        let [array_type, element_type] = take_function_args(self.name(), arg_types)?;
113        if array_type.is_null() {
114            Ok(DataType::new_list(element_type.clone(), true))
115        } else {
116            Ok(array_type.clone())
117        }
118    }
119
120    fn invoke_with_args(
121        &self,
122        args: datafusion_expr::ScalarFunctionArgs,
123    ) -> Result<ColumnarValue> {
124        make_scalar_function(array_append_inner)(&args.args)
125    }
126
127    fn aliases(&self) -> &[String] {
128        &self.aliases
129    }
130
131    fn documentation(&self) -> Option<&Documentation> {
132        self.doc()
133    }
134}
135
// Registers the expression-level helpers for `ArrayPrepend`: `array_prepend`
// (taking the arguments named `element array`, in that order) and the
// `array_prepend_udf` accessor. NOTE(review): the exact expansion lives in the
// `make_udf_expr_and_func!` macro definition — confirm there.
make_udf_expr_and_func!(
    ArrayPrepend,
    array_prepend,
    element array,
    "Prepends an element to the beginning of an array.",
    array_prepend_udf
);
143
144#[user_doc(
145    doc_section(label = "Array Functions"),
146    description = "Prepends an element to the beginning of an array.",
147    syntax_example = "array_prepend(element, array)",
148    sql_example = r#"```sql
149> select array_prepend(1, [2, 3, 4]);
150+---------------------------------------+
151| array_prepend(Int64(1),List([2,3,4])) |
152+---------------------------------------+
153| [1, 2, 3, 4]                          |
154+---------------------------------------+
155```"#,
156    argument(
157        name = "array",
158        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
159    ),
160    argument(name = "element", description = "Element to prepend to the array.")
161)]
162#[derive(Debug, PartialEq, Eq, Hash)]
163pub struct ArrayPrepend {
164    signature: Signature,
165    aliases: Vec<String>,
166}
167
168impl Default for ArrayPrepend {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
174impl ArrayPrepend {
175    pub fn new() -> Self {
176        Self {
177            signature: Signature::element_and_array(Volatility::Immutable),
178            aliases: vec![
179                String::from("list_prepend"),
180                String::from("array_push_front"),
181                String::from("list_push_front"),
182            ],
183        }
184    }
185}
186
187impl ScalarUDFImpl for ArrayPrepend {
188    fn as_any(&self) -> &dyn Any {
189        self
190    }
191
192    fn name(&self) -> &str {
193        "array_prepend"
194    }
195
196    fn signature(&self) -> &Signature {
197        &self.signature
198    }
199
200    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
201        let [element_type, array_type] = take_function_args(self.name(), arg_types)?;
202        if array_type.is_null() {
203            Ok(DataType::new_list(element_type.clone(), true))
204        } else {
205            Ok(array_type.clone())
206        }
207    }
208
209    fn invoke_with_args(
210        &self,
211        args: datafusion_expr::ScalarFunctionArgs,
212    ) -> Result<ColumnarValue> {
213        make_scalar_function(array_prepend_inner)(&args.args)
214    }
215
216    fn aliases(&self) -> &[String] {
217        &self.aliases
218    }
219
220    fn documentation(&self) -> Option<&Documentation> {
221        self.doc()
222    }
223}
224
// Registers the expression-level helpers for `ArrayConcat`: `array_concat` and
// the `array_concat_udf` accessor. Unlike the append/prepend invocations, no
// argument names are given here. NOTE(review): presumably this selects the
// macro's variadic form — confirm against the `make_udf_expr_and_func!` definition.
make_udf_expr_and_func!(
    ArrayConcat,
    array_concat,
    "Concatenates arrays.",
    array_concat_udf
);
231
/// Scalar UDF implementing `array_concat(array[, ..., array_n])`.
///
/// The user-facing SQL documentation is supplied by the `user_doc` attribute.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Concatenates arrays.",
    syntax_example = "array_concat(array[, ..., array_n])",
    sql_example = r#"```sql
> select array_concat([1, 2], [3, 4], [5, 6]);
+---------------------------------------------------+
| array_concat(List([1,2]),List([3,4]),List([5,6])) |
+---------------------------------------------------+
| [1, 2, 3, 4, 5, 6]                                |
+---------------------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array_n",
        description = "Subsequent array column or literal array to concatenate."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayConcat {
    /// A user-defined signature; argument types are resolved by `coerce_types`.
    signature: Signature,
    /// Alternative SQL names for this function (e.g. `array_cat`).
    aliases: Vec<String>,
}
258
259impl Default for ArrayConcat {
260    fn default() -> Self {
261        Self::new()
262    }
263}
264
265impl ArrayConcat {
266    pub fn new() -> Self {
267        Self {
268            signature: Signature::user_defined(Volatility::Immutable),
269            aliases: vec![
270                String::from("array_cat"),
271                String::from("list_concat"),
272                String::from("list_cat"),
273            ],
274        }
275    }
276}
277
278impl ScalarUDFImpl for ArrayConcat {
279    fn as_any(&self) -> &dyn Any {
280        self
281    }
282
283    fn name(&self) -> &str {
284        "array_concat"
285    }
286
287    fn signature(&self) -> &Signature {
288        &self.signature
289    }
290
291    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
292        let mut max_dims = 0;
293        let mut large_list = false;
294        let mut element_types = Vec::with_capacity(arg_types.len());
295        for arg_type in arg_types {
296            match arg_type {
297                DataType::Null | DataType::List(_) | DataType::FixedSizeList(..) => (),
298                DataType::LargeList(_) => large_list = true,
299                arg_type => {
300                    return plan_err!("{} does not support type {arg_type}", self.name())
301                }
302            }
303
304            max_dims = max_dims.max(list_ndims(arg_type));
305            element_types.push(base_type(arg_type))
306        }
307
308        if max_dims == 0 {
309            Ok(DataType::Null)
310        } else if let Some(mut return_type) = type_union_resolution(&element_types) {
311            for _ in 1..max_dims {
312                return_type = DataType::new_list(return_type, true)
313            }
314
315            if large_list {
316                Ok(DataType::new_large_list(return_type, true))
317            } else {
318                Ok(DataType::new_list(return_type, true))
319            }
320        } else {
321            plan_err!(
322                "Failed to unify argument types of {}: [{}]",
323                self.name(),
324                arg_types.iter().join(", ")
325            )
326        }
327    }
328
329    fn invoke_with_args(
330        &self,
331        args: datafusion_expr::ScalarFunctionArgs,
332    ) -> Result<ColumnarValue> {
333        make_scalar_function(array_concat_inner)(&args.args)
334    }
335
336    fn aliases(&self) -> &[String] {
337        &self.aliases
338    }
339
340    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
341        let base_type = base_type(&self.return_type(arg_types)?);
342        let coercion = Some(&ListCoercion::FixedSizedListToList);
343        let arg_types = arg_types.iter().map(|arg_type| {
344            coerced_type_with_base_type_only(arg_type, &base_type, coercion)
345        });
346
347        Ok(arg_types.collect())
348    }
349
350    fn documentation(&self) -> Option<&Documentation> {
351        self.doc()
352    }
353}
354
355/// Array_concat/Array_cat SQL function
356pub(crate) fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
357    if args.is_empty() {
358        return exec_err!("array_concat expects at least one argument");
359    }
360
361    let mut all_null = true;
362    let mut large_list = false;
363    for arg in args {
364        match arg.data_type() {
365            DataType::Null => continue,
366            DataType::LargeList(_) => large_list = true,
367            _ => (),
368        }
369        if arg.null_count() < arg.len() {
370            all_null = false;
371        }
372    }
373
374    if all_null {
375        // Return a null array with the same type as the first non-null-type argument
376        let return_type = args
377            .iter()
378            .map(|arg| arg.data_type())
379            .find_or_first(|d| !d.is_null())
380            .unwrap(); // Safe because args is non-empty
381
382        Ok(arrow::array::make_array(ArrayData::new_null(
383            return_type,
384            args[0].len(),
385        )))
386    } else if large_list {
387        concat_internal::<i64>(args)
388    } else {
389        concat_internal::<i32>(args)
390    }
391}
392
393fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
394    let args = align_array_dimensions::<O>(args.to_vec())?;
395
396    let list_arrays = args
397        .iter()
398        .map(|arg| as_generic_list_array::<O>(arg))
399        .collect::<Result<Vec<_>>>()?;
400    // Assume number of rows is the same for all arrays
401    let row_count = list_arrays[0].len();
402
403    let mut array_lengths = vec![];
404    let mut arrays = vec![];
405    let mut valid = NullBufferBuilder::new(row_count);
406    for i in 0..row_count {
407        let nulls = list_arrays
408            .iter()
409            .map(|arr| arr.is_null(i))
410            .collect::<Vec<_>>();
411
412        // If all the arrays are null, the concatenated array is null
413        let is_null = nulls.iter().all(|&x| x);
414        if is_null {
415            array_lengths.push(0);
416            valid.append_null();
417        } else {
418            // Get all the arrays on i-th row
419            let values = list_arrays
420                .iter()
421                .map(|arr| arr.value(i))
422                .collect::<Vec<_>>();
423
424            let elements = values
425                .iter()
426                .map(|a| a.as_ref())
427                .collect::<Vec<&dyn Array>>();
428
429            // Concatenated array on i-th row
430            let concatenated_array = arrow::compute::concat(elements.as_slice())?;
431            array_lengths.push(concatenated_array.len());
432            arrays.push(concatenated_array);
433            valid.append_non_null();
434        }
435    }
436    // Assume all arrays have the same data type
437    let data_type = list_arrays[0].value_type();
438
439    let elements = arrays
440        .iter()
441        .map(|a| a.as_ref())
442        .collect::<Vec<&dyn Array>>();
443
444    let list_arr = GenericListArray::<O>::new(
445        Arc::new(Field::new_list_field(data_type, true)),
446        OffsetBuffer::from_lengths(array_lengths),
447        Arc::new(arrow::compute::concat(elements.as_slice())?),
448        valid.finish(),
449    );
450
451    Ok(Arc::new(list_arr))
452}
453
// Kernel functions
455
456/// Array_append SQL function
457pub(crate) fn array_append_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
458    let [array, values] = take_function_args("array_append", args)?;
459    match array.data_type() {
460        DataType::Null => make_array_inner(&[Arc::clone(values)]),
461        DataType::List(_) => general_append_and_prepend::<i32>(args, true),
462        DataType::LargeList(_) => general_append_and_prepend::<i64>(args, true),
463        arg_type => exec_err!("array_append does not support type {arg_type}"),
464    }
465}
466
467/// Array_prepend SQL function
468pub(crate) fn array_prepend_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
469    let [values, array] = take_function_args("array_prepend", args)?;
470    match array.data_type() {
471        DataType::Null => make_array_inner(&[Arc::clone(values)]),
472        DataType::List(_) => general_append_and_prepend::<i32>(args, false),
473        DataType::LargeList(_) => general_append_and_prepend::<i64>(args, false),
474        arg_type => exec_err!("array_prepend does not support type {arg_type}"),
475    }
476}
477
478fn general_append_and_prepend<O: OffsetSizeTrait>(
479    args: &[ArrayRef],
480    is_append: bool,
481) -> Result<ArrayRef>
482where
483    i64: TryInto<O>,
484{
485    let (list_array, element_array) = if is_append {
486        let list_array = as_generic_list_array::<O>(&args[0])?;
487        let element_array = &args[1];
488        check_datatypes("array_append", &[element_array, list_array.values()])?;
489        (list_array, element_array)
490    } else {
491        let list_array = as_generic_list_array::<O>(&args[1])?;
492        let element_array = &args[0];
493        check_datatypes("array_prepend", &[list_array.values(), element_array])?;
494        (list_array, element_array)
495    };
496
497    let res = match list_array.value_type() {
498        DataType::List(_) => concat_internal::<O>(args)?,
499        DataType::LargeList(_) => concat_internal::<O>(args)?,
500        data_type => {
501            return generic_append_and_prepend::<O>(
502                list_array,
503                element_array,
504                &data_type,
505                is_append,
506            );
507        }
508    };
509
510    Ok(res)
511}
512
513/// Appends or prepends elements to a ListArray.
514///
515/// This function takes a ListArray, an ArrayRef, a FieldRef, and a boolean flag
516/// indicating whether to append or prepend the elements. It returns a `Result<ArrayRef>`
517/// representing the resulting ListArray after the operation.
518///
519/// # Arguments
520///
521/// * `list_array` - A reference to the ListArray to which elements will be appended/prepended.
522/// * `element_array` - A reference to the Array containing elements to be appended/prepended.
523/// * `field` - A reference to the Field describing the data type of the arrays.
524/// * `is_append` - A boolean flag indicating whether to append (`true`) or prepend (`false`) elements.
525///
526/// # Examples
527///
528/// generic_append_and_prepend(
529///     [1, 2, 3], 4, append => [1, 2, 3, 4]
530///     5, [6, 7, 8], prepend => [5, 6, 7, 8]
531/// )
532fn generic_append_and_prepend<O: OffsetSizeTrait>(
533    list_array: &GenericListArray<O>,
534    element_array: &ArrayRef,
535    data_type: &DataType,
536    is_append: bool,
537) -> Result<ArrayRef>
538where
539    i64: TryInto<O>,
540{
541    let mut offsets = vec![O::usize_as(0)];
542    let values = list_array.values();
543    let original_data = values.to_data();
544    let element_data = element_array.to_data();
545    let capacity = Capacities::Array(original_data.len() + element_data.len());
546
547    let mut mutable = MutableArrayData::with_capacities(
548        vec![&original_data, &element_data],
549        false,
550        capacity,
551    );
552
553    let values_index = 0;
554    let element_index = 1;
555
556    for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
557        let start = offset_window[0].to_usize().unwrap();
558        let end = offset_window[1].to_usize().unwrap();
559        if is_append {
560            mutable.extend(values_index, start, end);
561            mutable.extend(element_index, row_index, row_index + 1);
562        } else {
563            mutable.extend(element_index, row_index, row_index + 1);
564            mutable.extend(values_index, start, end);
565        }
566        offsets.push(offsets[row_index] + O::usize_as(end - start + 1));
567    }
568
569    let data = mutable.freeze();
570
571    Ok(Arc::new(GenericListArray::<O>::try_new(
572        Arc::new(Field::new_list_field(data_type.to_owned(), true)),
573        OffsetBuffer::new(offsets.into()),
574        arrow::array::make_array(data),
575        None,
576    )?))
577}