datafusion_functions_nested/
make_array.rs1use std::any::Any;
21use std::sync::Arc;
22use std::vec;
23
24use crate::utils::make_scalar_function;
25use arrow::array::{
26 Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData,
27 NullArray, OffsetSizeTrait, new_null_array,
28};
29use arrow::buffer::OffsetBuffer;
30use arrow::datatypes::DataType;
31use arrow::datatypes::{DataType::Null, Field};
32use datafusion_common::utils::SingleRowListArrayBuilder;
33use datafusion_common::{Result, plan_err};
34use datafusion_expr::binary::{
35 try_type_union_resolution_with_struct, type_union_resolution,
36};
37use datafusion_expr::{
38 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
39};
40use datafusion_macros::user_doc;
41use itertools::Itertools as _;
42
// Registers `MakeArray` with the crate's UDF helper macro. Judging by its
// arguments this presumably expands to an expression helper named `make_array`
// and a UDF accessor named `make_array_udf` documented with the given string.
// NOTE(review): confirm against the macro definition elsewhere in this crate.
make_udf_expr_and_func!(
    MakeArray,
    make_array,
    "Returns an Arrow array using the specified input expressions.",
    make_array_udf
);
49
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array using the specified input expressions.",
    syntax_example = "make_array(expression1[, ..., expression_n])",
    sql_example = r#"```sql
> select make_array(1, 2, 3, 4, 5);
+----------------------------------------------------------+
| make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)) |
+----------------------------------------------------------+
| [1, 2, 3, 4, 5]                                          |
+----------------------------------------------------------+
```"#,
    argument(
        name = "expression_n",
        description = "Expression to include in the output array. Can be a constant, column, or function, and any combination of arithmetic or string operators."
    )
)]
/// Scalar UDF backing `make_array` (alias `make_list`): collects its
/// arguments, row by row, into a single list value.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct MakeArray {
    /// Call signature; built as `Signature::user_defined`, so argument types
    /// are unified through `ScalarUDFImpl::coerce_types`.
    signature: Signature,
    /// Alternative names under which the function is registered.
    aliases: Vec<String>,
}
72
73impl Default for MakeArray {
74 fn default() -> Self {
75 Self::new()
76 }
77}
78
79impl MakeArray {
80 pub fn new() -> Self {
81 Self {
82 signature: Signature::user_defined(Volatility::Immutable),
83 aliases: vec![String::from("make_list")],
84 }
85 }
86}
87
88impl ScalarUDFImpl for MakeArray {
89 fn as_any(&self) -> &dyn Any {
90 self
91 }
92
93 fn name(&self) -> &str {
94 "make_array"
95 }
96
97 fn signature(&self) -> &Signature {
98 &self.signature
99 }
100
101 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
102 let element_type = if arg_types.is_empty() {
103 Null
104 } else {
105 arg_types[0].to_owned()
107 };
108
109 Ok(DataType::new_list(element_type, true))
110 }
111
112 fn invoke_with_args(
113 &self,
114 args: datafusion_expr::ScalarFunctionArgs,
115 ) -> Result<ColumnarValue> {
116 make_scalar_function(make_array_inner)(&args.args)
117 }
118
119 fn aliases(&self) -> &[String] {
120 &self.aliases
121 }
122
123 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
124 if arg_types.is_empty() {
125 Ok(vec![])
126 } else {
127 coerce_types_inner(arg_types, self.name())
128 }
129 }
130
131 fn documentation(&self) -> Option<&Documentation> {
132 self.doc()
133 }
134}
135
136pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
140 let data_type = arrays.iter().find_map(|arg| {
141 let arg_type = arg.data_type();
142 (!arg_type.is_null()).then_some(arg_type)
143 });
144
145 let data_type = data_type.unwrap_or(&Null);
146 if data_type.is_null() {
147 let length = arrays.iter().map(|a| a.len()).sum();
149 let array = new_null_array(&Null, length);
150 Ok(Arc::new(
151 SingleRowListArrayBuilder::new(array).build_list_array(),
152 ))
153 } else {
154 array_array::<i32>(arrays, data_type.clone(), Field::LIST_FIELD_DEFAULT_NAME)
155 }
156}
157
158pub fn array_array<O: OffsetSizeTrait>(
199 args: &[ArrayRef],
200 data_type: DataType,
201 field_name: &str,
202) -> Result<ArrayRef> {
203 if args.is_empty() {
205 return plan_err!("Array requires at least one argument");
206 }
207
208 let mut data = vec![];
209 let mut total_len = 0;
210 for arg in args {
211 let arg_data = if arg.as_any().is::<NullArray>() {
212 ArrayData::new_empty(&data_type)
213 } else {
214 arg.to_data()
215 };
216 total_len += arg_data.len();
217 data.push(arg_data);
218 }
219
220 let mut offsets: Vec<O> = Vec::with_capacity(total_len);
221 offsets.push(O::usize_as(0));
222
223 let capacity = Capacities::Array(total_len);
224 let data_ref = data.iter().collect::<Vec<_>>();
225 let mut mutable = MutableArrayData::with_capacities(data_ref, true, capacity);
226
227 let num_rows = args[0].len();
228 for row_idx in 0..num_rows {
229 for (arr_idx, arg) in args.iter().enumerate() {
230 if !arg.as_any().is::<NullArray>()
231 && !arg.is_null(row_idx)
232 && arg.is_valid(row_idx)
233 {
234 mutable.extend(arr_idx, row_idx, row_idx + 1);
235 } else {
236 mutable.extend_nulls(1);
237 }
238 }
239 offsets.push(O::usize_as(mutable.len()));
240 }
241 let data = mutable.freeze();
242
243 Ok(Arc::new(GenericListArray::<O>::try_new(
244 Arc::new(Field::new(field_name, data_type, true)),
245 OffsetBuffer::new(offsets.into()),
246 arrow::array::make_array(data),
247 None,
248 )?))
249}
250
251pub fn coerce_types_inner(arg_types: &[DataType], name: &str) -> Result<Vec<DataType>> {
252 if let Ok(unified) = try_type_union_resolution_with_struct(arg_types) {
253 return Ok(unified);
254 }
255
256 if let Some(unified) = type_union_resolution(arg_types) {
257 Ok(vec![unified; arg_types.len()])
258 } else {
259 plan_err!(
260 "Failed to unify argument types of {}: [{}]",
261 name,
262 arg_types.iter().join(", ")
263 )
264 }
265}