datafusion_functions_nested/
make_array.rs1use std::any::Any;
21use std::sync::Arc;
22use std::vec;
23
24use crate::utils::make_scalar_function;
25use arrow::array::{
26 new_null_array, Array, ArrayData, ArrayRef, Capacities, GenericListArray,
27 MutableArrayData, NullArray, OffsetSizeTrait,
28};
29use arrow::buffer::OffsetBuffer;
30use arrow::datatypes::DataType;
31use arrow::datatypes::{DataType::Null, Field};
32use datafusion_common::utils::SingleRowListArrayBuilder;
33use datafusion_common::{plan_err, Result};
34use datafusion_expr::binary::{
35 try_type_union_resolution_with_struct, type_union_resolution,
36};
37use datafusion_expr::TypeSignature;
38use datafusion_expr::{
39 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
40};
41use datafusion_macros::user_doc;
42
// Registers the `MakeArray` UDF with the crate's `make_udf_expr_and_func!`
// helper macro (defined elsewhere in this crate). Presumably this generates a
// `make_array` expression-builder function and a `make_array_udf` accessor
// from the names passed here — see the macro definition to confirm.
make_udf_expr_and_func!(
    MakeArray,
    make_array,
    "Returns an Arrow array using the specified input expressions.",
    make_array_udf
);
49
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array using the specified input expressions.",
    syntax_example = "make_array(expression1[, ..., expression_n])",
    sql_example = r#"```sql
> select make_array(1, 2, 3, 4, 5);
+----------------------------------------------------------+
| make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)) |
+----------------------------------------------------------+
| [1, 2, 3, 4, 5] |
+----------------------------------------------------------+
```"#,
    argument(
        name = "expression_n",
        description = "Expression to include in the output array. Can be a constant, column, or function, and any combination of arithmetic or string operators."
    )
)]
/// Scalar UDF implementation of `make_array` (also available as `make_list`).
///
/// Packs its arguments, row by row, into a single `List` value.
#[derive(Debug)]
pub struct MakeArray {
    /// Accepted call shapes: no arguments, or a user-defined argument list
    /// whose types are unified by `coerce_types`.
    signature: Signature,
    /// Alternative names returned from `aliases()`.
    aliases: Vec<String>,
}
72
73impl Default for MakeArray {
74 fn default() -> Self {
75 Self::new()
76 }
77}
78
79impl MakeArray {
80 pub fn new() -> Self {
81 Self {
82 signature: Signature::one_of(
83 vec![TypeSignature::Nullary, TypeSignature::UserDefined],
84 Volatility::Immutable,
85 ),
86 aliases: vec![String::from("make_list")],
87 }
88 }
89}
90
91impl ScalarUDFImpl for MakeArray {
92 fn as_any(&self) -> &dyn Any {
93 self
94 }
95
96 fn name(&self) -> &str {
97 "make_array"
98 }
99
100 fn signature(&self) -> &Signature {
101 &self.signature
102 }
103
104 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
105 let element_type = if arg_types.is_empty() {
106 Null
107 } else {
108 arg_types[0].to_owned()
110 };
111
112 Ok(DataType::new_list(element_type, true))
113 }
114
115 fn invoke_with_args(
116 &self,
117 args: datafusion_expr::ScalarFunctionArgs,
118 ) -> Result<ColumnarValue> {
119 make_scalar_function(make_array_inner)(&args.args)
120 }
121
122 fn aliases(&self) -> &[String] {
123 &self.aliases
124 }
125
126 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
127 if let Ok(unified) = try_type_union_resolution_with_struct(arg_types) {
128 return Ok(unified);
129 }
130
131 if let Some(unified) = type_union_resolution(arg_types) {
132 Ok(vec![unified; arg_types.len()])
133 } else {
134 plan_err!(
135 "Failed to unify argument types of {}: {arg_types:?}",
136 self.name()
137 )
138 }
139 }
140
141 fn documentation(&self) -> Option<&Documentation> {
142 self.doc()
143 }
144}
145
146pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
150 let data_type = arrays.iter().find_map(|arg| {
151 let arg_type = arg.data_type();
152 (!arg_type.is_null()).then_some(arg_type)
153 });
154
155 let data_type = data_type.unwrap_or(&Null);
156 if data_type.is_null() {
157 let length = arrays.iter().map(|a| a.len()).sum();
159 let array = new_null_array(&Null, length);
160 Ok(Arc::new(
161 SingleRowListArrayBuilder::new(array).build_list_array(),
162 ))
163 } else {
164 array_array::<i32>(arrays, data_type.clone())
165 }
166}
167
168fn array_array<O: OffsetSizeTrait>(
209 args: &[ArrayRef],
210 data_type: DataType,
211) -> Result<ArrayRef> {
212 if args.is_empty() {
214 return plan_err!("Array requires at least one argument");
215 }
216
217 let mut data = vec![];
218 let mut total_len = 0;
219 for arg in args {
220 let arg_data = if arg.as_any().is::<NullArray>() {
221 ArrayData::new_empty(&data_type)
222 } else {
223 arg.to_data()
224 };
225 total_len += arg_data.len();
226 data.push(arg_data);
227 }
228
229 let mut offsets: Vec<O> = Vec::with_capacity(total_len);
230 offsets.push(O::usize_as(0));
231
232 let capacity = Capacities::Array(total_len);
233 let data_ref = data.iter().collect::<Vec<_>>();
234 let mut mutable = MutableArrayData::with_capacities(data_ref, true, capacity);
235
236 let num_rows = args[0].len();
237 for row_idx in 0..num_rows {
238 for (arr_idx, arg) in args.iter().enumerate() {
239 if !arg.as_any().is::<NullArray>()
240 && !arg.is_null(row_idx)
241 && arg.is_valid(row_idx)
242 {
243 mutable.extend(arr_idx, row_idx, row_idx + 1);
244 } else {
245 mutable.extend_nulls(1);
246 }
247 }
248 offsets.push(O::usize_as(mutable.len()));
249 }
250 let data = mutable.freeze();
251
252 Ok(Arc::new(GenericListArray::<O>::try_new(
253 Arc::new(Field::new_list_field(data_type, true)),
254 OffsetBuffer::new(offsets.into()),
255 arrow::array::make_array(data),
256 None,
257 )?))
258}