datafusion_functions_nested/
make_array.rs1use std::sync::Arc;
21use std::vec;
22
23use crate::utils::make_scalar_function;
24use arrow::array::{
25 Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData,
26 NullArray, OffsetSizeTrait, new_null_array,
27};
28use arrow::buffer::OffsetBuffer;
29use arrow::datatypes::DataType;
30use arrow::datatypes::{DataType::Null, Field};
31use datafusion_common::utils::SingleRowListArrayBuilder;
32use datafusion_common::{Result, plan_err};
33use datafusion_expr::binary::{
34 try_type_union_resolution_with_struct, type_union_resolution,
35};
36use datafusion_expr::{
37 ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
38 Volatility,
39};
40use datafusion_macros::user_doc;
41use itertools::Itertools as _;
42
// Registers the `MakeArray` UDF under the expression name `make_array`, with
// `make_array_udf` as the accessor name and the given description string.
// NOTE(review): the exact items generated are defined by the
// `make_udf_expr_and_func!` macro, which is not visible here — confirm there.
make_udf_expr_and_func!(
    MakeArray,
    make_array,
    "Returns an Arrow array using the specified input expressions.",
    make_array_udf
);
49
50#[user_doc(
51 doc_section(label = "Array Functions"),
52 description = "Returns an array using the specified input expressions.",
53 syntax_example = "make_array(expression1[, ..., expression_n])",
54 sql_example = r#"```sql
55> select make_array(1, 2, 3, 4, 5);
56+----------------------------------------------------------+
57| make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)) |
58+----------------------------------------------------------+
59| [1, 2, 3, 4, 5] |
60+----------------------------------------------------------+
61```"#,
62 argument(
63 name = "expression_n",
64 description = "Expression to include in the output array. Can be a constant, column, or function, and any combination of arithmetic or string operators."
65 )
66)]
67#[derive(Debug, PartialEq, Eq, Hash)]
68pub struct MakeArray {
69 signature: Signature,
70 aliases: Vec<String>,
71}
72
73impl Default for MakeArray {
74 fn default() -> Self {
75 Self::new()
76 }
77}
78
79impl MakeArray {
80 pub fn new() -> Self {
81 Self {
82 signature: Signature::user_defined(Volatility::Immutable),
83 aliases: vec![String::from("make_list")],
84 }
85 }
86}
87
88impl ScalarUDFImpl for MakeArray {
89 fn name(&self) -> &str {
90 "make_array"
91 }
92
93 fn signature(&self) -> &Signature {
94 &self.signature
95 }
96
97 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
98 let element_type = if arg_types.is_empty() {
99 Null
100 } else {
101 arg_types[0].to_owned()
103 };
104
105 Ok(DataType::new_list(element_type, true))
106 }
107
108 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
109 make_scalar_function(make_array_inner)(&args.args)
110 }
111
112 fn aliases(&self) -> &[String] {
113 &self.aliases
114 }
115
116 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
117 if arg_types.is_empty() {
118 Ok(vec![])
119 } else {
120 coerce_types_inner(arg_types, self.name())
121 }
122 }
123
124 fn documentation(&self) -> Option<&Documentation> {
125 self.doc()
126 }
127}
128
129pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
133 let data_type = arrays.iter().find_map(|arg| {
134 let arg_type = arg.data_type();
135 (!arg_type.is_null()).then_some(arg_type)
136 });
137
138 let data_type = data_type.unwrap_or(&Null);
139 if data_type.is_null() {
140 let length = arrays.iter().map(|a| a.len()).sum();
142 let array = new_null_array(&Null, length);
143 Ok(Arc::new(
144 SingleRowListArrayBuilder::new(array).build_list_array(),
145 ))
146 } else {
147 array_array::<i32>(arrays, data_type.clone(), Field::LIST_FIELD_DEFAULT_NAME)
148 }
149}
150
151pub fn array_array<O: OffsetSizeTrait>(
192 args: &[ArrayRef],
193 data_type: DataType,
194 field_name: &str,
195) -> Result<ArrayRef> {
196 if args.is_empty() {
198 return plan_err!("Array requires at least one argument");
199 }
200
201 let mut data = vec![];
202 let mut total_len = 0;
203 for arg in args {
204 let arg_data = if arg.as_any().is::<NullArray>() {
205 ArrayData::new_empty(&data_type)
206 } else {
207 arg.to_data()
208 };
209 total_len += arg_data.len();
210 data.push(arg_data);
211 }
212
213 let mut offsets: Vec<O> = Vec::with_capacity(total_len);
214 offsets.push(O::usize_as(0));
215
216 let capacity = Capacities::Array(total_len);
217 let data_ref = data.iter().collect::<Vec<_>>();
218 let mut mutable = MutableArrayData::with_capacities(data_ref, true, capacity);
219
220 let num_rows = args[0].len();
221 for row_idx in 0..num_rows {
222 for (arr_idx, arg) in args.iter().enumerate() {
223 if !arg.as_any().is::<NullArray>()
224 && !arg.is_null(row_idx)
225 && arg.is_valid(row_idx)
226 {
227 mutable.extend(arr_idx, row_idx, row_idx + 1);
228 } else {
229 mutable.extend_nulls(1);
230 }
231 }
232 offsets.push(O::usize_as(mutable.len()));
233 }
234 let data = mutable.freeze();
235
236 Ok(Arc::new(GenericListArray::<O>::try_new(
237 Arc::new(Field::new(field_name, data_type, true)),
238 OffsetBuffer::new(offsets.into()),
239 arrow::array::make_array(data),
240 None,
241 )?))
242}
243
244pub fn coerce_types_inner(arg_types: &[DataType], name: &str) -> Result<Vec<DataType>> {
245 if let Ok(unified) = try_type_union_resolution_with_struct(arg_types) {
246 return Ok(unified);
247 }
248
249 if let Some(unified) = type_union_resolution(arg_types) {
250 Ok(vec![unified; arg_types.len()])
251 } else {
252 plan_err!(
253 "Failed to unify argument types of {}: [{}]",
254 name,
255 arg_types.iter().join(", ")
256 )
257 }
258}