Skip to main content

datafusion_functions_nested/
arrays_zip.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for arrays_zip function.
19
20use crate::utils::make_scalar_function;
21use arrow::array::{
22    Array, ArrayRef, Capacities, ListArray, MutableArrayData, StructArray, new_null_array,
23};
24use arrow::buffer::{NullBuffer, OffsetBuffer};
25use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Null};
26use arrow::datatypes::{DataType, Field, Fields};
27use datafusion_common::cast::{
28    as_fixed_size_list_array, as_large_list_array, as_list_array,
29};
30use datafusion_common::{Result, exec_err};
31use datafusion_expr::{
32    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
33};
34use datafusion_macros::user_doc;
35use std::any::Any;
36use std::sync::Arc;
37
38/// Type-erased view of a list column (works for both List and LargeList).
39/// Stores the information needed to iterate rows without re-downcasting.
40struct ListColumnView {
41    /// The flat values array backing this list column.
42    values: ArrayRef,
43    /// Pre-computed per-row start offsets (length = num_rows + 1).
44    offsets: Vec<usize>,
45    /// Pre-computed null bitmap: true means the row is null.
46    is_null: Vec<bool>,
47}
48
// Registers `ArraysZip` under the expression-function name `arrays_zip` and
// the UDF accessor name `arrays_zip_udf`.
// NOTE(review): the macro is defined elsewhere in this crate; the exact items
// it generates (and how the description string is used) are not visible
// here — confirm against the macro definition.
make_udf_expr_and_func!(
    ArraysZip,
    arrays_zip,
    "combines multiple arrays into a single array of structs.",
    arrays_zip_udf
);
55
56#[user_doc(
57    doc_section(label = "Array Functions"),
58    description = "Returns an array of structs created by combining the elements of each input array at the same index. If the arrays have different lengths, shorter arrays are padded with NULLs.",
59    syntax_example = "arrays_zip(array1, array2[, ..., array_n])",
60    sql_example = r#"```sql
61> select arrays_zip([1, 2, 3], ['a', 'b', 'c']);
62+---------------------------------------------------+
63| arrays_zip([1, 2, 3], ['a', 'b', 'c'])             |
64+---------------------------------------------------+
65| [{c0: 1, c1: a}, {c0: 2, c1: b}, {c0: 3, c1: c}] |
66+---------------------------------------------------+
67> select arrays_zip([1, 2], [3, 4, 5]);
68+---------------------------------------------------+
69| arrays_zip([1, 2], [3, 4, 5])                       |
70+---------------------------------------------------+
71| [{c0: 1, c1: 3}, {c0: 2, c1: 4}, {c0: , c1: 5}]  |
72+---------------------------------------------------+
73```"#,
74    argument(name = "array1", description = "First array expression."),
75    argument(name = "array2", description = "Second array expression."),
76    argument(name = "array_n", description = "Subsequent array expressions.")
77)]
78#[derive(Debug, PartialEq, Eq, Hash)]
79pub struct ArraysZip {
80    signature: Signature,
81    aliases: Vec<String>,
82}
83
84impl Default for ArraysZip {
85    fn default() -> Self {
86        Self::new()
87    }
88}
89
90impl ArraysZip {
91    pub fn new() -> Self {
92        Self {
93            signature: Signature::variadic_any(Volatility::Immutable),
94            aliases: vec![String::from("list_zip")],
95        }
96    }
97}
98
99impl ScalarUDFImpl for ArraysZip {
100    fn as_any(&self) -> &dyn Any {
101        self
102    }
103
104    fn name(&self) -> &str {
105        "arrays_zip"
106    }
107
108    fn signature(&self) -> &Signature {
109        &self.signature
110    }
111
112    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
113        if arg_types.is_empty() {
114            return exec_err!("arrays_zip requires at least two arguments");
115        }
116
117        let mut fields = Vec::with_capacity(arg_types.len());
118        for (i, arg_type) in arg_types.iter().enumerate() {
119            let element_type = match arg_type {
120                List(field) | LargeList(field) | FixedSizeList(field, _) => {
121                    field.data_type().clone()
122                }
123                Null => Null,
124                dt => {
125                    return exec_err!("arrays_zip expects array arguments, got {dt}");
126                }
127            };
128            fields.push(Field::new(format!("c{i}"), element_type, true));
129        }
130
131        Ok(List(Arc::new(Field::new_list_field(
132            DataType::Struct(Fields::from(fields)),
133            true,
134        ))))
135    }
136
137    fn invoke_with_args(
138        &self,
139        args: datafusion_expr::ScalarFunctionArgs,
140    ) -> Result<ColumnarValue> {
141        make_scalar_function(arrays_zip_inner)(&args.args)
142    }
143
144    fn aliases(&self) -> &[String] {
145        &self.aliases
146    }
147
148    fn documentation(&self) -> Option<&Documentation> {
149        self.doc()
150    }
151}
152
153/// Core implementation for arrays_zip.
154///
155/// Takes N list arrays and produces a list of structs where each struct
156/// has one field per input array. If arrays within a row have different
157/// lengths, shorter arrays are padded with NULLs.
158/// Supports List, LargeList, and Null input types.
159fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
160    if args.len() < 2 {
161        return exec_err!("arrays_zip requires at least two arguments");
162    }
163
164    let num_rows = args[0].len();
165
166    // Build a type-erased ListColumnView for each argument.
167    // None means the argument is Null-typed (all nulls, no backing data).
168    let mut views: Vec<Option<ListColumnView>> = Vec::with_capacity(args.len());
169    let mut element_types: Vec<DataType> = Vec::with_capacity(args.len());
170
171    for (i, arg) in args.iter().enumerate() {
172        match arg.data_type() {
173            List(field) => {
174                let arr = as_list_array(arg)?;
175                let raw_offsets = arr.value_offsets();
176                let offsets: Vec<usize> =
177                    raw_offsets.iter().map(|&o| o as usize).collect();
178                let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect();
179                element_types.push(field.data_type().clone());
180                views.push(Some(ListColumnView {
181                    values: Arc::clone(arr.values()),
182                    offsets,
183                    is_null,
184                }));
185            }
186            LargeList(field) => {
187                let arr = as_large_list_array(arg)?;
188                let raw_offsets = arr.value_offsets();
189                let offsets: Vec<usize> =
190                    raw_offsets.iter().map(|&o| o as usize).collect();
191                let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect();
192                element_types.push(field.data_type().clone());
193                views.push(Some(ListColumnView {
194                    values: Arc::clone(arr.values()),
195                    offsets,
196                    is_null,
197                }));
198            }
199            FixedSizeList(field, size) => {
200                let arr = as_fixed_size_list_array(arg)?;
201                let size = *size as usize;
202                let offsets: Vec<usize> = (0..=num_rows).map(|row| row * size).collect();
203                let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect();
204                element_types.push(field.data_type().clone());
205                views.push(Some(ListColumnView {
206                    values: Arc::clone(arr.values()),
207                    offsets,
208                    is_null,
209                }));
210            }
211            Null => {
212                element_types.push(Null);
213                views.push(None);
214            }
215            dt => {
216                return exec_err!("arrays_zip argument {i} expected list type, got {dt}");
217            }
218        }
219    }
220
221    // Collect per-column values data for MutableArrayData builders.
222    let values_data: Vec<_> = views
223        .iter()
224        .map(|v| v.as_ref().map(|view| view.values.to_data()))
225        .collect();
226
227    let struct_fields: Fields = element_types
228        .iter()
229        .enumerate()
230        .map(|(i, dt)| Field::new(format!("c{i}"), dt.clone(), true))
231        .collect::<Vec<_>>()
232        .into();
233
234    // Create a MutableArrayData builder per column. For None (Null-typed)
235    // args we only need extend_nulls, so we track them separately.
236    let mut builders: Vec<Option<MutableArrayData>> = values_data
237        .iter()
238        .map(|vd| {
239            vd.as_ref().map(|data| {
240                MutableArrayData::with_capacities(vec![data], true, Capacities::Array(0))
241            })
242        })
243        .collect();
244
245    let mut offsets: Vec<i32> = Vec::with_capacity(num_rows + 1);
246    offsets.push(0);
247    let mut null_mask: Vec<bool> = Vec::with_capacity(num_rows);
248    let mut total_values: usize = 0;
249
250    // Process each row: compute per-array lengths, then copy values
251    // and pad shorter arrays with NULLs.
252    for row_idx in 0..num_rows {
253        let mut max_len: usize = 0;
254        let mut all_null = true;
255
256        for view in views.iter().flatten() {
257            if !view.is_null[row_idx] {
258                all_null = false;
259                let len = view.offsets[row_idx + 1] - view.offsets[row_idx];
260                max_len = max_len.max(len);
261            }
262        }
263
264        if all_null {
265            null_mask.push(true);
266            offsets.push(*offsets.last().unwrap());
267            continue;
268        }
269        null_mask.push(false);
270
271        // Extend each column builder for this row.
272        for (col_idx, view) in views.iter().enumerate() {
273            match view {
274                Some(v) if !v.is_null[row_idx] => {
275                    let start = v.offsets[row_idx];
276                    let end = v.offsets[row_idx + 1];
277                    let len = end - start;
278                    let builder = builders[col_idx].as_mut().unwrap();
279                    builder.extend(0, start, end);
280                    if len < max_len {
281                        builder.extend_nulls(max_len - len);
282                    }
283                }
284                _ => {
285                    // Null list entry or None (Null-typed) arg — all nulls.
286                    if let Some(builder) = builders[col_idx].as_mut() {
287                        builder.extend_nulls(max_len);
288                    }
289                }
290            }
291        }
292
293        total_values += max_len;
294        let last = *offsets.last().unwrap();
295        offsets.push(last + max_len as i32);
296    }
297
298    // Assemble struct columns from builders.
299    let struct_columns: Vec<ArrayRef> = builders
300        .into_iter()
301        .zip(element_types.iter())
302        .map(|(builder, elem_type)| match builder {
303            Some(b) => arrow::array::make_array(b.freeze()),
304            None => new_null_array(
305                if elem_type.is_null() {
306                    &Null
307                } else {
308                    elem_type
309                },
310                total_values,
311            ),
312        })
313        .collect();
314
315    let struct_array = StructArray::try_new(struct_fields, struct_columns, None)?;
316
317    let null_buffer = if null_mask.iter().any(|&v| v) {
318        Some(NullBuffer::from(
319            null_mask.iter().map(|v| !v).collect::<Vec<bool>>(),
320        ))
321    } else {
322        None
323    };
324
325    let result = ListArray::try_new(
326        Arc::new(Field::new_list_field(
327            struct_array.data_type().clone(),
328            true,
329        )),
330        OffsetBuffer::new(offsets.into()),
331        Arc::new(struct_array),
332        null_buffer,
333    )?;
334
335    Ok(Arc::new(result))
336}