datafusion_functions_nested/
concat.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for `array_append`, `array_prepend` and `array_concat` functions.
19
20use std::any::Any;
21use std::sync::Arc;
22
23use crate::make_array::make_array_inner;
24use crate::utils::{align_array_dimensions, check_datatypes, make_scalar_function};
25use arrow::array::{
26    Array, ArrayRef, Capacities, GenericListArray, MutableArrayData, NullArray,
27    NullBufferBuilder, OffsetSizeTrait,
28};
29use arrow::buffer::OffsetBuffer;
30use arrow::datatypes::{DataType, Field};
31use datafusion_common::utils::{
32    base_type, coerced_type_with_base_type_only, ListCoercion,
33};
34use datafusion_common::Result;
35use datafusion_common::{
36    cast::as_generic_list_array,
37    exec_err, plan_err,
38    utils::{list_ndims, take_function_args},
39};
40use datafusion_expr::binary::type_union_resolution;
41use datafusion_expr::{
42    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
43};
44use datafusion_macros::user_doc;
45
// Expression-API glue for `ArrayAppend`. NOTE(review): the exact items emitted
// are defined by `make_udf_expr_and_func!` (not visible here); the trailing
// comments name the role of each macro argument.
make_udf_expr_and_func!(
    ArrayAppend,
    array_append,
    array element,                                // arg name
    "appends an element to the end of an array.", // doc
    array_append_udf                              // internal function name
);

// `array_append(array, element)`: returns `array` with `element` added as its
// last item. The `#[user_doc]` attribute carries the SQL-facing documentation
// that `ScalarUDFImpl::documentation` returns through `self.doc()`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Appends an element to the end of an array.",
    syntax_example = "array_append(array, element)",
    sql_example = r#"```sql
> select array_append([1, 2, 3], 4);
+--------------------------------------+
| array_append(List([1,2,3]),Int64(4)) |
+--------------------------------------+
| [1, 2, 3, 4]                         |
+--------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(name = "element", description = "Element to append to the array.")
)]
#[derive(Debug)]
pub struct ArrayAppend {
    // Array-then-element signature (`Signature::array_and_element`), immutable volatility.
    signature: Signature,
    // Alternate SQL names: `list_append`, `array_push_back`, `list_push_back`.
    aliases: Vec<String>,
}
77
78impl Default for ArrayAppend {
79    fn default() -> Self {
80        Self::new()
81    }
82}
83
84impl ArrayAppend {
85    pub fn new() -> Self {
86        Self {
87            signature: Signature::array_and_element(Volatility::Immutable),
88            aliases: vec![
89                String::from("list_append"),
90                String::from("array_push_back"),
91                String::from("list_push_back"),
92            ],
93        }
94    }
95}
96
97impl ScalarUDFImpl for ArrayAppend {
98    fn as_any(&self) -> &dyn Any {
99        self
100    }
101
102    fn name(&self) -> &str {
103        "array_append"
104    }
105
106    fn signature(&self) -> &Signature {
107        &self.signature
108    }
109
110    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
111        let [array_type, element_type] = take_function_args(self.name(), arg_types)?;
112        if array_type.is_null() {
113            Ok(DataType::new_list(element_type.clone(), true))
114        } else {
115            Ok(array_type.clone())
116        }
117    }
118
119    fn invoke_with_args(
120        &self,
121        args: datafusion_expr::ScalarFunctionArgs,
122    ) -> Result<ColumnarValue> {
123        make_scalar_function(array_append_inner)(&args.args)
124    }
125
126    fn aliases(&self) -> &[String] {
127        &self.aliases
128    }
129
130    fn documentation(&self) -> Option<&Documentation> {
131        self.doc()
132    }
133}
134
// Expression-API glue for `ArrayPrepend`. NOTE(review): the exact items emitted
// are defined by `make_udf_expr_and_func!` (not visible here). The argument-name
// list is `element array`, which matches the SQL argument order.
make_udf_expr_and_func!(
    ArrayPrepend,
    array_prepend,
    element array,
    "Prepends an element to the beginning of an array.",
    array_prepend_udf
);

// `array_prepend(element, array)`: returns `array` with `element` inserted as
// its first item. The `#[user_doc]` attribute carries the SQL-facing
// documentation that `ScalarUDFImpl::documentation` returns through `self.doc()`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Prepends an element to the beginning of an array.",
    syntax_example = "array_prepend(element, array)",
    sql_example = r#"```sql
> select array_prepend(1, [2, 3, 4]);
+---------------------------------------+
| array_prepend(Int64(1),List([2,3,4])) |
+---------------------------------------+
| [1, 2, 3, 4]                          |
+---------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(name = "element", description = "Element to prepend to the array.")
)]
#[derive(Debug)]
pub struct ArrayPrepend {
    // Element-then-array signature (`Signature::element_and_array`), immutable volatility.
    signature: Signature,
    // Alternate SQL names: `list_prepend`, `array_push_front`, `list_push_front`.
    aliases: Vec<String>,
}
166
167impl Default for ArrayPrepend {
168    fn default() -> Self {
169        Self::new()
170    }
171}
172
173impl ArrayPrepend {
174    pub fn new() -> Self {
175        Self {
176            signature: Signature::element_and_array(Volatility::Immutable),
177            aliases: vec![
178                String::from("list_prepend"),
179                String::from("array_push_front"),
180                String::from("list_push_front"),
181            ],
182        }
183    }
184}
185
186impl ScalarUDFImpl for ArrayPrepend {
187    fn as_any(&self) -> &dyn Any {
188        self
189    }
190
191    fn name(&self) -> &str {
192        "array_prepend"
193    }
194
195    fn signature(&self) -> &Signature {
196        &self.signature
197    }
198
199    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
200        let [element_type, array_type] = take_function_args(self.name(), arg_types)?;
201        if array_type.is_null() {
202            Ok(DataType::new_list(element_type.clone(), true))
203        } else {
204            Ok(array_type.clone())
205        }
206    }
207
208    fn invoke_with_args(
209        &self,
210        args: datafusion_expr::ScalarFunctionArgs,
211    ) -> Result<ColumnarValue> {
212        make_scalar_function(array_prepend_inner)(&args.args)
213    }
214
215    fn aliases(&self) -> &[String] {
216        &self.aliases
217    }
218
219    fn documentation(&self) -> Option<&Documentation> {
220        self.doc()
221    }
222}
223
// Expression-API glue for `ArrayConcat`. Unlike the append/prepend invocations,
// no argument-name list is given here. NOTE(review): presumably this selects the
// macro's variadic form; confirm against `make_udf_expr_and_func!`.
make_udf_expr_and_func!(
    ArrayConcat,
    array_concat,
    "Concatenates arrays.",
    array_concat_udf
);

// `array_concat(array[, ..., array_n])`: concatenates its list arguments row by
// row. The `#[user_doc]` attribute carries the SQL-facing documentation that
// `ScalarUDFImpl::documentation` returns through `self.doc()`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Concatenates arrays.",
    syntax_example = "array_concat(array[, ..., array_n])",
    sql_example = r#"```sql
> select array_concat([1, 2], [3, 4], [5, 6]);
+---------------------------------------------------+
| array_concat(List([1,2]),List([3,4]),List([5,6])) |
+---------------------------------------------------+
| [1, 2, 3, 4, 5, 6]                                |
+---------------------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array_n",
        description = "Subsequent array column or literal array to concatenate."
    )
)]
#[derive(Debug)]
pub struct ArrayConcat {
    // User-defined signature: argument types are coerced by `coerce_types`.
    signature: Signature,
    // Alternate SQL names: `array_cat`, `list_concat`, `list_cat`.
    aliases: Vec<String>,
}
257
258impl Default for ArrayConcat {
259    fn default() -> Self {
260        Self::new()
261    }
262}
263
264impl ArrayConcat {
265    pub fn new() -> Self {
266        Self {
267            signature: Signature::user_defined(Volatility::Immutable),
268            aliases: vec![
269                String::from("array_cat"),
270                String::from("list_concat"),
271                String::from("list_cat"),
272            ],
273        }
274    }
275}
276
277impl ScalarUDFImpl for ArrayConcat {
278    fn as_any(&self) -> &dyn Any {
279        self
280    }
281
282    fn name(&self) -> &str {
283        "array_concat"
284    }
285
286    fn signature(&self) -> &Signature {
287        &self.signature
288    }
289
290    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
291        let mut max_dims = 0;
292        let mut large_list = false;
293        let mut element_types = Vec::with_capacity(arg_types.len());
294        for arg_type in arg_types {
295            match arg_type {
296                DataType::Null | DataType::List(_) | DataType::FixedSizeList(..) => (),
297                DataType::LargeList(_) => large_list = true,
298                arg_type => {
299                    return plan_err!("{} does not support type {arg_type}", self.name())
300                }
301            }
302
303            max_dims = max_dims.max(list_ndims(arg_type));
304            element_types.push(base_type(arg_type))
305        }
306
307        if max_dims == 0 {
308            Ok(DataType::Null)
309        } else if let Some(mut return_type) = type_union_resolution(&element_types) {
310            for _ in 1..max_dims {
311                return_type = DataType::new_list(return_type, true)
312            }
313
314            if large_list {
315                Ok(DataType::new_large_list(return_type, true))
316            } else {
317                Ok(DataType::new_list(return_type, true))
318            }
319        } else {
320            plan_err!(
321                "Failed to unify argument types of {}: {arg_types:?}",
322                self.name()
323            )
324        }
325    }
326
327    fn invoke_with_args(
328        &self,
329        args: datafusion_expr::ScalarFunctionArgs,
330    ) -> Result<ColumnarValue> {
331        make_scalar_function(array_concat_inner)(&args.args)
332    }
333
334    fn aliases(&self) -> &[String] {
335        &self.aliases
336    }
337
338    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
339        let base_type = base_type(&self.return_type(arg_types)?);
340        let coercion = Some(&ListCoercion::FixedSizedListToList);
341        let arg_types = arg_types.iter().map(|arg_type| {
342            coerced_type_with_base_type_only(arg_type, &base_type, coercion)
343        });
344
345        Ok(arg_types.collect())
346    }
347
348    fn documentation(&self) -> Option<&Documentation> {
349        self.doc()
350    }
351}
352
353/// Array_concat/Array_cat SQL function
354pub(crate) fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
355    if args.is_empty() {
356        return exec_err!("array_concat expects at least one argument");
357    }
358
359    let mut all_null = true;
360    let mut large_list = false;
361    for arg in args {
362        match arg.data_type() {
363            DataType::Null => continue,
364            DataType::LargeList(_) => large_list = true,
365            _ => (),
366        }
367
368        all_null = false
369    }
370
371    if all_null {
372        Ok(Arc::new(NullArray::new(args[0].len())))
373    } else if large_list {
374        concat_internal::<i64>(args)
375    } else {
376        concat_internal::<i32>(args)
377    }
378}
379
380fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
381    let args = align_array_dimensions::<O>(args.to_vec())?;
382
383    let list_arrays = args
384        .iter()
385        .map(|arg| as_generic_list_array::<O>(arg))
386        .collect::<Result<Vec<_>>>()?;
387    // Assume number of rows is the same for all arrays
388    let row_count = list_arrays[0].len();
389
390    let mut array_lengths = vec![];
391    let mut arrays = vec![];
392    let mut valid = NullBufferBuilder::new(row_count);
393    for i in 0..row_count {
394        let nulls = list_arrays
395            .iter()
396            .map(|arr| arr.is_null(i))
397            .collect::<Vec<_>>();
398
399        // If all the arrays are null, the concatenated array is null
400        let is_null = nulls.iter().all(|&x| x);
401        if is_null {
402            array_lengths.push(0);
403            valid.append_null();
404        } else {
405            // Get all the arrays on i-th row
406            let values = list_arrays
407                .iter()
408                .map(|arr| arr.value(i))
409                .collect::<Vec<_>>();
410
411            let elements = values
412                .iter()
413                .map(|a| a.as_ref())
414                .collect::<Vec<&dyn Array>>();
415
416            // Concatenated array on i-th row
417            let concatenated_array = arrow::compute::concat(elements.as_slice())?;
418            array_lengths.push(concatenated_array.len());
419            arrays.push(concatenated_array);
420            valid.append_non_null();
421        }
422    }
423    // Assume all arrays have the same data type
424    let data_type = list_arrays[0].value_type();
425
426    let elements = arrays
427        .iter()
428        .map(|a| a.as_ref())
429        .collect::<Vec<&dyn Array>>();
430
431    let list_arr = GenericListArray::<O>::new(
432        Arc::new(Field::new_list_field(data_type, true)),
433        OffsetBuffer::from_lengths(array_lengths),
434        Arc::new(arrow::compute::concat(elements.as_slice())?),
435        valid.finish(),
436    );
437
438    Ok(Arc::new(list_arr))
439}
440
// Kernel functions for `array_append` and `array_prepend`
442
443/// Array_append SQL function
444pub(crate) fn array_append_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
445    let [array, values] = take_function_args("array_append", args)?;
446    match array.data_type() {
447        DataType::Null => make_array_inner(&[Arc::clone(values)]),
448        DataType::List(_) => general_append_and_prepend::<i32>(args, true),
449        DataType::LargeList(_) => general_append_and_prepend::<i64>(args, true),
450        arg_type => exec_err!("array_append does not support type {arg_type}"),
451    }
452}
453
454/// Array_prepend SQL function
455pub(crate) fn array_prepend_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
456    let [values, array] = take_function_args("array_prepend", args)?;
457    match array.data_type() {
458        DataType::Null => make_array_inner(&[Arc::clone(values)]),
459        DataType::List(_) => general_append_and_prepend::<i32>(args, false),
460        DataType::LargeList(_) => general_append_and_prepend::<i64>(args, false),
461        arg_type => exec_err!("array_prepend does not support type {arg_type}"),
462    }
463}
464
465fn general_append_and_prepend<O: OffsetSizeTrait>(
466    args: &[ArrayRef],
467    is_append: bool,
468) -> Result<ArrayRef>
469where
470    i64: TryInto<O>,
471{
472    let (list_array, element_array) = if is_append {
473        let list_array = as_generic_list_array::<O>(&args[0])?;
474        let element_array = &args[1];
475        check_datatypes("array_append", &[element_array, list_array.values()])?;
476        (list_array, element_array)
477    } else {
478        let list_array = as_generic_list_array::<O>(&args[1])?;
479        let element_array = &args[0];
480        check_datatypes("array_prepend", &[list_array.values(), element_array])?;
481        (list_array, element_array)
482    };
483
484    let res = match list_array.value_type() {
485        DataType::List(_) => concat_internal::<O>(args)?,
486        DataType::LargeList(_) => concat_internal::<O>(args)?,
487        data_type => {
488            return generic_append_and_prepend::<O>(
489                list_array,
490                element_array,
491                &data_type,
492                is_append,
493            );
494        }
495    };
496
497    Ok(res)
498}
499
500/// Appends or prepends elements to a ListArray.
501///
502/// This function takes a ListArray, an ArrayRef, a FieldRef, and a boolean flag
503/// indicating whether to append or prepend the elements. It returns a `Result<ArrayRef>`
504/// representing the resulting ListArray after the operation.
505///
506/// # Arguments
507///
508/// * `list_array` - A reference to the ListArray to which elements will be appended/prepended.
509/// * `element_array` - A reference to the Array containing elements to be appended/prepended.
510/// * `field` - A reference to the Field describing the data type of the arrays.
511/// * `is_append` - A boolean flag indicating whether to append (`true`) or prepend (`false`) elements.
512///
513/// # Examples
514///
515/// generic_append_and_prepend(
516///     [1, 2, 3], 4, append => [1, 2, 3, 4]
517///     5, [6, 7, 8], prepend => [5, 6, 7, 8]
518/// )
519fn generic_append_and_prepend<O: OffsetSizeTrait>(
520    list_array: &GenericListArray<O>,
521    element_array: &ArrayRef,
522    data_type: &DataType,
523    is_append: bool,
524) -> Result<ArrayRef>
525where
526    i64: TryInto<O>,
527{
528    let mut offsets = vec![O::usize_as(0)];
529    let values = list_array.values();
530    let original_data = values.to_data();
531    let element_data = element_array.to_data();
532    let capacity = Capacities::Array(original_data.len() + element_data.len());
533
534    let mut mutable = MutableArrayData::with_capacities(
535        vec![&original_data, &element_data],
536        false,
537        capacity,
538    );
539
540    let values_index = 0;
541    let element_index = 1;
542
543    for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
544        let start = offset_window[0].to_usize().unwrap();
545        let end = offset_window[1].to_usize().unwrap();
546        if is_append {
547            mutable.extend(values_index, start, end);
548            mutable.extend(element_index, row_index, row_index + 1);
549        } else {
550            mutable.extend(element_index, row_index, row_index + 1);
551            mutable.extend(values_index, start, end);
552        }
553        offsets.push(offsets[row_index] + O::usize_as(end - start + 1));
554    }
555
556    let data = mutable.freeze();
557
558    Ok(Arc::new(GenericListArray::<O>::try_new(
559        Arc::new(Field::new_list_field(data_type.to_owned(), true)),
560        OffsetBuffer::new(offsets.into()),
561        arrow::array::make_array(data),
562        None,
563    )?))
564}