Skip to main content

datafusion_functions_nested/
arrays_zip.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ScalarUDFImpl`] definitions for arrays_zip function.
19
20use crate::utils::make_scalar_function;
21use arrow::array::{
22    Array, ArrayRef, Capacities, ListArray, MutableArrayData, NullBufferBuilder,
23    StructArray, new_null_array,
24};
25use arrow::buffer::OffsetBuffer;
26use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Null};
27use arrow::datatypes::{DataType, Field, Fields};
28use datafusion_common::cast::{
29    as_fixed_size_list_array, as_large_list_array, as_list_array,
30};
31use datafusion_common::{Result, exec_err};
32use datafusion_expr::{
33    ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
34    Volatility,
35};
36use datafusion_macros::user_doc;
37use std::sync::Arc;
38
39/// Type-erased view of a list column (works for both List and LargeList).
40/// Stores the information needed to iterate rows without re-downcasting.
41struct ListColumnView {
42    /// The flat values array backing this list column.
43    values: ArrayRef,
44    /// Pre-computed per-row start offsets (length = num_rows + 1).
45    offsets: Vec<usize>,
46    /// Null bitmap from the input array (None means no nulls).
47    nulls: Option<arrow::buffer::NullBuffer>,
48}
49
50impl ListColumnView {
51    fn is_null(&self, idx: usize) -> bool {
52        self.nulls.as_ref().is_some_and(|n| n.is_null(idx))
53    }
54}
55
// Invokes the crate's `make_udf_expr_and_func!` macro for `ArraysZip`, passing
// the identifier `arrays_zip`, a one-line description string, and the
// identifier `arrays_zip_udf`.
// NOTE(review): the macro is defined outside this file; presumably it
// generates an `arrays_zip(...)` expression helper and an `arrays_zip_udf()`
// accessor — confirm against the macro definition.
make_udf_expr_and_func!(
    ArraysZip,
    arrays_zip,
    "combines one or multiple arrays into a single array of structs.",
    arrays_zip_udf
);
62
#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array of structs created by combining the elements of each input array at the same index. If the arrays have different lengths, shorter arrays are padded with NULLs.",
    syntax_example = "arrays_zip(array1[, ..., array_n])",
    sql_example = r#"```sql
> select arrays_zip([1, 2, 3]);
+---------------------------------------------------+
| arrays_zip([1, 2, 3])                             |
+---------------------------------------------------+
| [{1: 1}, {1: 2}, {1: 3}]                          |
+---------------------------------------------------+
> select arrays_zip([1, 2], [3, 4, 5]);
+---------------------------------------------------+
| arrays_zip([1, 2], [3, 4, 5])                     |
+---------------------------------------------------+
| [{1: 1, 2: 3}, {1: 2, 2: 4}, {1: NULL, 2: 5}]     |
+---------------------------------------------------+
```"#,
    argument(name = "array1", description = "First array expression."),
    argument(
        name = "array_n",
        description = "Optional additional array expressions."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
/// Scalar UDF `arrays_zip` (alias `list_zip`): zips its list arguments into a
/// single list of structs, one struct field per argument.
pub struct ArraysZip {
    /// Signature handed out by `signature()`; `new` builds it as an
    /// immutable, variadic-any signature.
    signature: Signature,
    /// Alternative names handed out by `aliases()`; `new` sets this to
    /// `["list_zip"]`.
    aliases: Vec<String>,
}
92
93impl Default for ArraysZip {
94    fn default() -> Self {
95        Self::new()
96    }
97}
98
99impl ArraysZip {
100    pub fn new() -> Self {
101        Self {
102            signature: Signature::variadic_any(Volatility::Immutable),
103            aliases: vec![String::from("list_zip")],
104        }
105    }
106}
107
108impl ScalarUDFImpl for ArraysZip {
109    fn name(&self) -> &str {
110        "arrays_zip"
111    }
112
113    fn signature(&self) -> &Signature {
114        &self.signature
115    }
116
117    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
118        if arg_types.is_empty() {
119            return exec_err!("arrays_zip requires at least one argument");
120        }
121
122        let mut fields = Vec::with_capacity(arg_types.len());
123        for (i, arg_type) in arg_types.iter().enumerate() {
124            let element_type = match arg_type {
125                List(field) | LargeList(field) | FixedSizeList(field, _) => {
126                    field.data_type().clone()
127                }
128                Null => Null,
129                dt => {
130                    return exec_err!("arrays_zip expects array arguments, got {dt}");
131                }
132            };
133            fields.push(Field::new(format!("{}", i + 1), element_type, true));
134        }
135
136        Ok(List(Arc::new(Field::new_list_field(
137            DataType::Struct(Fields::from(fields)),
138            true,
139        ))))
140    }
141
142    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
143        make_scalar_function(arrays_zip_inner)(&args.args)
144    }
145
146    fn aliases(&self) -> &[String] {
147        &self.aliases
148    }
149
150    fn documentation(&self) -> Option<&Documentation> {
151        self.doc()
152    }
153}
154
155/// Core implementation for arrays_zip.
156///
157/// Takes N list arrays and produces a list of structs where each struct
158/// has one field per input array. If arrays within a row have different
159/// lengths, shorter arrays are padded with NULLs.
160/// Supports List, LargeList, and Null input types.
161fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
162    if args.is_empty() {
163        return exec_err!("arrays_zip requires at least one argument");
164    }
165
166    let num_rows = args[0].len();
167
168    // Build a type-erased ListColumnView for each argument.
169    // None means the argument is Null-typed (all nulls, no backing data).
170    let mut views: Vec<Option<ListColumnView>> = Vec::with_capacity(args.len());
171    let mut element_types: Vec<DataType> = Vec::with_capacity(args.len());
172
173    for (i, arg) in args.iter().enumerate() {
174        match arg.data_type() {
175            List(field) => {
176                let arr = as_list_array(arg)?;
177                let raw_offsets = arr.value_offsets();
178                let offsets: Vec<usize> =
179                    raw_offsets.iter().map(|&o| o as usize).collect();
180                element_types.push(field.data_type().clone());
181                views.push(Some(ListColumnView {
182                    values: Arc::clone(arr.values()),
183                    offsets,
184                    nulls: arr.nulls().cloned(),
185                }));
186            }
187            LargeList(field) => {
188                let arr = as_large_list_array(arg)?;
189                let raw_offsets = arr.value_offsets();
190                let offsets: Vec<usize> =
191                    raw_offsets.iter().map(|&o| o as usize).collect();
192                element_types.push(field.data_type().clone());
193                views.push(Some(ListColumnView {
194                    values: Arc::clone(arr.values()),
195                    offsets,
196                    nulls: arr.nulls().cloned(),
197                }));
198            }
199            FixedSizeList(field, size) => {
200                let arr = as_fixed_size_list_array(arg)?;
201                let size = *size as usize;
202                let offsets: Vec<usize> = (0..=num_rows).map(|row| row * size).collect();
203                element_types.push(field.data_type().clone());
204                views.push(Some(ListColumnView {
205                    values: Arc::clone(arr.values()),
206                    offsets,
207                    nulls: arr.nulls().cloned(),
208                }));
209            }
210            Null => {
211                element_types.push(Null);
212                views.push(None);
213            }
214            dt => {
215                return exec_err!("arrays_zip argument {i} expected list type, got {dt}");
216            }
217        }
218    }
219
220    // Collect per-column values data for MutableArrayData builders.
221    let values_data: Vec<_> = views
222        .iter()
223        .map(|v| v.as_ref().map(|view| view.values.to_data()))
224        .collect();
225
226    let struct_fields: Fields = element_types
227        .iter()
228        .enumerate()
229        .map(|(i, dt)| Field::new(format!("{}", i + 1), dt.clone(), true))
230        .collect::<Vec<_>>()
231        .into();
232
233    // Create a MutableArrayData builder per column. For None (Null-typed)
234    // args we only need extend_nulls, so we track them separately.
235    let mut builders: Vec<Option<MutableArrayData>> = values_data
236        .iter()
237        .map(|vd| {
238            vd.as_ref().map(|data| {
239                MutableArrayData::with_capacities(vec![data], true, Capacities::Array(0))
240            })
241        })
242        .collect();
243
244    let mut offsets: Vec<i32> = Vec::with_capacity(num_rows + 1);
245    offsets.push(0);
246    let mut null_builder = NullBufferBuilder::new(num_rows);
247    let mut total_values: usize = 0;
248
249    // Process each row: compute per-array lengths, then copy values
250    // and pad shorter arrays with NULLs.
251    for row_idx in 0..num_rows {
252        let mut max_len: usize = 0;
253        let mut all_null = true;
254
255        for view in views.iter().flatten() {
256            if !view.is_null(row_idx) {
257                all_null = false;
258                let len = view.offsets[row_idx + 1] - view.offsets[row_idx];
259                max_len = max_len.max(len);
260            }
261        }
262
263        if all_null {
264            null_builder.append_null();
265            offsets.push(*offsets.last().unwrap());
266            continue;
267        }
268        null_builder.append_non_null();
269
270        // Extend each column builder for this row.
271        for (col_idx, view) in views.iter().enumerate() {
272            match view {
273                Some(v) if !v.is_null(row_idx) => {
274                    let start = v.offsets[row_idx];
275                    let end = v.offsets[row_idx + 1];
276                    let len = end - start;
277                    let builder = builders[col_idx].as_mut().unwrap();
278                    builder.extend(0, start, end);
279                    if len < max_len {
280                        builder.extend_nulls(max_len - len);
281                    }
282                }
283                _ => {
284                    // Null list entry or None (Null-typed) arg — all nulls.
285                    if let Some(builder) = builders[col_idx].as_mut() {
286                        builder.extend_nulls(max_len);
287                    }
288                }
289            }
290        }
291
292        total_values += max_len;
293        let last = *offsets.last().unwrap();
294        offsets.push(last + max_len as i32);
295    }
296
297    // Assemble struct columns from builders.
298    let struct_columns: Vec<ArrayRef> = builders
299        .into_iter()
300        .zip(element_types.iter())
301        .map(|(builder, elem_type)| match builder {
302            Some(b) => arrow::array::make_array(b.freeze()),
303            None => new_null_array(
304                if elem_type.is_null() {
305                    &Null
306                } else {
307                    elem_type
308                },
309                total_values,
310            ),
311        })
312        .collect();
313
314    let struct_array = StructArray::try_new(struct_fields, struct_columns, None)?;
315
316    let null_buffer = null_builder.finish();
317
318    let result = ListArray::try_new(
319        Arc::new(Field::new_list_field(
320            struct_array.data_type().clone(),
321            true,
322        )),
323        OffsetBuffer::new(offsets.into()),
324        Arc::new(struct_array),
325        null_buffer,
326    )?;
327
328    Ok(Arc::new(result))
329}