datafusion_functions_nested/
arrays_zip.rs1use crate::utils::make_scalar_function;
21use arrow::array::{
22 Array, ArrayRef, Capacities, ListArray, MutableArrayData, NullBufferBuilder,
23 StructArray, new_null_array,
24};
25use arrow::buffer::OffsetBuffer;
26use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Null};
27use arrow::datatypes::{DataType, Field, Fields};
28use datafusion_common::cast::{
29 as_fixed_size_list_array, as_large_list_array, as_list_array,
30};
31use datafusion_common::{Result, exec_err};
32use datafusion_expr::{
33 ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
34 Volatility,
35};
36use datafusion_macros::user_doc;
37use std::sync::Arc;
38
/// Offset-based view over one list-typed argument column.
///
/// `arrays_zip_inner` builds one of these per `List`, `LargeList` or
/// `FixedSizeList` argument so that all three layouts can be walked the same
/// way.
struct ListColumnView {
    /// Child array holding the list elements of the column.
    values: ArrayRef,
    /// Row boundaries into `values`: row `i` spans `offsets[i]..offsets[i + 1]`.
    offsets: Vec<usize>,
    /// Row-level validity; with `None`, `is_null` reports every row as non-NULL.
    nulls: Option<arrow::buffer::NullBuffer>,
}
49
50impl ListColumnView {
51 fn is_null(&self, idx: usize) -> bool {
52 self.nulls.as_ref().is_some_and(|n| n.is_null(idx))
53 }
54}
55
// Declares the `ArraysZip` UDF through the crate's `make_udf_expr_and_func!`
// macro, passing the identifiers `arrays_zip` and `arrays_zip_udf` plus a
// one-line description.
// NOTE(review): the macro is defined outside this file; presumably it
// generates an `arrays_zip` expression helper and an `arrays_zip_udf`
// accessor — confirm against the macro definition.
make_udf_expr_and_func!(
    ArraysZip,
    arrays_zip,
    "combines one or multiple arrays into a single array of structs.",
    arrays_zip_udf
);
62
// Scalar UDF `arrays_zip` (alias `list_zip`): combines the elements found at
// the same index of each input array into one struct per index. The
// `user_doc` attribute below carries the user-facing documentation that
// `documentation()` returns through `self.doc()`.
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array of structs created by combining the elements of each input array at the same index. If the arrays have different lengths, shorter arrays are padded with NULLs.",
    syntax_example = "arrays_zip(array1[, ..., array_n])",
    sql_example = r#"```sql
> select arrays_zip([1, 2, 3]);
+---------------------------------------------------+
| arrays_zip([1, 2, 3]) |
+---------------------------------------------------+
| [{1: 1}, {1: 2}, {1: 3}] |
+---------------------------------------------------+
> select arrays_zip([1, 2], [3, 4, 5]);
+---------------------------------------------------+
| arrays_zip([1, 2], [3, 4, 5]) |
+---------------------------------------------------+
| [{1: 1, 2: 3}, {1: 2, 2: 4}, {1: NULL, 2: 5}] |
+---------------------------------------------------+
```"#,
    argument(name = "array1", description = "First array expression."),
    argument(
        name = "array_n",
        description = "Optional additional array expressions."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ArraysZip {
    // Signature returned by `ScalarUDFImpl::signature`.
    signature: Signature,
    // Alternative names returned by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
92
93impl Default for ArraysZip {
94 fn default() -> Self {
95 Self::new()
96 }
97}
98
99impl ArraysZip {
100 pub fn new() -> Self {
101 Self {
102 signature: Signature::variadic_any(Volatility::Immutable),
103 aliases: vec![String::from("list_zip")],
104 }
105 }
106}
107
108impl ScalarUDFImpl for ArraysZip {
109 fn name(&self) -> &str {
110 "arrays_zip"
111 }
112
113 fn signature(&self) -> &Signature {
114 &self.signature
115 }
116
117 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
118 if arg_types.is_empty() {
119 return exec_err!("arrays_zip requires at least one argument");
120 }
121
122 let mut fields = Vec::with_capacity(arg_types.len());
123 for (i, arg_type) in arg_types.iter().enumerate() {
124 let element_type = match arg_type {
125 List(field) | LargeList(field) | FixedSizeList(field, _) => {
126 field.data_type().clone()
127 }
128 Null => Null,
129 dt => {
130 return exec_err!("arrays_zip expects array arguments, got {dt}");
131 }
132 };
133 fields.push(Field::new(format!("{}", i + 1), element_type, true));
134 }
135
136 Ok(List(Arc::new(Field::new_list_field(
137 DataType::Struct(Fields::from(fields)),
138 true,
139 ))))
140 }
141
142 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
143 make_scalar_function(arrays_zip_inner)(&args.args)
144 }
145
146 fn aliases(&self) -> &[String] {
147 &self.aliases
148 }
149
150 fn documentation(&self) -> Option<&Documentation> {
151 self.doc()
152 }
153}
154
155fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
162 if args.is_empty() {
163 return exec_err!("arrays_zip requires at least one argument");
164 }
165
166 let num_rows = args[0].len();
167
168 let mut views: Vec<Option<ListColumnView>> = Vec::with_capacity(args.len());
171 let mut element_types: Vec<DataType> = Vec::with_capacity(args.len());
172
173 for (i, arg) in args.iter().enumerate() {
174 match arg.data_type() {
175 List(field) => {
176 let arr = as_list_array(arg)?;
177 let raw_offsets = arr.value_offsets();
178 let offsets: Vec<usize> =
179 raw_offsets.iter().map(|&o| o as usize).collect();
180 element_types.push(field.data_type().clone());
181 views.push(Some(ListColumnView {
182 values: Arc::clone(arr.values()),
183 offsets,
184 nulls: arr.nulls().cloned(),
185 }));
186 }
187 LargeList(field) => {
188 let arr = as_large_list_array(arg)?;
189 let raw_offsets = arr.value_offsets();
190 let offsets: Vec<usize> =
191 raw_offsets.iter().map(|&o| o as usize).collect();
192 element_types.push(field.data_type().clone());
193 views.push(Some(ListColumnView {
194 values: Arc::clone(arr.values()),
195 offsets,
196 nulls: arr.nulls().cloned(),
197 }));
198 }
199 FixedSizeList(field, size) => {
200 let arr = as_fixed_size_list_array(arg)?;
201 let size = *size as usize;
202 let offsets: Vec<usize> = (0..=num_rows).map(|row| row * size).collect();
203 element_types.push(field.data_type().clone());
204 views.push(Some(ListColumnView {
205 values: Arc::clone(arr.values()),
206 offsets,
207 nulls: arr.nulls().cloned(),
208 }));
209 }
210 Null => {
211 element_types.push(Null);
212 views.push(None);
213 }
214 dt => {
215 return exec_err!("arrays_zip argument {i} expected list type, got {dt}");
216 }
217 }
218 }
219
220 let values_data: Vec<_> = views
222 .iter()
223 .map(|v| v.as_ref().map(|view| view.values.to_data()))
224 .collect();
225
226 let struct_fields: Fields = element_types
227 .iter()
228 .enumerate()
229 .map(|(i, dt)| Field::new(format!("{}", i + 1), dt.clone(), true))
230 .collect::<Vec<_>>()
231 .into();
232
233 let mut builders: Vec<Option<MutableArrayData>> = values_data
236 .iter()
237 .map(|vd| {
238 vd.as_ref().map(|data| {
239 MutableArrayData::with_capacities(vec![data], true, Capacities::Array(0))
240 })
241 })
242 .collect();
243
244 let mut offsets: Vec<i32> = Vec::with_capacity(num_rows + 1);
245 offsets.push(0);
246 let mut null_builder = NullBufferBuilder::new(num_rows);
247 let mut total_values: usize = 0;
248
249 for row_idx in 0..num_rows {
252 let mut max_len: usize = 0;
253 let mut all_null = true;
254
255 for view in views.iter().flatten() {
256 if !view.is_null(row_idx) {
257 all_null = false;
258 let len = view.offsets[row_idx + 1] - view.offsets[row_idx];
259 max_len = max_len.max(len);
260 }
261 }
262
263 if all_null {
264 null_builder.append_null();
265 offsets.push(*offsets.last().unwrap());
266 continue;
267 }
268 null_builder.append_non_null();
269
270 for (col_idx, view) in views.iter().enumerate() {
272 match view {
273 Some(v) if !v.is_null(row_idx) => {
274 let start = v.offsets[row_idx];
275 let end = v.offsets[row_idx + 1];
276 let len = end - start;
277 let builder = builders[col_idx].as_mut().unwrap();
278 builder.extend(0, start, end);
279 if len < max_len {
280 builder.extend_nulls(max_len - len);
281 }
282 }
283 _ => {
284 if let Some(builder) = builders[col_idx].as_mut() {
286 builder.extend_nulls(max_len);
287 }
288 }
289 }
290 }
291
292 total_values += max_len;
293 let last = *offsets.last().unwrap();
294 offsets.push(last + max_len as i32);
295 }
296
297 let struct_columns: Vec<ArrayRef> = builders
299 .into_iter()
300 .zip(element_types.iter())
301 .map(|(builder, elem_type)| match builder {
302 Some(b) => arrow::array::make_array(b.freeze()),
303 None => new_null_array(
304 if elem_type.is_null() {
305 &Null
306 } else {
307 elem_type
308 },
309 total_values,
310 ),
311 })
312 .collect();
313
314 let struct_array = StructArray::try_new(struct_fields, struct_columns, None)?;
315
316 let null_buffer = null_builder.finish();
317
318 let result = ListArray::try_new(
319 Arc::new(Field::new_list_field(
320 struct_array.data_type().clone(),
321 true,
322 )),
323 OffsetBuffer::new(offsets.into()),
324 Arc::new(struct_array),
325 null_buffer,
326 )?;
327
328 Ok(Arc::new(result))
329}