datafusion_functions_nested/
make_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for `make_array` function.
19
20use std::any::Any;
21use std::sync::Arc;
22use std::vec;
23
24use crate::utils::make_scalar_function;
25use arrow::array::{
26    Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData,
27    NullArray, OffsetSizeTrait, new_null_array,
28};
29use arrow::buffer::OffsetBuffer;
30use arrow::datatypes::DataType;
31use arrow::datatypes::{DataType::Null, Field};
32use datafusion_common::utils::SingleRowListArrayBuilder;
33use datafusion_common::{Result, plan_err};
34use datafusion_expr::TypeSignature;
35use datafusion_expr::binary::{
36    try_type_union_resolution_with_struct, type_union_resolution,
37};
38use datafusion_expr::{
39    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
40};
41use datafusion_macros::user_doc;
42use itertools::Itertools as _;
43
// Declares the `make_array` UDF via the crate's `make_udf_expr_and_func!` macro.
// NOTE(review): the macro is defined outside this file; it presumably generates
// the `make_array(...)` expression helper (documented with the string below)
// and the `make_array_udf()` accessor for `MakeArray` — confirm against the
// macro definition.
make_udf_expr_and_func!(
    MakeArray,
    make_array,
    "Returns an Arrow array using the specified input expressions.",
    make_array_udf
);
50
/// Scalar UDF that builds a list out of its argument expressions.
///
/// Exposed as `make_array` and, through its alias list, as `make_list`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array using the specified input expressions.",
    syntax_example = "make_array(expression1[, ..., expression_n])",
    sql_example = r#"```sql
> select make_array(1, 2, 3, 4, 5);
+----------------------------------------------------------+
| make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)) |
+----------------------------------------------------------+
| [1, 2, 3, 4, 5]                                          |
+----------------------------------------------------------+
```"#,
    argument(
        name = "expression_n",
        description = "Expression to include in the output array. Can be a constant, column, or function, and any combination of arithmetic or string operators."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct MakeArray {
    /// Accepted call shapes: either no arguments at all, or arguments whose
    /// types are resolved through user-defined coercion.
    signature: Signature,
    /// Alternative names under which this function can be invoked.
    aliases: Vec<String>,
}
73
74impl Default for MakeArray {
75    fn default() -> Self {
76        Self::new()
77    }
78}
79
80impl MakeArray {
81    pub fn new() -> Self {
82        Self {
83            signature: Signature::one_of(
84                vec![TypeSignature::Nullary, TypeSignature::UserDefined],
85                Volatility::Immutable,
86            ),
87            aliases: vec![String::from("make_list")],
88        }
89    }
90}
91
92impl ScalarUDFImpl for MakeArray {
93    fn as_any(&self) -> &dyn Any {
94        self
95    }
96
97    fn name(&self) -> &str {
98        "make_array"
99    }
100
101    fn signature(&self) -> &Signature {
102        &self.signature
103    }
104
105    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
106        let element_type = if arg_types.is_empty() {
107            Null
108        } else {
109            // At this point, all the type in array should be coerced to the same one.
110            arg_types[0].to_owned()
111        };
112
113        Ok(DataType::new_list(element_type, true))
114    }
115
116    fn invoke_with_args(
117        &self,
118        args: datafusion_expr::ScalarFunctionArgs,
119    ) -> Result<ColumnarValue> {
120        make_scalar_function(make_array_inner)(&args.args)
121    }
122
123    fn aliases(&self) -> &[String] {
124        &self.aliases
125    }
126
127    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
128        coerce_types_inner(arg_types, self.name())
129    }
130
131    fn documentation(&self) -> Option<&Documentation> {
132        self.doc()
133    }
134}
135
136/// `make_array_inner` is the implementation of the `make_array` function.
137/// Constructs an array using the input `data` as `ArrayRef`.
138/// Returns a reference-counted `Array` instance result.
139pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
140    let data_type = arrays.iter().find_map(|arg| {
141        let arg_type = arg.data_type();
142        (!arg_type.is_null()).then_some(arg_type)
143    });
144
145    let data_type = data_type.unwrap_or(&Null);
146    if data_type.is_null() {
147        // Either an empty array or all nulls:
148        let length = arrays.iter().map(|a| a.len()).sum();
149        let array = new_null_array(&Null, length);
150        Ok(Arc::new(
151            SingleRowListArrayBuilder::new(array).build_list_array(),
152        ))
153    } else {
154        array_array::<i32>(arrays, data_type.clone(), Field::LIST_FIELD_DEFAULT_NAME)
155    }
156}
157
158/// Convert one or more [`ArrayRef`] of the same type into a
/// `ListArray` or `LargeListArray` depending on the offset size.
160///
161/// # Example (non nested)
162///
163/// Calling `array(col1, col2)` where col1 and col2 are non nested
164/// would return a single new `ListArray`, where each row was a list
165/// of 2 elements:
166///
167/// ```text
168/// ┌─────────┐   ┌─────────┐           ┌──────────────┐
169/// │ ┌─────┐ │   │ ┌─────┐ │           │ ┌──────────┐ │
170/// │ │  A  │ │   │ │  X  │ │           │ │  [A, X]  │ │
171/// │ ├─────┤ │   │ ├─────┤ │           │ ├──────────┤ │
172/// │ │NULL │ │   │ │  Y  │ │──────────▶│ │[NULL, Y] │ │
173/// │ ├─────┤ │   │ ├─────┤ │           │ ├──────────┤ │
174/// │ │  C  │ │   │ │  Z  │ │           │ │  [C, Z]  │ │
175/// │ └─────┘ │   │ └─────┘ │           │ └──────────┘ │
176/// └─────────┘   └─────────┘           └──────────────┘
177///   col1           col2                    output
178/// ```
179///
180/// # Example (nested)
181///
182/// Calling `array(col1, col2)` where col1 and col2 are lists
183/// would return a single new `ListArray`, where each row was a list
184/// of the corresponding elements of col1 and col2.
185///
186/// ``` text
187/// ┌──────────────┐   ┌──────────────┐        ┌─────────────────────────────┐
188/// │ ┌──────────┐ │   │ ┌──────────┐ │        │ ┌────────────────────────┐  │
189/// │ │  [A, X]  │ │   │ │    []    │ │        │ │    [[A, X], []]        │  │
190/// │ ├──────────┤ │   │ ├──────────┤ │        │ ├────────────────────────┤  │
191/// │ │[NULL, Y] │ │   │ │[Q, R, S] │ │───────▶│ │ [[NULL, Y], [Q, R, S]] │  │
/// │ ├──────────┤ │   │ ├──────────┤ │        │ ├────────────────────────┤  │
193/// │ │  [C, Z]  │ │   │ │   NULL   │ │        │ │    [[C, Z], NULL]      │  │
194/// │ └──────────┘ │   │ └──────────┘ │        │ └────────────────────────┘  │
195/// └──────────────┘   └──────────────┘        └─────────────────────────────┘
196///      col1               col2                         output
197/// ```
198pub fn array_array<O: OffsetSizeTrait>(
199    args: &[ArrayRef],
200    data_type: DataType,
201    field_name: &str,
202) -> Result<ArrayRef> {
203    // do not accept 0 arguments.
204    if args.is_empty() {
205        return plan_err!("Array requires at least one argument");
206    }
207
208    let mut data = vec![];
209    let mut total_len = 0;
210    for arg in args {
211        let arg_data = if arg.as_any().is::<NullArray>() {
212            ArrayData::new_empty(&data_type)
213        } else {
214            arg.to_data()
215        };
216        total_len += arg_data.len();
217        data.push(arg_data);
218    }
219
220    let mut offsets: Vec<O> = Vec::with_capacity(total_len);
221    offsets.push(O::usize_as(0));
222
223    let capacity = Capacities::Array(total_len);
224    let data_ref = data.iter().collect::<Vec<_>>();
225    let mut mutable = MutableArrayData::with_capacities(data_ref, true, capacity);
226
227    let num_rows = args[0].len();
228    for row_idx in 0..num_rows {
229        for (arr_idx, arg) in args.iter().enumerate() {
230            if !arg.as_any().is::<NullArray>()
231                && !arg.is_null(row_idx)
232                && arg.is_valid(row_idx)
233            {
234                mutable.extend(arr_idx, row_idx, row_idx + 1);
235            } else {
236                mutable.extend_nulls(1);
237            }
238        }
239        offsets.push(O::usize_as(mutable.len()));
240    }
241    let data = mutable.freeze();
242
243    Ok(Arc::new(GenericListArray::<O>::try_new(
244        Arc::new(Field::new(field_name, data_type, true)),
245        OffsetBuffer::new(offsets.into()),
246        arrow::array::make_array(data),
247        None,
248    )?))
249}
250
251pub fn coerce_types_inner(arg_types: &[DataType], name: &str) -> Result<Vec<DataType>> {
252    if let Ok(unified) = try_type_union_resolution_with_struct(arg_types) {
253        return Ok(unified);
254    }
255
256    if let Some(unified) = type_union_resolution(arg_types) {
257        Ok(vec![unified; arg_types.len()])
258    } else {
259        plan_err!(
260            "Failed to unify argument types of {}: [{}]",
261            name,
262            arg_types.iter().join(", ")
263        )
264    }
265}