datafusion_functions_nested/
make_array.rs1use std::any::Any;
21use std::sync::Arc;
22use std::vec;
23
24use crate::utils::make_scalar_function;
25use arrow::array::{
26 new_null_array, Array, ArrayData, ArrayRef, Capacities, GenericListArray,
27 MutableArrayData, NullArray, OffsetSizeTrait,
28};
29use arrow::buffer::OffsetBuffer;
30use arrow::datatypes::DataType;
31use arrow::datatypes::{DataType::Null, Field};
32use datafusion_common::utils::SingleRowListArrayBuilder;
33use datafusion_common::{plan_err, Result};
34use datafusion_expr::binary::{
35 try_type_union_resolution_with_struct, type_union_resolution,
36};
37use datafusion_expr::TypeSignature;
38use datafusion_expr::{
39 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
40};
41use datafusion_macros::user_doc;
42use itertools::Itertools as _;
43
// Registers `MakeArray` through the crate's `make_udf_expr_and_func!` macro, passing the
// expression-level name `make_array`, its one-line description, and the accessor name
// `make_array_udf`. NOTE(review): the exact items emitted (expression helper, cached UDF
// accessor) are defined by the macro elsewhere in the crate — confirm there.
make_udf_expr_and_func!(
    MakeArray,
    make_array,
    "Returns an Arrow array using the specified input expressions.",
    make_array_udf
);
50
51#[user_doc(
52 doc_section(label = "Array Functions"),
53 description = "Returns an array using the specified input expressions.",
54 syntax_example = "make_array(expression1[, ..., expression_n])",
55 sql_example = r#"```sql
56> select make_array(1, 2, 3, 4, 5);
57+----------------------------------------------------------+
58| make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)) |
59+----------------------------------------------------------+
60| [1, 2, 3, 4, 5] |
61+----------------------------------------------------------+
62```"#,
63 argument(
64 name = "expression_n",
65 description = "Expression to include in the output array. Can be a constant, column, or function, and any combination of arithmetic or string operators."
66 )
67)]
68#[derive(Debug, PartialEq, Eq, Hash)]
69pub struct MakeArray {
70 signature: Signature,
71 aliases: Vec<String>,
72}
73
74impl Default for MakeArray {
75 fn default() -> Self {
76 Self::new()
77 }
78}
79
80impl MakeArray {
81 pub fn new() -> Self {
82 Self {
83 signature: Signature::one_of(
84 vec![TypeSignature::Nullary, TypeSignature::UserDefined],
85 Volatility::Immutable,
86 ),
87 aliases: vec![String::from("make_list")],
88 }
89 }
90}
91
92impl ScalarUDFImpl for MakeArray {
93 fn as_any(&self) -> &dyn Any {
94 self
95 }
96
97 fn name(&self) -> &str {
98 "make_array"
99 }
100
101 fn signature(&self) -> &Signature {
102 &self.signature
103 }
104
105 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
106 let element_type = if arg_types.is_empty() {
107 Null
108 } else {
109 arg_types[0].to_owned()
111 };
112
113 Ok(DataType::new_list(element_type, true))
114 }
115
116 fn invoke_with_args(
117 &self,
118 args: datafusion_expr::ScalarFunctionArgs,
119 ) -> Result<ColumnarValue> {
120 make_scalar_function(make_array_inner)(&args.args)
121 }
122
123 fn aliases(&self) -> &[String] {
124 &self.aliases
125 }
126
127 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
128 if let Ok(unified) = try_type_union_resolution_with_struct(arg_types) {
129 return Ok(unified);
130 }
131
132 if let Some(unified) = type_union_resolution(arg_types) {
133 Ok(vec![unified; arg_types.len()])
134 } else {
135 plan_err!(
136 "Failed to unify argument types of {}: [{}]",
137 self.name(),
138 arg_types.iter().join(", ")
139 )
140 }
141 }
142
143 fn documentation(&self) -> Option<&Documentation> {
144 self.doc()
145 }
146}
147
148pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
152 let data_type = arrays.iter().find_map(|arg| {
153 let arg_type = arg.data_type();
154 (!arg_type.is_null()).then_some(arg_type)
155 });
156
157 let data_type = data_type.unwrap_or(&Null);
158 if data_type.is_null() {
159 let length = arrays.iter().map(|a| a.len()).sum();
161 let array = new_null_array(&Null, length);
162 Ok(Arc::new(
163 SingleRowListArrayBuilder::new(array).build_list_array(),
164 ))
165 } else {
166 array_array::<i32>(arrays, data_type.clone())
167 }
168}
169
170fn array_array<O: OffsetSizeTrait>(
211 args: &[ArrayRef],
212 data_type: DataType,
213) -> Result<ArrayRef> {
214 if args.is_empty() {
216 return plan_err!("Array requires at least one argument");
217 }
218
219 let mut data = vec![];
220 let mut total_len = 0;
221 for arg in args {
222 let arg_data = if arg.as_any().is::<NullArray>() {
223 ArrayData::new_empty(&data_type)
224 } else {
225 arg.to_data()
226 };
227 total_len += arg_data.len();
228 data.push(arg_data);
229 }
230
231 let mut offsets: Vec<O> = Vec::with_capacity(total_len);
232 offsets.push(O::usize_as(0));
233
234 let capacity = Capacities::Array(total_len);
235 let data_ref = data.iter().collect::<Vec<_>>();
236 let mut mutable = MutableArrayData::with_capacities(data_ref, true, capacity);
237
238 let num_rows = args[0].len();
239 for row_idx in 0..num_rows {
240 for (arr_idx, arg) in args.iter().enumerate() {
241 if !arg.as_any().is::<NullArray>()
242 && !arg.is_null(row_idx)
243 && arg.is_valid(row_idx)
244 {
245 mutable.extend(arr_idx, row_idx, row_idx + 1);
246 } else {
247 mutable.extend_nulls(1);
248 }
249 }
250 offsets.push(O::usize_as(mutable.len()));
251 }
252 let data = mutable.freeze();
253
254 Ok(Arc::new(GenericListArray::<O>::try_new(
255 Arc::new(Field::new_list_field(data_type, true)),
256 OffsetBuffer::new(offsets.into()),
257 arrow::array::make_array(data),
258 None,
259 )?))
260}