datafusion_functions_nested/
make_array.rs1use std::any::Any;
21use std::sync::Arc;
22use std::vec;
23
24use crate::utils::make_scalar_function;
25use arrow::array::{
26 new_null_array, Array, ArrayData, ArrayRef, Capacities, GenericListArray,
27 MutableArrayData, NullArray, OffsetSizeTrait,
28};
29use arrow::buffer::OffsetBuffer;
30use arrow::datatypes::DataType;
31use arrow::datatypes::{
32 DataType::{List, Null},
33 Field,
34};
35use datafusion_common::utils::SingleRowListArrayBuilder;
36use datafusion_common::{plan_err, Result};
37use datafusion_expr::binary::{
38 try_type_union_resolution_with_struct, type_union_resolution,
39};
40use datafusion_expr::TypeSignature;
41use datafusion_expr::{
42 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
43};
44use datafusion_macros::user_doc;
45
// Invokes the `make_udf_expr_and_func!` macro for `MakeArray`, passing the
// identifiers `make_array` and `make_array_udf` together with a description
// string.
// NOTE(review): presumably this generates the `make_array` expression helper
// and a `make_array_udf` accessor for the UDF — confirm against the macro
// definition, which is not part of this file.
make_udf_expr_and_func!(
    MakeArray,
    make_array,
    "Returns an Arrow array using the specified input expressions.",
    make_array_udf
);
52
/// Scalar UDF implementing `make_array`, which builds a list value from its
/// argument expressions.
///
/// The `user_doc` attribute below carries the user-facing documentation
/// (section, description, syntax, SQL example and argument description).
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array using the specified input expressions.",
    syntax_example = "make_array(expression1[, ..., expression_n])",
    sql_example = r#"```sql
> select make_array(1, 2, 3, 4, 5);
+----------------------------------------------------------+
| make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)) |
+----------------------------------------------------------+
| [1, 2, 3, 4, 5] |
+----------------------------------------------------------+
```"#,
    argument(
        name = "expression_n",
        description = "Expression to include in the output array. Can be a constant, column, or function, and any combination of arithmetic or string operators."
    )
)]
#[derive(Debug)]
pub struct MakeArray {
    /// Signature returned by `ScalarUDFImpl::signature`; `new` sets it to
    /// "either `Nullary` or `UserDefined`", with immutable volatility.
    signature: Signature,
    /// Alternative names returned by `ScalarUDFImpl::aliases`; `new` sets it
    /// to `["make_list"]`.
    aliases: Vec<String>,
}
75
76impl Default for MakeArray {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
82impl MakeArray {
83 pub fn new() -> Self {
84 Self {
85 signature: Signature::one_of(
86 vec![TypeSignature::Nullary, TypeSignature::UserDefined],
87 Volatility::Immutable,
88 ),
89 aliases: vec![String::from("make_list")],
90 }
91 }
92}
93
94impl ScalarUDFImpl for MakeArray {
95 fn as_any(&self) -> &dyn Any {
96 self
97 }
98
99 fn name(&self) -> &str {
100 "make_array"
101 }
102
103 fn signature(&self) -> &Signature {
104 &self.signature
105 }
106
107 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
108 match arg_types.len() {
109 0 => Ok(empty_array_type()),
110 _ => {
111 Ok(List(Arc::new(Field::new_list_field(
113 arg_types[0].to_owned(),
114 true,
115 ))))
116 }
117 }
118 }
119
120 fn invoke_with_args(
121 &self,
122 args: datafusion_expr::ScalarFunctionArgs,
123 ) -> Result<ColumnarValue> {
124 make_scalar_function(make_array_inner)(&args.args)
125 }
126
127 fn aliases(&self) -> &[String] {
128 &self.aliases
129 }
130
131 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
132 let mut errors = vec![];
133 match try_type_union_resolution_with_struct(arg_types) {
134 Ok(r) => return Ok(r),
135 Err(e) => {
136 errors.push(e);
137 }
138 }
139
140 if let Some(new_type) = type_union_resolution(arg_types) {
141 if new_type.is_null() {
142 Ok(vec![DataType::Int64; arg_types.len()])
143 } else {
144 Ok(vec![new_type; arg_types.len()])
145 }
146 } else {
147 plan_err!(
148 "Fail to find the valid type between {:?} for {}, errors are {:?}",
149 arg_types,
150 self.name(),
151 errors
152 )
153 }
154 }
155
156 fn documentation(&self) -> Option<&Documentation> {
157 self.doc()
158 }
159}
160
161pub(super) fn empty_array_type() -> DataType {
163 List(Arc::new(Field::new_list_field(DataType::Int64, true)))
164}
165
166pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
170 let mut data_type = Null;
171 for arg in arrays {
172 let arg_data_type = arg.data_type();
173 if !arg_data_type.equals_datatype(&Null) {
174 data_type = arg_data_type.clone();
175 break;
176 }
177 }
178
179 match data_type {
180 Null => {
182 let length = arrays.iter().map(|a| a.len()).sum();
183 let array = new_null_array(&DataType::Int64, length);
185 Ok(Arc::new(
186 SingleRowListArrayBuilder::new(array).build_list_array(),
187 ))
188 }
189 _ => array_array::<i32>(arrays, data_type),
190 }
191}
192
193fn array_array<O: OffsetSizeTrait>(
234 args: &[ArrayRef],
235 data_type: DataType,
236) -> Result<ArrayRef> {
237 if args.is_empty() {
239 return plan_err!("Array requires at least one argument");
240 }
241
242 let mut data = vec![];
243 let mut total_len = 0;
244 for arg in args {
245 let arg_data = if arg.as_any().is::<NullArray>() {
246 ArrayData::new_empty(&data_type)
247 } else {
248 arg.to_data()
249 };
250 total_len += arg_data.len();
251 data.push(arg_data);
252 }
253
254 let mut offsets: Vec<O> = Vec::with_capacity(total_len);
255 offsets.push(O::usize_as(0));
256
257 let capacity = Capacities::Array(total_len);
258 let data_ref = data.iter().collect::<Vec<_>>();
259 let mut mutable = MutableArrayData::with_capacities(data_ref, true, capacity);
260
261 let num_rows = args[0].len();
262 for row_idx in 0..num_rows {
263 for (arr_idx, arg) in args.iter().enumerate() {
264 if !arg.as_any().is::<NullArray>()
265 && !arg.is_null(row_idx)
266 && arg.is_valid(row_idx)
267 {
268 mutable.extend(arr_idx, row_idx, row_idx + 1);
269 } else {
270 mutable.extend_nulls(1);
271 }
272 }
273 offsets.push(O::usize_as(mutable.len()));
274 }
275 let data = mutable.freeze();
276
277 Ok(Arc::new(GenericListArray::<O>::try_new(
278 Arc::new(Field::new_list_field(data_type, true)),
279 OffsetBuffer::new(offsets.into()),
280 arrow::array::make_array(data),
281 None,
282 )?))
283}