datafusion_functions_nested/
make_array.rs1use std::any::Any;
21use std::sync::Arc;
22use std::vec;
23
24use crate::utils::make_scalar_function;
25use arrow::array::{
26 Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData,
27 NullArray, OffsetSizeTrait, new_null_array,
28};
29use arrow::buffer::OffsetBuffer;
30use arrow::datatypes::DataType;
31use arrow::datatypes::{DataType::Null, Field};
32use datafusion_common::utils::SingleRowListArrayBuilder;
33use datafusion_common::{Result, plan_err};
34use datafusion_expr::TypeSignature;
35use datafusion_expr::binary::{
36 try_type_union_resolution_with_struct, type_union_resolution,
37};
38use datafusion_expr::{
39 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
40};
41use datafusion_macros::user_doc;
42use itertools::Itertools as _;
43
// Registers `MakeArray` through the crate's `make_udf_expr_and_func!` macro.
// NOTE(review): the macro is defined outside this file; presumably it emits the
// `make_array` expression helper (documented with the string below) and the
// `make_array_udf` accessor -- confirm against the macro definition.
make_udf_expr_and_func!(
    MakeArray,
    make_array,
    "Returns an Arrow array using the specified input expressions.",
    make_array_udf
);
50
/// Scalar UDF implementing the `make_array` SQL function (alias `make_list`).
///
/// The user-facing documentation returned by `documentation()` is declared in the
/// `user_doc` attribute below.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array using the specified input expressions.",
    syntax_example = "make_array(expression1[, ..., expression_n])",
    sql_example = r#"```sql
> select make_array(1, 2, 3, 4, 5);
+----------------------------------------------------------+
| make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)) |
+----------------------------------------------------------+
| [1, 2, 3, 4, 5] |
+----------------------------------------------------------+
```"#,
    argument(
        name = "expression_n",
        description = "Expression to include in the output array. Can be a constant, column, or function, and any combination of arithmetic or string operators."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct MakeArray {
    /// Accepted call signatures; handed out by `signature()`.
    signature: Signature,
    /// Alternative function names; handed out by `aliases()`.
    aliases: Vec<String>,
}
73
74impl Default for MakeArray {
75 fn default() -> Self {
76 Self::new()
77 }
78}
79
80impl MakeArray {
81 pub fn new() -> Self {
82 Self {
83 signature: Signature::one_of(
84 vec![TypeSignature::Nullary, TypeSignature::UserDefined],
85 Volatility::Immutable,
86 ),
87 aliases: vec![String::from("make_list")],
88 }
89 }
90}
91
92impl ScalarUDFImpl for MakeArray {
93 fn as_any(&self) -> &dyn Any {
94 self
95 }
96
97 fn name(&self) -> &str {
98 "make_array"
99 }
100
101 fn signature(&self) -> &Signature {
102 &self.signature
103 }
104
105 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
106 let element_type = if arg_types.is_empty() {
107 Null
108 } else {
109 arg_types[0].to_owned()
111 };
112
113 Ok(DataType::new_list(element_type, true))
114 }
115
116 fn invoke_with_args(
117 &self,
118 args: datafusion_expr::ScalarFunctionArgs,
119 ) -> Result<ColumnarValue> {
120 make_scalar_function(make_array_inner)(&args.args)
121 }
122
123 fn aliases(&self) -> &[String] {
124 &self.aliases
125 }
126
127 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
128 coerce_types_inner(arg_types, self.name())
129 }
130
131 fn documentation(&self) -> Option<&Documentation> {
132 self.doc()
133 }
134}
135
136pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
140 let data_type = arrays.iter().find_map(|arg| {
141 let arg_type = arg.data_type();
142 (!arg_type.is_null()).then_some(arg_type)
143 });
144
145 let data_type = data_type.unwrap_or(&Null);
146 if data_type.is_null() {
147 let length = arrays.iter().map(|a| a.len()).sum();
149 let array = new_null_array(&Null, length);
150 Ok(Arc::new(
151 SingleRowListArrayBuilder::new(array).build_list_array(),
152 ))
153 } else {
154 array_array::<i32>(arrays, data_type.clone(), Field::LIST_FIELD_DEFAULT_NAME)
155 }
156}
157
158pub fn array_array<O: OffsetSizeTrait>(
199 args: &[ArrayRef],
200 data_type: DataType,
201 field_name: &str,
202) -> Result<ArrayRef> {
203 if args.is_empty() {
205 return plan_err!("Array requires at least one argument");
206 }
207
208 let mut data = vec![];
209 let mut total_len = 0;
210 for arg in args {
211 let arg_data = if arg.as_any().is::<NullArray>() {
212 ArrayData::new_empty(&data_type)
213 } else {
214 arg.to_data()
215 };
216 total_len += arg_data.len();
217 data.push(arg_data);
218 }
219
220 let mut offsets: Vec<O> = Vec::with_capacity(total_len);
221 offsets.push(O::usize_as(0));
222
223 let capacity = Capacities::Array(total_len);
224 let data_ref = data.iter().collect::<Vec<_>>();
225 let mut mutable = MutableArrayData::with_capacities(data_ref, true, capacity);
226
227 let num_rows = args[0].len();
228 for row_idx in 0..num_rows {
229 for (arr_idx, arg) in args.iter().enumerate() {
230 if !arg.as_any().is::<NullArray>()
231 && !arg.is_null(row_idx)
232 && arg.is_valid(row_idx)
233 {
234 mutable.extend(arr_idx, row_idx, row_idx + 1);
235 } else {
236 mutable.extend_nulls(1);
237 }
238 }
239 offsets.push(O::usize_as(mutable.len()));
240 }
241 let data = mutable.freeze();
242
243 Ok(Arc::new(GenericListArray::<O>::try_new(
244 Arc::new(Field::new(field_name, data_type, true)),
245 OffsetBuffer::new(offsets.into()),
246 arrow::array::make_array(data),
247 None,
248 )?))
249}
250
251pub fn coerce_types_inner(arg_types: &[DataType], name: &str) -> Result<Vec<DataType>> {
252 if let Ok(unified) = try_type_union_resolution_with_struct(arg_types) {
253 return Ok(unified);
254 }
255
256 if let Some(unified) = type_union_resolution(arg_types) {
257 Ok(vec![unified; arg_types.len()])
258 } else {
259 plan_err!(
260 "Failed to unify argument types of {}: [{}]",
261 name,
262 arg_types.iter().join(", ")
263 )
264 }
265}