Skip to main content

datafusion_functions_nested/
make_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for `make_array` function.
19
20use std::sync::Arc;
21use std::vec;
22
23use crate::utils::make_scalar_function;
24use arrow::array::{
25    Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData,
26    NullArray, OffsetSizeTrait, new_null_array,
27};
28use arrow::buffer::OffsetBuffer;
29use arrow::datatypes::DataType;
30use arrow::datatypes::{DataType::Null, Field};
31use datafusion_common::utils::SingleRowListArrayBuilder;
32use datafusion_common::{Result, plan_err};
33use datafusion_expr::binary::{
34    try_type_union_resolution_with_struct, type_union_resolution,
35};
36use datafusion_expr::{
37    ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
38    Volatility,
39};
40use datafusion_macros::user_doc;
41use itertools::Itertools as _;
42
// Registers the `MakeArray` UDF with the crate's helper macro.
// NOTE(review): the macro is defined elsewhere in this crate; presumably it
// generates an expression-builder fn named `make_array` (documented with the
// string below) and a `make_array_udf` accessor for the `MakeArray` UDF —
// confirm against the macro definition.
make_udf_expr_and_func!(
    MakeArray,
    make_array,
    "Returns an Arrow array using the specified input expressions.",
    make_array_udf
);
49
// User-facing documentation for `make_array`.
// NOTE(review): `#[user_doc]` presumably generates the `doc()` method that
// `ScalarUDFImpl::documentation` returns below — confirm in `datafusion_macros`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array using the specified input expressions.",
    syntax_example = "make_array(expression1[, ..., expression_n])",
    sql_example = r#"```sql
> select make_array(1, 2, 3, 4, 5);
+----------------------------------------------------------+
| make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)) |
+----------------------------------------------------------+
| [1, 2, 3, 4, 5]                                          |
+----------------------------------------------------------+
```"#,
    argument(
        name = "expression_n",
        description = "Expression to include in the output array. Can be a constant, column, or function, and any combination of arithmetic or string operators."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct MakeArray {
    // Immutable, user-defined signature (constructed in `MakeArray::new`).
    signature: Signature,
    // Alternative function names; `MakeArray::new` sets this to `make_list`.
    aliases: Vec<String>,
}
72
73impl Default for MakeArray {
74    fn default() -> Self {
75        Self::new()
76    }
77}
78
79impl MakeArray {
80    pub fn new() -> Self {
81        Self {
82            signature: Signature::user_defined(Volatility::Immutable),
83            aliases: vec![String::from("make_list")],
84        }
85    }
86}
87
88impl ScalarUDFImpl for MakeArray {
89    fn name(&self) -> &str {
90        "make_array"
91    }
92
93    fn signature(&self) -> &Signature {
94        &self.signature
95    }
96
97    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
98        let element_type = if arg_types.is_empty() {
99            Null
100        } else {
101            // At this point, all the type in array should be coerced to the same one.
102            arg_types[0].to_owned()
103        };
104
105        Ok(DataType::new_list(element_type, true))
106    }
107
108    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
109        make_scalar_function(make_array_inner)(&args.args)
110    }
111
112    fn aliases(&self) -> &[String] {
113        &self.aliases
114    }
115
116    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
117        if arg_types.is_empty() {
118            Ok(vec![])
119        } else {
120            coerce_types_inner(arg_types, self.name())
121        }
122    }
123
124    fn documentation(&self) -> Option<&Documentation> {
125        self.doc()
126    }
127}
128
129/// `make_array_inner` is the implementation of the `make_array` function.
130/// Constructs an array using the input `data` as `ArrayRef`.
131/// Returns a reference-counted `Array` instance result.
132pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
133    let data_type = arrays.iter().find_map(|arg| {
134        let arg_type = arg.data_type();
135        (!arg_type.is_null()).then_some(arg_type)
136    });
137
138    let data_type = data_type.unwrap_or(&Null);
139    if data_type.is_null() {
140        // Either an empty array or all nulls:
141        let length = arrays.iter().map(|a| a.len()).sum();
142        let array = new_null_array(&Null, length);
143        Ok(Arc::new(
144            SingleRowListArrayBuilder::new(array).build_list_array(),
145        ))
146    } else {
147        array_array::<i32>(arrays, data_type.clone(), Field::LIST_FIELD_DEFAULT_NAME)
148    }
149}
150
151/// Convert one or more [`ArrayRef`] of the same type into a
152/// `ListArray` or 'LargeListArray' depending on the offset size.
153///
154/// # Example (non nested)
155///
156/// Calling `array(col1, col2)` where col1 and col2 are non nested
157/// would return a single new `ListArray`, where each row was a list
158/// of 2 elements:
159///
160/// ```text
161/// ┌─────────┐   ┌─────────┐           ┌──────────────┐
162/// │ ┌─────┐ │   │ ┌─────┐ │           │ ┌──────────┐ │
163/// │ │  A  │ │   │ │  X  │ │           │ │  [A, X]  │ │
164/// │ ├─────┤ │   │ ├─────┤ │           │ ├──────────┤ │
165/// │ │NULL │ │   │ │  Y  │ │──────────▶│ │[NULL, Y] │ │
166/// │ ├─────┤ │   │ ├─────┤ │           │ ├──────────┤ │
167/// │ │  C  │ │   │ │  Z  │ │           │ │  [C, Z]  │ │
168/// │ └─────┘ │   │ └─────┘ │           │ └──────────┘ │
169/// └─────────┘   └─────────┘           └──────────────┘
170///   col1           col2                    output
171/// ```
172///
173/// # Example (nested)
174///
175/// Calling `array(col1, col2)` where col1 and col2 are lists
176/// would return a single new `ListArray`, where each row was a list
177/// of the corresponding elements of col1 and col2.
178///
179/// ``` text
180/// ┌──────────────┐   ┌──────────────┐        ┌─────────────────────────────┐
181/// │ ┌──────────┐ │   │ ┌──────────┐ │        │ ┌────────────────────────┐  │
182/// │ │  [A, X]  │ │   │ │    []    │ │        │ │    [[A, X], []]        │  │
183/// │ ├──────────┤ │   │ ├──────────┤ │        │ ├────────────────────────┤  │
184/// │ │[NULL, Y] │ │   │ │[Q, R, S] │ │───────▶│ │ [[NULL, Y], [Q, R, S]] │  │
185/// │ ├──────────┤ │   │ ├──────────┤ │        │ ├────────────────────────│  │
186/// │ │  [C, Z]  │ │   │ │   NULL   │ │        │ │    [[C, Z], NULL]      │  │
187/// │ └──────────┘ │   │ └──────────┘ │        │ └────────────────────────┘  │
188/// └──────────────┘   └──────────────┘        └─────────────────────────────┘
189///      col1               col2                         output
190/// ```
191pub fn array_array<O: OffsetSizeTrait>(
192    args: &[ArrayRef],
193    data_type: DataType,
194    field_name: &str,
195) -> Result<ArrayRef> {
196    // do not accept 0 arguments.
197    if args.is_empty() {
198        return plan_err!("Array requires at least one argument");
199    }
200
201    let mut data = vec![];
202    let mut total_len = 0;
203    for arg in args {
204        let arg_data = if arg.as_any().is::<NullArray>() {
205            ArrayData::new_empty(&data_type)
206        } else {
207            arg.to_data()
208        };
209        total_len += arg_data.len();
210        data.push(arg_data);
211    }
212
213    let mut offsets: Vec<O> = Vec::with_capacity(total_len);
214    offsets.push(O::usize_as(0));
215
216    let capacity = Capacities::Array(total_len);
217    let data_ref = data.iter().collect::<Vec<_>>();
218    let mut mutable = MutableArrayData::with_capacities(data_ref, true, capacity);
219
220    let num_rows = args[0].len();
221    for row_idx in 0..num_rows {
222        for (arr_idx, arg) in args.iter().enumerate() {
223            if !arg.as_any().is::<NullArray>()
224                && !arg.is_null(row_idx)
225                && arg.is_valid(row_idx)
226            {
227                mutable.extend(arr_idx, row_idx, row_idx + 1);
228            } else {
229                mutable.extend_nulls(1);
230            }
231        }
232        offsets.push(O::usize_as(mutable.len()));
233    }
234    let data = mutable.freeze();
235
236    Ok(Arc::new(GenericListArray::<O>::try_new(
237        Arc::new(Field::new(field_name, data_type, true)),
238        OffsetBuffer::new(offsets.into()),
239        arrow::array::make_array(data),
240        None,
241    )?))
242}
243
244pub fn coerce_types_inner(arg_types: &[DataType], name: &str) -> Result<Vec<DataType>> {
245    if let Ok(unified) = try_type_union_resolution_with_struct(arg_types) {
246        return Ok(unified);
247    }
248
249    if let Some(unified) = type_union_resolution(arg_types) {
250        Ok(vec![unified; arg_types.len()])
251    } else {
252        plan_err!(
253            "Failed to unify argument types of {}: [{}]",
254            name,
255            arg_types.iter().join(", ")
256        )
257    }
258}