datafusion_functions_nested/
concat.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for `array_append`, `array_prepend` and `array_concat` functions.
19
20use std::any::Any;
21use std::sync::Arc;
22
23use crate::make_array::make_array_inner;
24use crate::utils::{align_array_dimensions, check_datatypes, make_scalar_function};
25use arrow::array::{
26    Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData,
27    NullBufferBuilder, OffsetSizeTrait,
28};
29use arrow::buffer::OffsetBuffer;
30use arrow::datatypes::{DataType, Field};
31use datafusion_common::utils::{
32    base_type, coerced_type_with_base_type_only, ListCoercion,
33};
34use datafusion_common::Result;
35use datafusion_common::{
36    cast::as_generic_list_array,
37    exec_err, plan_err,
38    utils::{list_ndims, take_function_args},
39};
40use datafusion_expr::binary::type_union_resolution;
41use datafusion_expr::{
42    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
43};
44use datafusion_macros::user_doc;
45use itertools::Itertools;
46
47make_udf_expr_and_func!(
48    ArrayAppend,
49    array_append,
50    array element,                                // arg name
51    "appends an element to the end of an array.", // doc
52    array_append_udf                              // internal function name
53);
54
// UDF type behind the `array_append(array, element)` SQL function.
// NOTE(review): the `user_doc` attribute presumably generates the `doc()` method
// that `documentation()` returns — confirm against the macro definition.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Appends an element to the end of an array.",
    syntax_example = "array_append(array, element)",
    sql_example = r#"```sql
> select array_append([1, 2, 3], 4);
+--------------------------------------+
| array_append(List([1,2,3]),Int64(4)) |
+--------------------------------------+
| [1, 2, 3, 4]                         |
+--------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(name = "element", description = "Element to append to the array.")
)]
#[derive(Debug)]
pub struct ArrayAppend {
    // Signature handed out by `ScalarUDFImpl::signature`.
    signature: Signature,
    // Alternative function names handed out by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
78
79impl Default for ArrayAppend {
80    fn default() -> Self {
81        Self::new()
82    }
83}
84
85impl ArrayAppend {
86    pub fn new() -> Self {
87        Self {
88            signature: Signature::array_and_element(Volatility::Immutable),
89            aliases: vec![
90                String::from("list_append"),
91                String::from("array_push_back"),
92                String::from("list_push_back"),
93            ],
94        }
95    }
96}
97
98impl ScalarUDFImpl for ArrayAppend {
99    fn as_any(&self) -> &dyn Any {
100        self
101    }
102
103    fn name(&self) -> &str {
104        "array_append"
105    }
106
107    fn signature(&self) -> &Signature {
108        &self.signature
109    }
110
111    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
112        let [array_type, element_type] = take_function_args(self.name(), arg_types)?;
113        if array_type.is_null() {
114            Ok(DataType::new_list(element_type.clone(), true))
115        } else {
116            Ok(array_type.clone())
117        }
118    }
119
120    fn invoke_with_args(
121        &self,
122        args: datafusion_expr::ScalarFunctionArgs,
123    ) -> Result<ColumnarValue> {
124        make_scalar_function(array_append_inner)(&args.args)
125    }
126
127    fn aliases(&self) -> &[String] {
128        &self.aliases
129    }
130
131    fn documentation(&self) -> Option<&Documentation> {
132        self.doc()
133    }
134}
135
// Wires up the `ArrayPrepend` UDF through the crate's `make_udf_expr_and_func!`
// macro; note the argument order is `element` first, then `array`.
make_udf_expr_and_func!(
    ArrayPrepend,
    array_prepend,
    element array,                                       // arg name
    "Prepends an element to the beginning of an array.", // doc
    array_prepend_udf                                    // internal function name
);
143
144#[user_doc(
145    doc_section(label = "Array Functions"),
146    description = "Prepends an element to the beginning of an array.",
147    syntax_example = "array_prepend(element, array)",
148    sql_example = r#"```sql
149> select array_prepend(1, [2, 3, 4]);
150+---------------------------------------+
151| array_prepend(Int64(1),List([2,3,4])) |
152+---------------------------------------+
153| [1, 2, 3, 4]                          |
154+---------------------------------------+
155```"#,
156    argument(
157        name = "array",
158        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
159    ),
160    argument(name = "element", description = "Element to prepend to the array.")
161)]
162#[derive(Debug)]
163pub struct ArrayPrepend {
164    signature: Signature,
165    aliases: Vec<String>,
166}
167
168impl Default for ArrayPrepend {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
174impl ArrayPrepend {
175    pub fn new() -> Self {
176        Self {
177            signature: Signature::element_and_array(Volatility::Immutable),
178            aliases: vec![
179                String::from("list_prepend"),
180                String::from("array_push_front"),
181                String::from("list_push_front"),
182            ],
183        }
184    }
185}
186
187impl ScalarUDFImpl for ArrayPrepend {
188    fn as_any(&self) -> &dyn Any {
189        self
190    }
191
192    fn name(&self) -> &str {
193        "array_prepend"
194    }
195
196    fn signature(&self) -> &Signature {
197        &self.signature
198    }
199
200    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
201        let [element_type, array_type] = take_function_args(self.name(), arg_types)?;
202        if array_type.is_null() {
203            Ok(DataType::new_list(element_type.clone(), true))
204        } else {
205            Ok(array_type.clone())
206        }
207    }
208
209    fn invoke_with_args(
210        &self,
211        args: datafusion_expr::ScalarFunctionArgs,
212    ) -> Result<ColumnarValue> {
213        make_scalar_function(array_prepend_inner)(&args.args)
214    }
215
216    fn aliases(&self) -> &[String] {
217        &self.aliases
218    }
219
220    fn documentation(&self) -> Option<&Documentation> {
221        self.doc()
222    }
223}
224
// Wires up the `ArrayConcat` UDF through the crate's `make_udf_expr_and_func!`
// macro. Unlike the append/prepend invocations, no argument names are listed here.
make_udf_expr_and_func!(
    ArrayConcat,
    array_concat,
    "Concatenates arrays.", // doc
    array_concat_udf        // internal function name
);
231
// UDF type behind the `array_concat(array[, ..., array_n])` SQL function.
// NOTE(review): the `user_doc` attribute presumably generates the `doc()` method
// that `documentation()` returns — confirm against the macro definition.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Concatenates arrays.",
    syntax_example = "array_concat(array[, ..., array_n])",
    sql_example = r#"```sql
> select array_concat([1, 2], [3, 4], [5, 6]);
+---------------------------------------------------+
| array_concat(List([1,2]),List([3,4]),List([5,6])) |
+---------------------------------------------------+
| [1, 2, 3, 4, 5, 6]                                |
+---------------------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array_n",
        description = "Subsequent array column or literal array to concatenate."
    )
)]
#[derive(Debug)]
pub struct ArrayConcat {
    // Signature handed out by `ScalarUDFImpl::signature`.
    signature: Signature,
    // Alternative function names handed out by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
258
259impl Default for ArrayConcat {
260    fn default() -> Self {
261        Self::new()
262    }
263}
264
265impl ArrayConcat {
266    pub fn new() -> Self {
267        Self {
268            signature: Signature::user_defined(Volatility::Immutable),
269            aliases: vec![
270                String::from("array_cat"),
271                String::from("list_concat"),
272                String::from("list_cat"),
273            ],
274        }
275    }
276}
277
278impl ScalarUDFImpl for ArrayConcat {
279    fn as_any(&self) -> &dyn Any {
280        self
281    }
282
283    fn name(&self) -> &str {
284        "array_concat"
285    }
286
287    fn signature(&self) -> &Signature {
288        &self.signature
289    }
290
291    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
292        let mut max_dims = 0;
293        let mut large_list = false;
294        let mut element_types = Vec::with_capacity(arg_types.len());
295        for arg_type in arg_types {
296            match arg_type {
297                DataType::Null | DataType::List(_) | DataType::FixedSizeList(..) => (),
298                DataType::LargeList(_) => large_list = true,
299                arg_type => {
300                    return plan_err!("{} does not support type {arg_type}", self.name())
301                }
302            }
303
304            max_dims = max_dims.max(list_ndims(arg_type));
305            element_types.push(base_type(arg_type))
306        }
307
308        if max_dims == 0 {
309            Ok(DataType::Null)
310        } else if let Some(mut return_type) = type_union_resolution(&element_types) {
311            for _ in 1..max_dims {
312                return_type = DataType::new_list(return_type, true)
313            }
314
315            if large_list {
316                Ok(DataType::new_large_list(return_type, true))
317            } else {
318                Ok(DataType::new_list(return_type, true))
319            }
320        } else {
321            plan_err!(
322                "Failed to unify argument types of {}: {arg_types:?}",
323                self.name()
324            )
325        }
326    }
327
328    fn invoke_with_args(
329        &self,
330        args: datafusion_expr::ScalarFunctionArgs,
331    ) -> Result<ColumnarValue> {
332        make_scalar_function(array_concat_inner)(&args.args)
333    }
334
335    fn aliases(&self) -> &[String] {
336        &self.aliases
337    }
338
339    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
340        let base_type = base_type(&self.return_type(arg_types)?);
341        let coercion = Some(&ListCoercion::FixedSizedListToList);
342        let arg_types = arg_types.iter().map(|arg_type| {
343            coerced_type_with_base_type_only(arg_type, &base_type, coercion)
344        });
345
346        Ok(arg_types.collect())
347    }
348
349    fn documentation(&self) -> Option<&Documentation> {
350        self.doc()
351    }
352}
353
354/// Array_concat/Array_cat SQL function
355pub(crate) fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
356    if args.is_empty() {
357        return exec_err!("array_concat expects at least one argument");
358    }
359
360    let mut all_null = true;
361    let mut large_list = false;
362    for arg in args {
363        match arg.data_type() {
364            DataType::Null => continue,
365            DataType::LargeList(_) => large_list = true,
366            _ => (),
367        }
368        if arg.null_count() < arg.len() {
369            all_null = false;
370        }
371    }
372
373    if all_null {
374        // Return a null array with the same type as the first non-null-type argument
375        let return_type = args
376            .iter()
377            .map(|arg| arg.data_type())
378            .find_or_first(|d| !d.is_null())
379            .unwrap(); // Safe because args is non-empty
380
381        Ok(arrow::array::make_array(ArrayData::new_null(
382            return_type,
383            args[0].len(),
384        )))
385    } else if large_list {
386        concat_internal::<i64>(args)
387    } else {
388        concat_internal::<i32>(args)
389    }
390}
391
392fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
393    let args = align_array_dimensions::<O>(args.to_vec())?;
394
395    let list_arrays = args
396        .iter()
397        .map(|arg| as_generic_list_array::<O>(arg))
398        .collect::<Result<Vec<_>>>()?;
399    // Assume number of rows is the same for all arrays
400    let row_count = list_arrays[0].len();
401
402    let mut array_lengths = vec![];
403    let mut arrays = vec![];
404    let mut valid = NullBufferBuilder::new(row_count);
405    for i in 0..row_count {
406        let nulls = list_arrays
407            .iter()
408            .map(|arr| arr.is_null(i))
409            .collect::<Vec<_>>();
410
411        // If all the arrays are null, the concatenated array is null
412        let is_null = nulls.iter().all(|&x| x);
413        if is_null {
414            array_lengths.push(0);
415            valid.append_null();
416        } else {
417            // Get all the arrays on i-th row
418            let values = list_arrays
419                .iter()
420                .map(|arr| arr.value(i))
421                .collect::<Vec<_>>();
422
423            let elements = values
424                .iter()
425                .map(|a| a.as_ref())
426                .collect::<Vec<&dyn Array>>();
427
428            // Concatenated array on i-th row
429            let concatenated_array = arrow::compute::concat(elements.as_slice())?;
430            array_lengths.push(concatenated_array.len());
431            arrays.push(concatenated_array);
432            valid.append_non_null();
433        }
434    }
435    // Assume all arrays have the same data type
436    let data_type = list_arrays[0].value_type();
437
438    let elements = arrays
439        .iter()
440        .map(|a| a.as_ref())
441        .collect::<Vec<&dyn Array>>();
442
443    let list_arr = GenericListArray::<O>::new(
444        Arc::new(Field::new_list_field(data_type, true)),
445        OffsetBuffer::from_lengths(array_lengths),
446        Arc::new(arrow::compute::concat(elements.as_slice())?),
447        valid.finish(),
448    );
449
450    Ok(Arc::new(list_arr))
451}
452
453// Kernel functions
454
455/// Array_append SQL function
456pub(crate) fn array_append_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
457    let [array, values] = take_function_args("array_append", args)?;
458    match array.data_type() {
459        DataType::Null => make_array_inner(&[Arc::clone(values)]),
460        DataType::List(_) => general_append_and_prepend::<i32>(args, true),
461        DataType::LargeList(_) => general_append_and_prepend::<i64>(args, true),
462        arg_type => exec_err!("array_append does not support type {arg_type}"),
463    }
464}
465
466/// Array_prepend SQL function
467pub(crate) fn array_prepend_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
468    let [values, array] = take_function_args("array_prepend", args)?;
469    match array.data_type() {
470        DataType::Null => make_array_inner(&[Arc::clone(values)]),
471        DataType::List(_) => general_append_and_prepend::<i32>(args, false),
472        DataType::LargeList(_) => general_append_and_prepend::<i64>(args, false),
473        arg_type => exec_err!("array_prepend does not support type {arg_type}"),
474    }
475}
476
477fn general_append_and_prepend<O: OffsetSizeTrait>(
478    args: &[ArrayRef],
479    is_append: bool,
480) -> Result<ArrayRef>
481where
482    i64: TryInto<O>,
483{
484    let (list_array, element_array) = if is_append {
485        let list_array = as_generic_list_array::<O>(&args[0])?;
486        let element_array = &args[1];
487        check_datatypes("array_append", &[element_array, list_array.values()])?;
488        (list_array, element_array)
489    } else {
490        let list_array = as_generic_list_array::<O>(&args[1])?;
491        let element_array = &args[0];
492        check_datatypes("array_prepend", &[list_array.values(), element_array])?;
493        (list_array, element_array)
494    };
495
496    let res = match list_array.value_type() {
497        DataType::List(_) => concat_internal::<O>(args)?,
498        DataType::LargeList(_) => concat_internal::<O>(args)?,
499        data_type => {
500            return generic_append_and_prepend::<O>(
501                list_array,
502                element_array,
503                &data_type,
504                is_append,
505            );
506        }
507    };
508
509    Ok(res)
510}
511
512/// Appends or prepends elements to a ListArray.
513///
514/// This function takes a ListArray, an ArrayRef, a FieldRef, and a boolean flag
515/// indicating whether to append or prepend the elements. It returns a `Result<ArrayRef>`
516/// representing the resulting ListArray after the operation.
517///
518/// # Arguments
519///
520/// * `list_array` - A reference to the ListArray to which elements will be appended/prepended.
521/// * `element_array` - A reference to the Array containing elements to be appended/prepended.
522/// * `field` - A reference to the Field describing the data type of the arrays.
523/// * `is_append` - A boolean flag indicating whether to append (`true`) or prepend (`false`) elements.
524///
525/// # Examples
526///
527/// generic_append_and_prepend(
528///     [1, 2, 3], 4, append => [1, 2, 3, 4]
529///     5, [6, 7, 8], prepend => [5, 6, 7, 8]
530/// )
531fn generic_append_and_prepend<O: OffsetSizeTrait>(
532    list_array: &GenericListArray<O>,
533    element_array: &ArrayRef,
534    data_type: &DataType,
535    is_append: bool,
536) -> Result<ArrayRef>
537where
538    i64: TryInto<O>,
539{
540    let mut offsets = vec![O::usize_as(0)];
541    let values = list_array.values();
542    let original_data = values.to_data();
543    let element_data = element_array.to_data();
544    let capacity = Capacities::Array(original_data.len() + element_data.len());
545
546    let mut mutable = MutableArrayData::with_capacities(
547        vec![&original_data, &element_data],
548        false,
549        capacity,
550    );
551
552    let values_index = 0;
553    let element_index = 1;
554
555    for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
556        let start = offset_window[0].to_usize().unwrap();
557        let end = offset_window[1].to_usize().unwrap();
558        if is_append {
559            mutable.extend(values_index, start, end);
560            mutable.extend(element_index, row_index, row_index + 1);
561        } else {
562            mutable.extend(element_index, row_index, row_index + 1);
563            mutable.extend(values_index, start, end);
564        }
565        offsets.push(offsets[row_index] + O::usize_as(end - start + 1));
566    }
567
568    let data = mutable.freeze();
569
570    Ok(Arc::new(GenericListArray::<O>::try_new(
571        Arc::new(Field::new_list_field(data_type.to_owned(), true)),
572        OffsetBuffer::new(offsets.into()),
573        arrow::array::make_array(data),
574        None,
575    )?))
576}