// Source: datafusion_functions_nested/concat.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! [`ScalarUDFImpl`] definitions for `array_append`, `array_prepend` and `array_concat` functions.
19
20use std::sync::Arc;
21use std::{any::Any, cmp::Ordering};
22
23use arrow::array::{
24    Array, ArrayRef, Capacities, GenericListArray, MutableArrayData, NullBufferBuilder,
25    OffsetSizeTrait,
26};
27use arrow::buffer::OffsetBuffer;
28use arrow::datatypes::{DataType, Field};
29use datafusion_common::utils::ListCoercion;
30use datafusion_common::Result;
31use datafusion_common::{
32    cast::as_generic_list_array,
33    exec_err, not_impl_err, plan_err,
34    utils::{list_ndims, take_function_args},
35};
36use datafusion_expr::{
37    ArrayFunctionArgument, ArrayFunctionSignature, ColumnarValue, Documentation,
38    ScalarUDFImpl, Signature, TypeSignature, Volatility,
39};
40use datafusion_macros::user_doc;
41
42use crate::utils::{align_array_dimensions, check_datatypes, make_scalar_function};
43
44make_udf_expr_and_func!(
45    ArrayAppend,
46    array_append,
47    array element,                                // arg name
48    "appends an element to the end of an array.", // doc
49    array_append_udf                              // internal function name
50);
51
52#[user_doc(
53    doc_section(label = "Array Functions"),
54    description = "Appends an element to the end of an array.",
55    syntax_example = "array_append(array, element)",
56    sql_example = r#"```sql
57> select array_append([1, 2, 3], 4);
58+--------------------------------------+
59| array_append(List([1,2,3]),Int64(4)) |
60+--------------------------------------+
61| [1, 2, 3, 4]                         |
62+--------------------------------------+
63```"#,
64    argument(
65        name = "array",
66        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
67    ),
68    argument(name = "element", description = "Element to append to the array.")
69)]
70#[derive(Debug)]
71pub struct ArrayAppend {
72    signature: Signature,
73    aliases: Vec<String>,
74}
75
76impl Default for ArrayAppend {
77    fn default() -> Self {
78        Self::new()
79    }
80}
81
82impl ArrayAppend {
83    pub fn new() -> Self {
84        Self {
85            signature: Signature::array_and_element(Volatility::Immutable),
86            aliases: vec![
87                String::from("list_append"),
88                String::from("array_push_back"),
89                String::from("list_push_back"),
90            ],
91        }
92    }
93}
94
95impl ScalarUDFImpl for ArrayAppend {
96    fn as_any(&self) -> &dyn Any {
97        self
98    }
99
100    fn name(&self) -> &str {
101        "array_append"
102    }
103
104    fn signature(&self) -> &Signature {
105        &self.signature
106    }
107
108    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
109        Ok(arg_types[0].clone())
110    }
111
112    fn invoke_with_args(
113        &self,
114        args: datafusion_expr::ScalarFunctionArgs,
115    ) -> Result<ColumnarValue> {
116        make_scalar_function(array_append_inner)(&args.args)
117    }
118
119    fn aliases(&self) -> &[String] {
120        &self.aliases
121    }
122
123    fn documentation(&self) -> Option<&Documentation> {
124        self.doc()
125    }
126}
127
128make_udf_expr_and_func!(
129    ArrayPrepend,
130    array_prepend,
131    element array,
132    "Prepends an element to the beginning of an array.",
133    array_prepend_udf
134);
135
136#[user_doc(
137    doc_section(label = "Array Functions"),
138    description = "Prepends an element to the beginning of an array.",
139    syntax_example = "array_prepend(element, array)",
140    sql_example = r#"```sql
141> select array_prepend(1, [2, 3, 4]);
142+---------------------------------------+
143| array_prepend(Int64(1),List([2,3,4])) |
144+---------------------------------------+
145| [1, 2, 3, 4]                          |
146+---------------------------------------+
147```"#,
148    argument(
149        name = "array",
150        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
151    ),
152    argument(name = "element", description = "Element to prepend to the array.")
153)]
154#[derive(Debug)]
155pub struct ArrayPrepend {
156    signature: Signature,
157    aliases: Vec<String>,
158}
159
160impl Default for ArrayPrepend {
161    fn default() -> Self {
162        Self::new()
163    }
164}
165
166impl ArrayPrepend {
167    pub fn new() -> Self {
168        Self {
169            signature: Signature {
170                type_signature: TypeSignature::ArraySignature(
171                    ArrayFunctionSignature::Array {
172                        arguments: vec![
173                            ArrayFunctionArgument::Element,
174                            ArrayFunctionArgument::Array,
175                        ],
176                        array_coercion: Some(ListCoercion::FixedSizedListToList),
177                    },
178                ),
179                volatility: Volatility::Immutable,
180            },
181            aliases: vec![
182                String::from("list_prepend"),
183                String::from("array_push_front"),
184                String::from("list_push_front"),
185            ],
186        }
187    }
188}
189
190impl ScalarUDFImpl for ArrayPrepend {
191    fn as_any(&self) -> &dyn Any {
192        self
193    }
194
195    fn name(&self) -> &str {
196        "array_prepend"
197    }
198
199    fn signature(&self) -> &Signature {
200        &self.signature
201    }
202
203    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
204        Ok(arg_types[1].clone())
205    }
206
207    fn invoke_with_args(
208        &self,
209        args: datafusion_expr::ScalarFunctionArgs,
210    ) -> Result<ColumnarValue> {
211        make_scalar_function(array_prepend_inner)(&args.args)
212    }
213
214    fn aliases(&self) -> &[String] {
215        &self.aliases
216    }
217
218    fn documentation(&self) -> Option<&Documentation> {
219        self.doc()
220    }
221}
222
223make_udf_expr_and_func!(
224    ArrayConcat,
225    array_concat,
226    "Concatenates arrays.",
227    array_concat_udf
228);
229
230#[user_doc(
231    doc_section(label = "Array Functions"),
232    description = "Concatenates arrays.",
233    syntax_example = "array_concat(array[, ..., array_n])",
234    sql_example = r#"```sql
235> select array_concat([1, 2], [3, 4], [5, 6]);
236+---------------------------------------------------+
237| array_concat(List([1,2]),List([3,4]),List([5,6])) |
238+---------------------------------------------------+
239| [1, 2, 3, 4, 5, 6]                                |
240+---------------------------------------------------+
241```"#,
242    argument(
243        name = "array",
244        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
245    ),
246    argument(
247        name = "array_n",
248        description = "Subsequent array column or literal array to concatenate."
249    )
250)]
251#[derive(Debug)]
252pub struct ArrayConcat {
253    signature: Signature,
254    aliases: Vec<String>,
255}
256
257impl Default for ArrayConcat {
258    fn default() -> Self {
259        Self::new()
260    }
261}
262
263impl ArrayConcat {
264    pub fn new() -> Self {
265        Self {
266            signature: Signature::variadic_any(Volatility::Immutable),
267            aliases: vec![
268                String::from("array_cat"),
269                String::from("list_concat"),
270                String::from("list_cat"),
271            ],
272        }
273    }
274}
275
276impl ScalarUDFImpl for ArrayConcat {
277    fn as_any(&self) -> &dyn Any {
278        self
279    }
280
281    fn name(&self) -> &str {
282        "array_concat"
283    }
284
285    fn signature(&self) -> &Signature {
286        &self.signature
287    }
288
289    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
290        let mut expr_type = DataType::Null;
291        let mut max_dims = 0;
292        for arg_type in arg_types {
293            let DataType::List(field) = arg_type else {
294                return plan_err!(
295                    "The array_concat function can only accept list as the args."
296                );
297            };
298            if !field.data_type().equals_datatype(&DataType::Null) {
299                let dims = list_ndims(arg_type);
300                expr_type = match max_dims.cmp(&dims) {
301                    Ordering::Greater => expr_type,
302                    Ordering::Equal => {
303                        if expr_type == DataType::Null {
304                            arg_type.clone()
305                        } else if !expr_type.equals_datatype(arg_type) {
306                            return plan_err!(
307                            "It is not possible to concatenate arrays of different types. Expected: {}, got: {}", expr_type, arg_type
308                                );
309                        } else {
310                            expr_type
311                        }
312                    }
313
314                    Ordering::Less => {
315                        max_dims = dims;
316                        arg_type.clone()
317                    }
318                };
319            }
320        }
321
322        Ok(expr_type)
323    }
324
325    fn invoke_with_args(
326        &self,
327        args: datafusion_expr::ScalarFunctionArgs,
328    ) -> Result<ColumnarValue> {
329        make_scalar_function(array_concat_inner)(&args.args)
330    }
331
332    fn aliases(&self) -> &[String] {
333        &self.aliases
334    }
335
336    fn documentation(&self) -> Option<&Documentation> {
337        self.doc()
338    }
339}
340
341/// Array_concat/Array_cat SQL function
342pub(crate) fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
343    if args.is_empty() {
344        return exec_err!("array_concat expects at least one arguments");
345    }
346
347    let mut new_args = vec![];
348    for arg in args {
349        let ndim = list_ndims(arg.data_type());
350        let base_type = datafusion_common::utils::base_type(arg.data_type());
351        if ndim == 0 {
352            return not_impl_err!("Array is not type '{base_type:?}'.");
353        }
354        if !base_type.eq(&DataType::Null) {
355            new_args.push(Arc::clone(arg));
356        }
357    }
358
359    match &args[0].data_type() {
360        DataType::LargeList(_) => concat_internal::<i64>(new_args.as_slice()),
361        _ => concat_internal::<i32>(new_args.as_slice()),
362    }
363}
364
365fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
366    let args = align_array_dimensions::<O>(args.to_vec())?;
367
368    let list_arrays = args
369        .iter()
370        .map(|arg| as_generic_list_array::<O>(arg))
371        .collect::<Result<Vec<_>>>()?;
372    // Assume number of rows is the same for all arrays
373    let row_count = list_arrays[0].len();
374
375    let mut array_lengths = vec![];
376    let mut arrays = vec![];
377    let mut valid = NullBufferBuilder::new(row_count);
378    for i in 0..row_count {
379        let nulls = list_arrays
380            .iter()
381            .map(|arr| arr.is_null(i))
382            .collect::<Vec<_>>();
383
384        // If all the arrays are null, the concatenated array is null
385        let is_null = nulls.iter().all(|&x| x);
386        if is_null {
387            array_lengths.push(0);
388            valid.append_null();
389        } else {
390            // Get all the arrays on i-th row
391            let values = list_arrays
392                .iter()
393                .map(|arr| arr.value(i))
394                .collect::<Vec<_>>();
395
396            let elements = values
397                .iter()
398                .map(|a| a.as_ref())
399                .collect::<Vec<&dyn Array>>();
400
401            // Concatenated array on i-th row
402            let concatenated_array = arrow::compute::concat(elements.as_slice())?;
403            array_lengths.push(concatenated_array.len());
404            arrays.push(concatenated_array);
405            valid.append_non_null();
406        }
407    }
408    // Assume all arrays have the same data type
409    let data_type = list_arrays[0].value_type();
410
411    let elements = arrays
412        .iter()
413        .map(|a| a.as_ref())
414        .collect::<Vec<&dyn Array>>();
415
416    let list_arr = GenericListArray::<O>::new(
417        Arc::new(Field::new_list_field(data_type, true)),
418        OffsetBuffer::from_lengths(array_lengths),
419        Arc::new(arrow::compute::concat(elements.as_slice())?),
420        valid.finish(),
421    );
422
423    Ok(Arc::new(list_arr))
424}
425
// Kernel functions for `array_append` and `array_prepend`
427
428/// Array_append SQL function
429pub(crate) fn array_append_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
430    let [array, _] = take_function_args("array_append", args)?;
431
432    match array.data_type() {
433        DataType::LargeList(_) => general_append_and_prepend::<i64>(args, true),
434        _ => general_append_and_prepend::<i32>(args, true),
435    }
436}
437
438/// Array_prepend SQL function
439pub(crate) fn array_prepend_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
440    let [_, array] = take_function_args("array_prepend", args)?;
441
442    match array.data_type() {
443        DataType::LargeList(_) => general_append_and_prepend::<i64>(args, false),
444        _ => general_append_and_prepend::<i32>(args, false),
445    }
446}
447
448fn general_append_and_prepend<O: OffsetSizeTrait>(
449    args: &[ArrayRef],
450    is_append: bool,
451) -> Result<ArrayRef>
452where
453    i64: TryInto<O>,
454{
455    let (list_array, element_array) = if is_append {
456        let list_array = as_generic_list_array::<O>(&args[0])?;
457        let element_array = &args[1];
458        check_datatypes("array_append", &[element_array, list_array.values()])?;
459        (list_array, element_array)
460    } else {
461        let list_array = as_generic_list_array::<O>(&args[1])?;
462        let element_array = &args[0];
463        check_datatypes("array_prepend", &[list_array.values(), element_array])?;
464        (list_array, element_array)
465    };
466
467    let res = match list_array.value_type() {
468        DataType::List(_) => concat_internal::<O>(args)?,
469        DataType::LargeList(_) => concat_internal::<O>(args)?,
470        data_type => {
471            return generic_append_and_prepend::<O>(
472                list_array,
473                element_array,
474                &data_type,
475                is_append,
476            );
477        }
478    };
479
480    Ok(res)
481}
482
483/// Appends or prepends elements to a ListArray.
484///
485/// This function takes a ListArray, an ArrayRef, a FieldRef, and a boolean flag
486/// indicating whether to append or prepend the elements. It returns a `Result<ArrayRef>`
487/// representing the resulting ListArray after the operation.
488///
489/// # Arguments
490///
491/// * `list_array` - A reference to the ListArray to which elements will be appended/prepended.
492/// * `element_array` - A reference to the Array containing elements to be appended/prepended.
493/// * `field` - A reference to the Field describing the data type of the arrays.
494/// * `is_append` - A boolean flag indicating whether to append (`true`) or prepend (`false`) elements.
495///
496/// # Examples
497///
498/// generic_append_and_prepend(
499///     [1, 2, 3], 4, append => [1, 2, 3, 4]
500///     5, [6, 7, 8], prepend => [5, 6, 7, 8]
501/// )
502fn generic_append_and_prepend<O: OffsetSizeTrait>(
503    list_array: &GenericListArray<O>,
504    element_array: &ArrayRef,
505    data_type: &DataType,
506    is_append: bool,
507) -> Result<ArrayRef>
508where
509    i64: TryInto<O>,
510{
511    let mut offsets = vec![O::usize_as(0)];
512    let values = list_array.values();
513    let original_data = values.to_data();
514    let element_data = element_array.to_data();
515    let capacity = Capacities::Array(original_data.len() + element_data.len());
516
517    let mut mutable = MutableArrayData::with_capacities(
518        vec![&original_data, &element_data],
519        false,
520        capacity,
521    );
522
523    let values_index = 0;
524    let element_index = 1;
525
526    for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
527        let start = offset_window[0].to_usize().unwrap();
528        let end = offset_window[1].to_usize().unwrap();
529        if is_append {
530            mutable.extend(values_index, start, end);
531            mutable.extend(element_index, row_index, row_index + 1);
532        } else {
533            mutable.extend(element_index, row_index, row_index + 1);
534            mutable.extend(values_index, start, end);
535        }
536        offsets.push(offsets[row_index] + O::usize_as(end - start + 1));
537    }
538
539    let data = mutable.freeze();
540
541    Ok(Arc::new(GenericListArray::<O>::try_new(
542        Arc::new(Field::new_list_field(data_type.to_owned(), true)),
543        OffsetBuffer::new(offsets.into()),
544        arrow::array::make_array(data),
545        None,
546    )?))
547}