Skip to main content

datafusion_functions_nested/
concat.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for `array_append`, `array_prepend` and `array_concat` functions.
19
20use std::sync::Arc;
21
22use crate::make_array::make_array_inner;
23use crate::utils::{align_array_dimensions, check_datatypes, make_scalar_function};
24use arrow::array::{
25    Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData,
26    OffsetSizeTrait,
27};
28use arrow::buffer::{NullBuffer, OffsetBuffer};
29use arrow::datatypes::{DataType, Field};
30use datafusion_common::Result;
31use datafusion_common::utils::{
32    ListCoercion, base_type, coerced_type_with_base_type_only,
33};
34use datafusion_common::{
35    cast::as_generic_list_array,
36    exec_err, plan_err,
37    utils::{list_ndims, take_function_args},
38};
39use datafusion_expr::binary::type_union_resolution;
40use datafusion_expr::{
41    ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
42    Volatility,
43};
44use datafusion_macros::user_doc;
45use itertools::Itertools;
46
// Invokes the crate's `make_udf_expr_and_func!` macro for `ArrayAppend`.
// NOTE(review): the macro definition is not visible here; per the inline
// comments it receives the expression function name, its argument names, its
// doc string, and the name of the internal UDF accessor — confirm against the
// macro definition.
make_udf_expr_and_func!(
    ArrayAppend,
    array_append,
    array element,                                // arg name
    "appends an element to the end of an array.", // doc
    array_append_udf                              // internal function name
);
54
/// Scalar UDF implementation backing the SQL function `array_append`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Appends an element to the end of an array.",
    syntax_example = "array_append(array, element)",
    sql_example = r#"```sql
> select array_append([1, 2, 3], 4);
+--------------------------------------+
| array_append(List([1,2,3]),Int64(4)) |
+--------------------------------------+
| [1, 2, 3, 4]                         |
+--------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(name = "element", description = "Element to append to the array.")
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayAppend {
    /// Signature returned by `ScalarUDFImpl::signature`.
    signature: Signature,
    /// Alternative function names returned by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
78
79impl Default for ArrayAppend {
80    fn default() -> Self {
81        Self::new()
82    }
83}
84
85impl ArrayAppend {
86    pub fn new() -> Self {
87        Self {
88            signature: Signature::array_and_element(Volatility::Immutable),
89            aliases: vec![
90                String::from("list_append"),
91                String::from("array_push_back"),
92                String::from("list_push_back"),
93            ],
94        }
95    }
96}
97
98impl ScalarUDFImpl for ArrayAppend {
99    fn name(&self) -> &str {
100        "array_append"
101    }
102
103    fn signature(&self) -> &Signature {
104        &self.signature
105    }
106
107    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
108        let [array_type, element_type] = take_function_args(self.name(), arg_types)?;
109        if array_type.is_null() {
110            Ok(DataType::new_list(element_type.clone(), true))
111        } else {
112            Ok(array_type.clone())
113        }
114    }
115
116    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
117        make_scalar_function(array_append_inner)(&args.args)
118    }
119
120    fn aliases(&self) -> &[String] {
121        &self.aliases
122    }
123
124    fn documentation(&self) -> Option<&Documentation> {
125        self.doc()
126    }
127}
128
// Invokes the crate's `make_udf_expr_and_func!` macro for `ArrayPrepend`.
// NOTE(review): the macro definition is not visible here; the arguments appear
// to be the expression function name, its argument names (`element array`),
// its doc string, and the name of the internal UDF accessor — confirm against
// the macro definition.
make_udf_expr_and_func!(
    ArrayPrepend,
    array_prepend,
    element array,
    "Prepends an element to the beginning of an array.",
    array_prepend_udf
);
136
137#[user_doc(
138    doc_section(label = "Array Functions"),
139    description = "Prepends an element to the beginning of an array.",
140    syntax_example = "array_prepend(element, array)",
141    sql_example = r#"```sql
142> select array_prepend(1, [2, 3, 4]);
143+---------------------------------------+
144| array_prepend(Int64(1),List([2,3,4])) |
145+---------------------------------------+
146| [1, 2, 3, 4]                          |
147+---------------------------------------+
148```"#,
149    argument(
150        name = "array",
151        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
152    ),
153    argument(name = "element", description = "Element to prepend to the array.")
154)]
155#[derive(Debug, PartialEq, Eq, Hash)]
156pub struct ArrayPrepend {
157    signature: Signature,
158    aliases: Vec<String>,
159}
160
161impl Default for ArrayPrepend {
162    fn default() -> Self {
163        Self::new()
164    }
165}
166
167impl ArrayPrepend {
168    pub fn new() -> Self {
169        Self {
170            signature: Signature::element_and_array(Volatility::Immutable),
171            aliases: vec![
172                String::from("list_prepend"),
173                String::from("array_push_front"),
174                String::from("list_push_front"),
175            ],
176        }
177    }
178}
179
180impl ScalarUDFImpl for ArrayPrepend {
181    fn name(&self) -> &str {
182        "array_prepend"
183    }
184
185    fn signature(&self) -> &Signature {
186        &self.signature
187    }
188
189    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
190        let [element_type, array_type] = take_function_args(self.name(), arg_types)?;
191        if array_type.is_null() {
192            Ok(DataType::new_list(element_type.clone(), true))
193        } else {
194            Ok(array_type.clone())
195        }
196    }
197
198    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
199        make_scalar_function(array_prepend_inner)(&args.args)
200    }
201
202    fn aliases(&self) -> &[String] {
203        &self.aliases
204    }
205
206    fn documentation(&self) -> Option<&Documentation> {
207        self.doc()
208    }
209}
210
// Invokes the crate's `make_udf_expr_and_func!` macro for `ArrayConcat`.
// Unlike the `ArrayAppend` / `ArrayPrepend` invocations, no argument names are
// passed here.
// NOTE(review): the macro definition is not visible here; presumably this form
// is the variadic variant — confirm against the macro definition.
make_udf_expr_and_func!(
    ArrayConcat,
    array_concat,
    "Concatenates arrays.",
    array_concat_udf
);
217
/// Scalar UDF implementation backing the SQL function `array_concat`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Concatenates arrays.",
    syntax_example = "array_concat(array[, ..., array_n])",
    sql_example = r#"```sql
> select array_concat([1, 2], [3, 4], [5, 6]);
+---------------------------------------------------+
| array_concat(List([1,2]),List([3,4]),List([5,6])) |
+---------------------------------------------------+
| [1, 2, 3, 4, 5, 6]                                |
+---------------------------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    ),
    argument(
        name = "array_n",
        description = "Subsequent array column or literal array to concatenate."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArrayConcat {
    /// Signature returned by `ScalarUDFImpl::signature`.
    signature: Signature,
    /// Alternative function names returned by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
244
245impl Default for ArrayConcat {
246    fn default() -> Self {
247        Self::new()
248    }
249}
250
251impl ArrayConcat {
252    pub fn new() -> Self {
253        Self {
254            signature: Signature::user_defined(Volatility::Immutable),
255            aliases: vec![
256                String::from("array_cat"),
257                String::from("list_concat"),
258                String::from("list_cat"),
259            ],
260        }
261    }
262}
263
264impl ScalarUDFImpl for ArrayConcat {
265    fn name(&self) -> &str {
266        "array_concat"
267    }
268
269    fn signature(&self) -> &Signature {
270        &self.signature
271    }
272
273    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
274        let mut max_dims = 0;
275        let mut large_list = false;
276        let mut element_types = Vec::with_capacity(arg_types.len());
277        for arg_type in arg_types {
278            match arg_type {
279                DataType::Null | DataType::List(_) | DataType::FixedSizeList(..) => (),
280                DataType::LargeList(_) => large_list = true,
281                arg_type => {
282                    return plan_err!("{} does not support type {arg_type}", self.name());
283                }
284            }
285
286            max_dims = max_dims.max(list_ndims(arg_type));
287            element_types.push(base_type(arg_type))
288        }
289
290        if max_dims == 0 {
291            Ok(DataType::Null)
292        } else if let Some(mut return_type) = type_union_resolution(&element_types) {
293            for _ in 1..max_dims {
294                return_type = DataType::new_list(return_type, true)
295            }
296
297            if large_list {
298                Ok(DataType::new_large_list(return_type, true))
299            } else {
300                Ok(DataType::new_list(return_type, true))
301            }
302        } else {
303            plan_err!(
304                "Failed to unify argument types of {}: [{}]",
305                self.name(),
306                arg_types.iter().join(", ")
307            )
308        }
309    }
310
311    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
312        make_scalar_function(array_concat_inner)(&args.args)
313    }
314
315    fn aliases(&self) -> &[String] {
316        &self.aliases
317    }
318
319    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
320        let return_type = self.return_type(arg_types)?;
321        let base_type = base_type(&return_type);
322        let coercion = Some(&ListCoercion::FixedSizedListToList);
323        // When the return type is a `LargeList`, the outer container of every
324        // input must be widened to `LargeList` as well. Otherwise
325        // `array_concat_inner` would later try to downcast a `List` argument
326        // to `GenericListArray<i64>` and fail.
327        let promote_to_large_list = matches!(return_type, DataType::LargeList(_));
328        let arg_types = arg_types.iter().map(|arg_type| {
329            let coerced =
330                coerced_type_with_base_type_only(arg_type, &base_type, coercion);
331            match coerced {
332                DataType::List(field) if promote_to_large_list => {
333                    DataType::LargeList(field)
334                }
335                other => other,
336            }
337        });
338
339        Ok(arg_types.collect())
340    }
341
342    fn documentation(&self) -> Option<&Documentation> {
343        self.doc()
344    }
345}
346
347pub fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
348    if args.is_empty() {
349        return exec_err!("array_concat expects at least one argument");
350    }
351
352    let mut all_null = true;
353    let mut large_list = false;
354    for arg in args {
355        match arg.data_type() {
356            DataType::Null => continue,
357            DataType::LargeList(_) => large_list = true,
358            _ => (),
359        }
360        if arg.null_count() < arg.len() {
361            all_null = false;
362        }
363    }
364
365    if all_null {
366        // Return a null array with the same type as the first non-null-type argument
367        let return_type = args
368            .iter()
369            .map(|arg| arg.data_type())
370            .find_or_first(|d| !d.is_null())
371            .unwrap(); // Safe because args is non-empty
372
373        Ok(arrow::array::make_array(ArrayData::new_null(
374            return_type,
375            args[0].len(),
376        )))
377    } else if large_list {
378        concat_internal::<i64>(args)
379    } else {
380        concat_internal::<i32>(args)
381    }
382}
383
384fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
385    let args = align_array_dimensions::<O>(args.to_vec())?;
386
387    let list_arrays = args
388        .iter()
389        .map(|arg| as_generic_list_array::<O>(arg))
390        .collect::<Result<Vec<_>>>()?;
391    let row_count = list_arrays[0].len();
392
393    // Extract underlying values ArrayData from each list array for MutableArrayData.
394    let values_data: Vec<ArrayData> =
395        list_arrays.iter().map(|la| la.values().to_data()).collect();
396    let values_data_refs: Vec<&ArrayData> = values_data.iter().collect();
397
398    // Estimate capacity as the sum of all values arrays' lengths.
399    let total_capacity: usize = values_data.iter().map(|d| d.len()).sum();
400
401    let mut mutable = MutableArrayData::with_capacities(
402        values_data_refs,
403        false,
404        Capacities::Array(total_capacity),
405    );
406    let mut offsets: Vec<O> = Vec::with_capacity(row_count + 1);
407    offsets.push(O::zero());
408
409    // Compute the output null buffer: a row is null only if null in ALL input
410    // arrays. This is the bitwise OR of validity bits (valid if valid in ANY
411    // input). If any array has no null buffer (all valid), no output row can be
412    // null.
413    let nulls = list_arrays
414        .iter()
415        .filter_map(|la| la.nulls())
416        .collect::<Vec<_>>();
417    let valid = if nulls.len() == list_arrays.len() {
418        nulls
419            .iter()
420            .map(|n| n.inner().clone())
421            .reduce(|a, b| &a | &b)
422            .map(NullBuffer::new)
423    } else {
424        None
425    };
426
427    for row_idx in 0..row_count {
428        for (arr_idx, list_array) in list_arrays.iter().enumerate() {
429            if list_array.is_null(row_idx) {
430                continue;
431            }
432            let start = list_array.offsets()[row_idx].to_usize().unwrap();
433            let end = list_array.offsets()[row_idx + 1].to_usize().unwrap();
434            if start < end {
435                mutable.extend(arr_idx, start, end);
436            }
437        }
438        offsets.push(O::usize_as(mutable.len()));
439    }
440
441    let data_type = list_arrays[0].value_type();
442    let data = mutable.freeze();
443
444    Ok(Arc::new(GenericListArray::<O>::try_new(
445        Arc::new(Field::new_list_field(data_type, true)),
446        OffsetBuffer::new(offsets.into()),
447        arrow::array::make_array(data),
448        valid,
449    )?))
450}
451
452// Kernel functions
453
454fn array_append_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
455    let [array, values] = take_function_args("array_append", args)?;
456    match array.data_type() {
457        DataType::Null => make_array_inner(&[Arc::clone(values)]),
458        DataType::List(_) => general_append_and_prepend::<i32>(args, true),
459        DataType::LargeList(_) => general_append_and_prepend::<i64>(args, true),
460        arg_type => exec_err!("array_append does not support type {arg_type}"),
461    }
462}
463
464fn array_prepend_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
465    let [values, array] = take_function_args("array_prepend", args)?;
466    match array.data_type() {
467        DataType::Null => make_array_inner(&[Arc::clone(values)]),
468        DataType::List(_) => general_append_and_prepend::<i32>(args, false),
469        DataType::LargeList(_) => general_append_and_prepend::<i64>(args, false),
470        arg_type => exec_err!("array_prepend does not support type {arg_type}"),
471    }
472}
473
474fn general_append_and_prepend<O: OffsetSizeTrait>(
475    args: &[ArrayRef],
476    is_append: bool,
477) -> Result<ArrayRef>
478where
479    i64: TryInto<O>,
480{
481    let (list_array, element_array) = if is_append {
482        let list_array = as_generic_list_array::<O>(&args[0])?;
483        let element_array = &args[1];
484        check_datatypes("array_append", &[element_array, list_array.values()])?;
485        (list_array, element_array)
486    } else {
487        let list_array = as_generic_list_array::<O>(&args[1])?;
488        let element_array = &args[0];
489        check_datatypes("array_prepend", &[list_array.values(), element_array])?;
490        (list_array, element_array)
491    };
492
493    let res = match list_array.value_type() {
494        DataType::List(_) => concat_internal::<O>(args)?,
495        DataType::LargeList(_) => concat_internal::<O>(args)?,
496        data_type => {
497            return generic_append_and_prepend::<O>(
498                list_array,
499                element_array,
500                &data_type,
501                is_append,
502            );
503        }
504    };
505
506    Ok(res)
507}
508
509/// Appends or prepends elements to a ListArray.
510///
511/// This function takes a ListArray, an ArrayRef, a FieldRef, and a boolean flag
512/// indicating whether to append or prepend the elements. It returns a `Result<ArrayRef>`
513/// representing the resulting ListArray after the operation.
514///
515/// # Arguments
516///
517/// * `list_array` - A reference to the ListArray to which elements will be appended/prepended.
518/// * `element_array` - A reference to the Array containing elements to be appended/prepended.
519/// * `field` - A reference to the Field describing the data type of the arrays.
520/// * `is_append` - A boolean flag indicating whether to append (`true`) or prepend (`false`) elements.
521///
522/// # Examples
523///
524/// generic_append_and_prepend(
525///     [1, 2, 3], 4, append => [1, 2, 3, 4]
526///     5, [6, 7, 8], prepend => [5, 6, 7, 8]
527/// )
528fn generic_append_and_prepend<O: OffsetSizeTrait>(
529    list_array: &GenericListArray<O>,
530    element_array: &ArrayRef,
531    data_type: &DataType,
532    is_append: bool,
533) -> Result<ArrayRef>
534where
535    i64: TryInto<O>,
536{
537    let mut offsets = vec![O::usize_as(0)];
538    let values = list_array.values();
539    let original_data = values.to_data();
540    let element_data = element_array.to_data();
541    let capacity = Capacities::Array(original_data.len() + element_data.len());
542
543    let mut mutable = MutableArrayData::with_capacities(
544        vec![&original_data, &element_data],
545        false,
546        capacity,
547    );
548
549    let values_index = 0;
550    let element_index = 1;
551
552    for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
553        let start = offset_window[0].to_usize().unwrap();
554        let end = offset_window[1].to_usize().unwrap();
555        if is_append {
556            mutable.extend(values_index, start, end);
557            mutable.extend(element_index, row_index, row_index + 1);
558        } else {
559            mutable.extend(element_index, row_index, row_index + 1);
560            mutable.extend(values_index, start, end);
561        }
562        offsets.push(offsets[row_index] + O::usize_as(end - start + 1));
563    }
564
565    let data = mutable.freeze();
566
567    Ok(Arc::new(GenericListArray::<O>::try_new(
568        Arc::new(Field::new_list_field(data_type.to_owned(), true)),
569        OffsetBuffer::new(offsets.into()),
570        arrow::array::make_array(data),
571        None,
572    )?))
573}