datafusion_functions_nested/
arrays_zip.rs1use crate::utils::make_scalar_function;
21use arrow::array::{
22 Array, ArrayRef, Capacities, ListArray, MutableArrayData, StructArray, new_null_array,
23};
24use arrow::buffer::{NullBuffer, OffsetBuffer};
25use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Null};
26use arrow::datatypes::{DataType, Field, Fields};
27use datafusion_common::cast::{
28 as_fixed_size_list_array, as_large_list_array, as_list_array,
29};
30use datafusion_common::{Result, exec_err};
31use datafusion_expr::{
32 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
33};
34use datafusion_macros::user_doc;
35use std::any::Any;
36use std::sync::Arc;
37
/// One list-typed argument of `arrays_zip`, decomposed into the pieces the
/// per-row zip loop reads.
struct ListColumnView {
    /// Child values array of the list column; each row's elements are a
    /// contiguous range of this array.
    values: ArrayRef,
    /// Row `r` covers `values[offsets[r]..offsets[r + 1]]`; this vector holds
    /// one more entry than the number of rows.
    offsets: Vec<usize>,
    /// `is_null[r]` is true when row `r` of the list column is NULL.
    is_null: Vec<bool>,
}
48
// Wires the `ArraysZip` UDF into the crate's expression API under the names
// `arrays_zip` / `arrays_zip_udf`.
// NOTE(review): the exact items generated are defined by the macro elsewhere
// in the crate — presumably an `arrays_zip` expr builder and an
// `arrays_zip_udf` accessor; confirm against the macro definition.
make_udf_expr_and_func!(
    ArraysZip,
    arrays_zip,
    "combines multiple arrays into a single array of structs.",
    arrays_zip_udf
);
55
56#[user_doc(
57 doc_section(label = "Array Functions"),
58 description = "Returns an array of structs created by combining the elements of each input array at the same index. If the arrays have different lengths, shorter arrays are padded with NULLs.",
59 syntax_example = "arrays_zip(array1, array2[, ..., array_n])",
60 sql_example = r#"```sql
61> select arrays_zip([1, 2, 3], ['a', 'b', 'c']);
62+---------------------------------------------------+
63| arrays_zip([1, 2, 3], ['a', 'b', 'c']) |
64+---------------------------------------------------+
65| [{c0: 1, c1: a}, {c0: 2, c1: b}, {c0: 3, c1: c}] |
66+---------------------------------------------------+
67> select arrays_zip([1, 2], [3, 4, 5]);
68+---------------------------------------------------+
69| arrays_zip([1, 2], [3, 4, 5]) |
70+---------------------------------------------------+
71| [{c0: 1, c1: 3}, {c0: 2, c1: 4}, {c0: , c1: 5}] |
72+---------------------------------------------------+
73```"#,
74 argument(name = "array1", description = "First array expression."),
75 argument(name = "array2", description = "Second array expression."),
76 argument(name = "array_n", description = "Subsequent array expressions.")
77)]
78#[derive(Debug, PartialEq, Eq, Hash)]
79pub struct ArraysZip {
80 signature: Signature,
81 aliases: Vec<String>,
82}
83
84impl Default for ArraysZip {
85 fn default() -> Self {
86 Self::new()
87 }
88}
89
90impl ArraysZip {
91 pub fn new() -> Self {
92 Self {
93 signature: Signature::variadic_any(Volatility::Immutable),
94 aliases: vec![String::from("list_zip")],
95 }
96 }
97}
98
99impl ScalarUDFImpl for ArraysZip {
100 fn as_any(&self) -> &dyn Any {
101 self
102 }
103
104 fn name(&self) -> &str {
105 "arrays_zip"
106 }
107
108 fn signature(&self) -> &Signature {
109 &self.signature
110 }
111
112 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
113 if arg_types.is_empty() {
114 return exec_err!("arrays_zip requires at least two arguments");
115 }
116
117 let mut fields = Vec::with_capacity(arg_types.len());
118 for (i, arg_type) in arg_types.iter().enumerate() {
119 let element_type = match arg_type {
120 List(field) | LargeList(field) | FixedSizeList(field, _) => {
121 field.data_type().clone()
122 }
123 Null => Null,
124 dt => {
125 return exec_err!("arrays_zip expects array arguments, got {dt}");
126 }
127 };
128 fields.push(Field::new(format!("c{i}"), element_type, true));
129 }
130
131 Ok(List(Arc::new(Field::new_list_field(
132 DataType::Struct(Fields::from(fields)),
133 true,
134 ))))
135 }
136
137 fn invoke_with_args(
138 &self,
139 args: datafusion_expr::ScalarFunctionArgs,
140 ) -> Result<ColumnarValue> {
141 make_scalar_function(arrays_zip_inner)(&args.args)
142 }
143
144 fn aliases(&self) -> &[String] {
145 &self.aliases
146 }
147
148 fn documentation(&self) -> Option<&Documentation> {
149 self.doc()
150 }
151}
152
153fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
160 if args.len() < 2 {
161 return exec_err!("arrays_zip requires at least two arguments");
162 }
163
164 let num_rows = args[0].len();
165
166 let mut views: Vec<Option<ListColumnView>> = Vec::with_capacity(args.len());
169 let mut element_types: Vec<DataType> = Vec::with_capacity(args.len());
170
171 for (i, arg) in args.iter().enumerate() {
172 match arg.data_type() {
173 List(field) => {
174 let arr = as_list_array(arg)?;
175 let raw_offsets = arr.value_offsets();
176 let offsets: Vec<usize> =
177 raw_offsets.iter().map(|&o| o as usize).collect();
178 let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect();
179 element_types.push(field.data_type().clone());
180 views.push(Some(ListColumnView {
181 values: Arc::clone(arr.values()),
182 offsets,
183 is_null,
184 }));
185 }
186 LargeList(field) => {
187 let arr = as_large_list_array(arg)?;
188 let raw_offsets = arr.value_offsets();
189 let offsets: Vec<usize> =
190 raw_offsets.iter().map(|&o| o as usize).collect();
191 let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect();
192 element_types.push(field.data_type().clone());
193 views.push(Some(ListColumnView {
194 values: Arc::clone(arr.values()),
195 offsets,
196 is_null,
197 }));
198 }
199 FixedSizeList(field, size) => {
200 let arr = as_fixed_size_list_array(arg)?;
201 let size = *size as usize;
202 let offsets: Vec<usize> = (0..=num_rows).map(|row| row * size).collect();
203 let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect();
204 element_types.push(field.data_type().clone());
205 views.push(Some(ListColumnView {
206 values: Arc::clone(arr.values()),
207 offsets,
208 is_null,
209 }));
210 }
211 Null => {
212 element_types.push(Null);
213 views.push(None);
214 }
215 dt => {
216 return exec_err!("arrays_zip argument {i} expected list type, got {dt}");
217 }
218 }
219 }
220
221 let values_data: Vec<_> = views
223 .iter()
224 .map(|v| v.as_ref().map(|view| view.values.to_data()))
225 .collect();
226
227 let struct_fields: Fields = element_types
228 .iter()
229 .enumerate()
230 .map(|(i, dt)| Field::new(format!("c{i}"), dt.clone(), true))
231 .collect::<Vec<_>>()
232 .into();
233
234 let mut builders: Vec<Option<MutableArrayData>> = values_data
237 .iter()
238 .map(|vd| {
239 vd.as_ref().map(|data| {
240 MutableArrayData::with_capacities(vec![data], true, Capacities::Array(0))
241 })
242 })
243 .collect();
244
245 let mut offsets: Vec<i32> = Vec::with_capacity(num_rows + 1);
246 offsets.push(0);
247 let mut null_mask: Vec<bool> = Vec::with_capacity(num_rows);
248 let mut total_values: usize = 0;
249
250 for row_idx in 0..num_rows {
253 let mut max_len: usize = 0;
254 let mut all_null = true;
255
256 for view in views.iter().flatten() {
257 if !view.is_null[row_idx] {
258 all_null = false;
259 let len = view.offsets[row_idx + 1] - view.offsets[row_idx];
260 max_len = max_len.max(len);
261 }
262 }
263
264 if all_null {
265 null_mask.push(true);
266 offsets.push(*offsets.last().unwrap());
267 continue;
268 }
269 null_mask.push(false);
270
271 for (col_idx, view) in views.iter().enumerate() {
273 match view {
274 Some(v) if !v.is_null[row_idx] => {
275 let start = v.offsets[row_idx];
276 let end = v.offsets[row_idx + 1];
277 let len = end - start;
278 let builder = builders[col_idx].as_mut().unwrap();
279 builder.extend(0, start, end);
280 if len < max_len {
281 builder.extend_nulls(max_len - len);
282 }
283 }
284 _ => {
285 if let Some(builder) = builders[col_idx].as_mut() {
287 builder.extend_nulls(max_len);
288 }
289 }
290 }
291 }
292
293 total_values += max_len;
294 let last = *offsets.last().unwrap();
295 offsets.push(last + max_len as i32);
296 }
297
298 let struct_columns: Vec<ArrayRef> = builders
300 .into_iter()
301 .zip(element_types.iter())
302 .map(|(builder, elem_type)| match builder {
303 Some(b) => arrow::array::make_array(b.freeze()),
304 None => new_null_array(
305 if elem_type.is_null() {
306 &Null
307 } else {
308 elem_type
309 },
310 total_values,
311 ),
312 })
313 .collect();
314
315 let struct_array = StructArray::try_new(struct_fields, struct_columns, None)?;
316
317 let null_buffer = if null_mask.iter().any(|&v| v) {
318 Some(NullBuffer::from(
319 null_mask.iter().map(|v| !v).collect::<Vec<bool>>(),
320 ))
321 } else {
322 None
323 };
324
325 let result = ListArray::try_new(
326 Arc::new(Field::new_list_field(
327 struct_array.data_type().clone(),
328 true,
329 )),
330 OffsetBuffer::new(offsets.into()),
331 Arc::new(struct_array),
332 null_buffer,
333 )?;
334
335 Ok(Arc::new(result))
336}