datafusion_physical_plan/
unnest.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Defines [`UnnestExec`], an execution plan for unnesting values in columns
//! that contain a list or struct type.
19
20use std::cmp::{self, Ordering};
21use std::task::{ready, Poll};
22use std::{any::Any, sync::Arc};
23
24use super::metrics::{
25    self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
26    RecordOutput,
27};
28use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
29use crate::{
30    DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
31    SendableRecordBatchStream,
32};
33
34use arrow::array::{
35    new_null_array, Array, ArrayRef, AsArray, FixedSizeListArray, Int64Array,
36    LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray,
37};
38use arrow::compute::kernels::length::length;
39use arrow::compute::kernels::zip::zip;
40use arrow::compute::{cast, is_not_null, kernels, sum};
41use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
42use arrow::record_batch::RecordBatch;
43use arrow_ord::cmp::lt;
44use async_trait::async_trait;
45use datafusion_common::{
46    exec_datafusion_err, exec_err, internal_err, HashMap, HashSet, Result, UnnestOptions,
47};
48use datafusion_execution::TaskContext;
49use datafusion_physical_expr::EquivalenceProperties;
50use futures::{Stream, StreamExt};
51use log::trace;
52
/// Unnest the given columns (either with type struct or list).
///
/// - For list unnesting, each row is vertically transformed into multiple rows.
/// - For struct unnesting, each column is horizontally transformed into multiple columns.
///
/// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m').
///
/// See [`UnnestOptions`] for more details and an example.
#[derive(Debug, Clone)]
pub struct UnnestExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// The schema once the unnest is applied
    schema: SchemaRef,
    /// List unnest operations to apply; each [`ListUnnest`] pairs the index of a
    /// column in the input schema with the depth to unnest it to
    list_column_indices: Vec<ListUnnest>,
    /// Indices of the struct-typed columns in the input schema
    struct_column_indices: Vec<usize>,
    /// Options
    options: UnnestOptions,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: PlanProperties,
}
76
77impl UnnestExec {
78    /// Create a new [UnnestExec].
79    pub fn new(
80        input: Arc<dyn ExecutionPlan>,
81        list_column_indices: Vec<ListUnnest>,
82        struct_column_indices: Vec<usize>,
83        schema: SchemaRef,
84        options: UnnestOptions,
85    ) -> Self {
86        let cache = Self::compute_properties(&input, Arc::clone(&schema));
87
88        UnnestExec {
89            input,
90            schema,
91            list_column_indices,
92            struct_column_indices,
93            options,
94            metrics: Default::default(),
95            cache,
96        }
97    }
98
99    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
100    fn compute_properties(
101        input: &Arc<dyn ExecutionPlan>,
102        schema: SchemaRef,
103    ) -> PlanProperties {
104        PlanProperties::new(
105            EquivalenceProperties::new(schema),
106            input.output_partitioning().to_owned(),
107            input.pipeline_behavior(),
108            input.boundedness(),
109        )
110    }
111
112    /// Input execution plan
113    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
114        &self.input
115    }
116
117    /// Indices of the list-typed columns in the input schema
118    pub fn list_column_indices(&self) -> &[ListUnnest] {
119        &self.list_column_indices
120    }
121
122    /// Indices of the struct-typed columns in the input schema
123    pub fn struct_column_indices(&self) -> &[usize] {
124        &self.struct_column_indices
125    }
126
127    pub fn options(&self) -> &UnnestOptions {
128        &self.options
129    }
130}
131
132impl DisplayAs for UnnestExec {
133    fn fmt_as(
134        &self,
135        t: DisplayFormatType,
136        f: &mut std::fmt::Formatter,
137    ) -> std::fmt::Result {
138        match t {
139            DisplayFormatType::Default | DisplayFormatType::Verbose => {
140                write!(f, "UnnestExec")
141            }
142            DisplayFormatType::TreeRender => {
143                write!(f, "")
144            }
145        }
146    }
147}
148
149impl ExecutionPlan for UnnestExec {
150    fn name(&self) -> &'static str {
151        "UnnestExec"
152    }
153
154    fn as_any(&self) -> &dyn Any {
155        self
156    }
157
158    fn properties(&self) -> &PlanProperties {
159        &self.cache
160    }
161
162    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
163        vec![&self.input]
164    }
165
166    fn with_new_children(
167        self: Arc<Self>,
168        children: Vec<Arc<dyn ExecutionPlan>>,
169    ) -> Result<Arc<dyn ExecutionPlan>> {
170        Ok(Arc::new(UnnestExec::new(
171            Arc::clone(&children[0]),
172            self.list_column_indices.clone(),
173            self.struct_column_indices.clone(),
174            Arc::clone(&self.schema),
175            self.options.clone(),
176        )))
177    }
178
179    fn required_input_distribution(&self) -> Vec<Distribution> {
180        vec![Distribution::UnspecifiedDistribution]
181    }
182
183    fn execute(
184        &self,
185        partition: usize,
186        context: Arc<TaskContext>,
187    ) -> Result<SendableRecordBatchStream> {
188        let input = self.input.execute(partition, context)?;
189        let metrics = UnnestMetrics::new(partition, &self.metrics);
190
191        Ok(Box::pin(UnnestStream {
192            input,
193            schema: Arc::clone(&self.schema),
194            list_type_columns: self.list_column_indices.clone(),
195            struct_column_indices: self.struct_column_indices.iter().copied().collect(),
196            options: self.options.clone(),
197            metrics,
198        }))
199    }
200
201    fn metrics(&self) -> Option<MetricsSet> {
202        Some(self.metrics.clone_inner())
203    }
204}
205
/// Metrics recorded by an [`UnnestStream`] for one partition.
#[derive(Clone, Debug)]
struct UnnestMetrics {
    /// Baseline execution metrics (used for elapsed compute time and output rows)
    baseline_metrics: BaselineMetrics,
    /// Number of batches consumed
    input_batches: metrics::Count,
    /// Number of rows consumed
    input_rows: metrics::Count,
    /// Number of batches produced
    output_batches: metrics::Count,
}
217
218impl UnnestMetrics {
219    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
220        let input_batches =
221            MetricBuilder::new(metrics).counter("input_batches", partition);
222
223        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
224
225        let output_batches =
226            MetricBuilder::new(metrics).counter("output_batches", partition);
227
228        Self {
229            baseline_metrics: BaselineMetrics::new(metrics, partition),
230            input_batches,
231            input_rows,
232            output_batches,
233        }
234    }
235}
236
/// A stream that issues [RecordBatch]es with unnested column data.
struct UnnestStream {
    /// Input stream
    input: SendableRecordBatchStream,
    /// Unnested schema
    schema: Arc<Schema>,
    /// Represents all unnest operations to be applied to the input (input index, depth),
    /// e.g. for `unnest(col1), unnest(unnest(col1))` where col1 has index 1 in the
    /// original input schema, list_type_columns = [ListUnnest{1,1}, ListUnnest{1,2}]
    list_type_columns: Vec<ListUnnest>,
    /// Indices of the struct-typed columns to flatten into their subfield columns
    struct_column_indices: HashSet<usize>,
    /// Options
    options: UnnestOptions,
    /// Metrics
    metrics: UnnestMetrics,
}
253
impl RecordBatchStream for UnnestStream {
    /// Returns the schema of the batches produced by this stream, i.e. the
    /// unnested schema.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
259
260#[async_trait]
261impl Stream for UnnestStream {
262    type Item = Result<RecordBatch>;
263
264    fn poll_next(
265        mut self: std::pin::Pin<&mut Self>,
266        cx: &mut std::task::Context<'_>,
267    ) -> Poll<Option<Self::Item>> {
268        self.poll_next_impl(cx)
269    }
270}
271
272impl UnnestStream {
273    /// Separate implementation function that unpins the [`UnnestStream`] so
274    /// that partial borrows work correctly
275    fn poll_next_impl(
276        &mut self,
277        cx: &mut std::task::Context<'_>,
278    ) -> Poll<Option<Result<RecordBatch>>> {
279        loop {
280            return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
281                Some(Ok(batch)) => {
282                    let elapsed_compute =
283                        self.metrics.baseline_metrics.elapsed_compute().clone();
284                    let timer = elapsed_compute.timer();
285                    self.metrics.input_batches.add(1);
286                    self.metrics.input_rows.add(batch.num_rows());
287                    let result = build_batch(
288                        &batch,
289                        &self.schema,
290                        &self.list_type_columns,
291                        &self.struct_column_indices,
292                        &self.options,
293                    )?;
294                    timer.done();
295                    let Some(result_batch) = result else {
296                        continue;
297                    };
298                    self.metrics.output_batches.add(1);
299                    (&result_batch).record_output(&self.metrics.baseline_metrics);
300
301                    // Empty record batches should not be emitted.
302                    // They need to be treated as  [`Option<RecordBatch>`]es and handled separately
303                    debug_assert!(result_batch.num_rows() > 0);
304                    Some(Ok(result_batch))
305                }
306                other => {
307                    trace!(
308                        "Processed {} probe-side input batches containing {} rows and \
309                        produced {} output batches containing {} rows in {}",
310                        self.metrics.input_batches,
311                        self.metrics.input_rows,
312                        self.metrics.output_batches,
313                        self.metrics.baseline_metrics.output_rows(),
314                        self.metrics.baseline_metrics.elapsed_compute(),
315                    );
316                    other
317                }
318            });
319        }
320    }
321}
322
323/// Given a set of struct column indices to flatten
324/// try converting the column in input into multiple subfield columns
325/// For example
326/// struct_col: [a: struct(item: int, name: string), b: int]
327/// with a batch
328/// {a: {item: 1, name: "a"}, b: 2},
329/// {a: {item: 3, name: "b"}, b: 4]
330/// will be converted into
331/// {a.item: 1, a.name: "a", b: 2},
332/// {a.item: 3, a.name: "b", b: 4}
333fn flatten_struct_cols(
334    input_batch: &[Arc<dyn Array>],
335    schema: &SchemaRef,
336    struct_column_indices: &HashSet<usize>,
337) -> Result<RecordBatch> {
338    // horizontal expansion because of struct unnest
339    let columns_expanded = input_batch
340        .iter()
341        .enumerate()
342        .map(|(idx, column_data)| match struct_column_indices.get(&idx) {
343            Some(_) => match column_data.data_type() {
344                DataType::Struct(_) => {
345                    let struct_arr =
346                        column_data.as_any().downcast_ref::<StructArray>().unwrap();
347                    Ok(struct_arr.columns().to_vec())
348                }
349                data_type => internal_err!(
350                    "expecting column {} from input plan to be a struct, got {:?}",
351                    idx,
352                    data_type
353                ),
354            },
355            None => Ok(vec![Arc::clone(column_data)]),
356        })
357        .collect::<Result<Vec<_>>>()?
358        .into_iter()
359        .flatten()
360        .collect();
361    Ok(RecordBatch::try_new(Arc::clone(schema), columns_expanded)?)
362}
363
/// Describes one list unnest operation: which input column is unnested and to
/// which depth, e.g. `unnest(unnest(col))` on the column with index 1 is
/// `ListUnnest { index_in_input_schema: 1, depth: 2 }`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct ListUnnest {
    /// Index of the column to unnest in the input schema
    pub index_in_input_schema: usize,
    /// Number of list levels to unnest
    pub depth: usize,
}
369
370/// This function is used to execute the unnesting on multiple columns all at once, but
371/// one level at a time, and is called n times, where n is the highest recursion level among
372/// the unnest exprs in the query.
373///
374/// For example giving the following query:
375/// ```sql
376/// select unnest(colA, max_depth:=3) as P1, unnest(colA,max_depth:=2) as P2, unnest(colB, max_depth:=1) as P3 from temp;
377/// ```
378/// Then the total times this function being called is 3
379///
380/// It needs to be aware of which level the current unnesting is, because if there exists
381/// multiple unnesting on the same column, but with different recursion levels, say
382/// **unnest(colA, max_depth:=3)** and **unnest(colA, max_depth:=2)**, then the unnesting
383/// of expr **unnest(colA, max_depth:=3)** will start at level 3, while unnesting for expr
384/// **unnest(colA, max_depth:=2)** has to start at level 2
385///
386/// Set *colA* as a 3-dimension columns and *colB* as an array (1-dimension). As stated,
387/// this function is called with the descending order of recursion depth
388///
389/// Depth = 3
390/// - colA(3-dimension) unnest into temp column temp_P1(2_dimension) (unnesting of P1 starts
391///   from this level)
392/// - colA(3-dimension) having indices repeated by the unnesting operation above
393/// - colB(1-dimension) having indices repeated by the unnesting operation above
394///
395/// Depth = 2
396/// - temp_P1(2-dimension) unnest into temp column temp_P1(1-dimension)
397/// - colA(3-dimension) unnest into temp column temp_P2(2-dimension) (unnesting of P2 starts
398///   from this level)
399/// - colB(1-dimension) having indices repeated by the unnesting operation above
400///
401/// Depth = 1
402/// - temp_P1(1-dimension) unnest into P1
403/// - temp_P2(2-dimension) unnest into P2
404/// - colB(1-dimension) unnest into P3 (unnesting of P3 starts from this level)
405///
406/// The returned array will has the same size as the input batch
407/// and only contains original columns that are not being unnested.
408fn list_unnest_at_level(
409    batch: &[ArrayRef],
410    list_type_unnests: &[ListUnnest],
411    temp_unnested_arrs: &mut HashMap<ListUnnest, ArrayRef>,
412    level_to_unnest: usize,
413    options: &UnnestOptions,
414) -> Result<Option<Vec<ArrayRef>>> {
415    // Extract unnestable columns at this level
416    let (arrs_to_unnest, list_unnest_specs): (Vec<Arc<dyn Array>>, Vec<_>) =
417        list_type_unnests
418            .iter()
419            .filter_map(|unnesting| {
420                if level_to_unnest == unnesting.depth {
421                    return Some((
422                        Arc::clone(&batch[unnesting.index_in_input_schema]),
423                        *unnesting,
424                    ));
425                }
426                // This means the unnesting on this item has started at higher level
427                // and need to continue until depth reaches 1
428                if level_to_unnest < unnesting.depth {
429                    return Some((
430                        Arc::clone(temp_unnested_arrs.get(unnesting).unwrap()),
431                        *unnesting,
432                    ));
433                }
434                None
435            })
436            .unzip();
437
438    // Filter out so that list_arrays only contain column with the highest depth
439    // at the same time, during iteration remove this depth so next time we don't have to unnest them again
440    let longest_length = find_longest_length(&arrs_to_unnest, options)?;
441    let unnested_length = longest_length.as_primitive::<Int64Type>();
442    let total_length = if unnested_length.is_empty() {
443        0
444    } else {
445        sum(unnested_length).ok_or_else(|| {
446            exec_datafusion_err!("Failed to calculate the total unnested length")
447        })? as usize
448    };
449    if total_length == 0 {
450        return Ok(None);
451    }
452
453    // Unnest all the list arrays
454    let unnested_temp_arrays =
455        unnest_list_arrays(arrs_to_unnest.as_ref(), unnested_length, total_length)?;
456
457    // Create the take indices array for other columns
458    let take_indices = create_take_indices(unnested_length, total_length);
459    unnested_temp_arrays
460        .into_iter()
461        .zip(list_unnest_specs.iter())
462        .for_each(|(flatten_arr, unnesting)| {
463            temp_unnested_arrs.insert(*unnesting, flatten_arr);
464        });
465
466    let repeat_mask: Vec<bool> = batch
467        .iter()
468        .enumerate()
469        .map(|(i, _)| {
470            // Check if the column is needed in future levels (levels below the current one)
471            let needed_in_future_levels = list_type_unnests.iter().any(|unnesting| {
472                unnesting.index_in_input_schema == i && unnesting.depth < level_to_unnest
473            });
474
475            // Check if the column is involved in unnesting at any level
476            let is_involved_in_unnesting = list_type_unnests
477                .iter()
478                .any(|unnesting| unnesting.index_in_input_schema == i);
479
480            // Repeat columns needed in future levels or not unnested.
481            needed_in_future_levels || !is_involved_in_unnesting
482        })
483        .collect();
484
485    // Dimension of arrays in batch is untouched, but the values are repeated
486    // as the side effect of unnesting
487    let ret = repeat_arrs_from_indices(batch, &take_indices, &repeat_mask)?;
488
489    Ok(Some(ret))
490}
/// An unnested array together with the depth of the [`ListUnnest`] that
/// produced it.
struct UnnestingResult {
    /// The unnested array
    arr: ArrayRef,
    /// Depth of the unnest operation this array belongs to
    depth: usize,
}
495
496/// For each row in a `RecordBatch`, some list/struct columns need to be unnested.
497/// - For list columns: We will expand the values in each list into multiple rows,
498///   taking the longest length among these lists, and shorter lists are padded with NULLs.
499/// - For struct columns: We will expand the struct columns into multiple subfield columns.
500///
501/// For columns that don't need to be unnested, repeat their values until reaching the longest length.
502///
503/// Note: unnest has a big difference in behavior between Postgres and DuckDB
504///
505/// Take this example
506///
507/// 1. Postgres
508/// ```ignored
509/// create table temp (
510///     i integer[][][], j integer[]
511/// )
512/// insert into temp values ('{{{1,2},{3,4}},{{5,6},{7,8}}}', '{1,2}');
513/// select unnest(i), unnest(j) from temp;
514/// ```
515///
516/// Result
517/// ```text
518///     1   1
519///     2   2
520///     3
521///     4
522///     5
523///     6
524///     7
525///     8
526/// ```
527/// 2. DuckDB
528/// ```ignore
529///     create table temp (i integer[][][], j integer[]);
530///     insert into temp values ([[[1,2],[3,4]],[[5,6],[7,8]]], [1,2]);
531///     select unnest(i,recursive:=true), unnest(j,recursive:=true) from temp;
532/// ```
533/// Result:
534/// ```text
535///
536///     ┌────────────────────────────────────────────────┬────────────────────────────────────────────────┐
537///     │ unnest(i, "recursive" := CAST('t' AS BOOLEAN)) │ unnest(j, "recursive" := CAST('t' AS BOOLEAN)) │
538///     │                     int32                      │                     int32                      │
539///     ├────────────────────────────────────────────────┼────────────────────────────────────────────────┤
540///     │                                              1 │                                              1 │
541///     │                                              2 │                                              2 │
542///     │                                              3 │                                              1 │
543///     │                                              4 │                                              2 │
544///     │                                              5 │                                              1 │
545///     │                                              6 │                                              2 │
546///     │                                              7 │                                              1 │
547///     │                                              8 │                                              2 │
548///     └────────────────────────────────────────────────┴────────────────────────────────────────────────┘
549/// ```
550///
551/// The following implementation refer to DuckDB's implementation
552fn build_batch(
553    batch: &RecordBatch,
554    schema: &SchemaRef,
555    list_type_columns: &[ListUnnest],
556    struct_column_indices: &HashSet<usize>,
557    options: &UnnestOptions,
558) -> Result<Option<RecordBatch>> {
559    let transformed = match list_type_columns.len() {
560        0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
561        _ => {
562            let mut temp_unnested_result = HashMap::new();
563            let max_recursion = list_type_columns
564                .iter()
565                .fold(0, |highest_depth, ListUnnest { depth, .. }| {
566                    cmp::max(highest_depth, *depth)
567                });
568
569            // This arr always has the same column count with the input batch
570            let mut flatten_arrs = vec![];
571
572            // Original batch has the same columns
573            // All unnesting results are written to temp_batch
574            for depth in (1..=max_recursion).rev() {
575                let input = match depth == max_recursion {
576                    true => batch.columns(),
577                    false => &flatten_arrs,
578                };
579                let Some(temp_result) = list_unnest_at_level(
580                    input,
581                    list_type_columns,
582                    &mut temp_unnested_result,
583                    depth,
584                    options,
585                )?
586                else {
587                    return Ok(None);
588                };
589                flatten_arrs = temp_result;
590            }
591            let unnested_array_map: HashMap<usize, Vec<UnnestingResult>> =
592                temp_unnested_result.into_iter().fold(
593                    HashMap::new(),
594                    |mut acc,
595                     (
596                        ListUnnest {
597                            index_in_input_schema,
598                            depth,
599                        },
600                        flattened_array,
601                    )| {
602                        acc.entry(index_in_input_schema).or_default().push(
603                            UnnestingResult {
604                                arr: flattened_array,
605                                depth,
606                            },
607                        );
608                        acc
609                    },
610                );
611            let output_order: HashMap<ListUnnest, usize> = list_type_columns
612                .iter()
613                .enumerate()
614                .map(|(order, unnest_def)| (*unnest_def, order))
615                .collect();
616
617            // One original column may be unnested multiple times into separate columns
618            let mut multi_unnested_per_original_index = unnested_array_map
619                .into_iter()
620                .map(
621                    // Each item in unnested_columns is the result of unnesting the same input column
622                    // we need to sort them to conform with the original expression order
623                    // e.g unnest(unnest(col)) must goes before unnest(col)
624                    |(original_index, mut unnested_columns)| {
625                        unnested_columns.sort_by(
626                            |UnnestingResult { depth: depth1, .. },
627                             UnnestingResult { depth: depth2, .. }|
628                             -> Ordering {
629                                output_order
630                                    .get(&ListUnnest {
631                                        depth: *depth1,
632                                        index_in_input_schema: original_index,
633                                    })
634                                    .unwrap()
635                                    .cmp(
636                                        output_order
637                                            .get(&ListUnnest {
638                                                depth: *depth2,
639                                                index_in_input_schema: original_index,
640                                            })
641                                            .unwrap(),
642                                    )
643                            },
644                        );
645                        (
646                            original_index,
647                            unnested_columns
648                                .into_iter()
649                                .map(|result| result.arr)
650                                .collect::<Vec<_>>(),
651                        )
652                    },
653                )
654                .collect::<HashMap<_, _>>();
655
656            let ret = flatten_arrs
657                .into_iter()
658                .enumerate()
659                .flat_map(|(col_idx, arr)| {
660                    // Convert original column into its unnested version(s)
661                    // Plural because one column can be unnested with different recursion level
662                    // and into separate output columns
663                    match multi_unnested_per_original_index.remove(&col_idx) {
664                        Some(unnested_arrays) => unnested_arrays,
665                        None => vec![arr],
666                    }
667                })
668                .collect::<Vec<_>>();
669
670            flatten_struct_cols(&ret, schema, struct_column_indices)
671        }
672    }?;
673    Ok(Some(transformed))
674}
675
676/// Find the longest list length among the given list arrays for each row.
677///
678/// For example if we have the following two list arrays:
679///
680/// ```ignore
681/// l1: [1, 2, 3], null, [], [3]
682/// l2: [4,5], [], null, [6, 7]
683/// ```
684///
685/// If `preserve_nulls` is false, the longest length array will be:
686///
687/// ```ignore
688/// longest_length: [3, 0, 0, 2]
689/// ```
690///
691/// whereas if `preserve_nulls` is true, the longest length array will be:
692///
693///
694/// ```ignore
695/// longest_length: [3, 1, 1, 2]
696/// ```
697///
698fn find_longest_length(
699    list_arrays: &[ArrayRef],
700    options: &UnnestOptions,
701) -> Result<ArrayRef> {
702    // The length of a NULL list
703    let null_length = if options.preserve_nulls {
704        Scalar::new(Int64Array::from_value(1, 1))
705    } else {
706        Scalar::new(Int64Array::from_value(0, 1))
707    };
708    let list_lengths: Vec<ArrayRef> = list_arrays
709        .iter()
710        .map(|list_array| {
711            let mut length_array = length(list_array)?;
712            // Make sure length arrays have the same type. Int64 is the most general one.
713            length_array = cast(&length_array, &DataType::Int64)?;
714            length_array =
715                zip(&is_not_null(&length_array)?, &length_array, &null_length)?;
716            Ok(length_array)
717        })
718        .collect::<Result<_>>()?;
719
720    let longest_length = list_lengths.iter().skip(1).try_fold(
721        Arc::clone(&list_lengths[0]),
722        |longest, current| {
723            let is_lt = lt(&longest, &current)?;
724            zip(&is_lt, &current, &longest)
725        },
726    )?;
727    Ok(longest_length)
728}
729
/// Trait defining common methods used for unnesting, implemented by list array types.
trait ListArrayType: Array {
    /// Returns a reference to the values of this list.
    fn values(&self) -> &ArrayRef;

    /// Returns the start and end offset of the values for the given row.
    ///
    /// The offsets index into [`Self::values`]; callers iterate `start..end`,
    /// i.e. the end offset is exclusive.
    fn value_offsets(&self, row: usize) -> (i64, i64);
}
738
739impl ListArrayType for ListArray {
740    fn values(&self) -> &ArrayRef {
741        self.values()
742    }
743
744    fn value_offsets(&self, row: usize) -> (i64, i64) {
745        let offsets = self.value_offsets();
746        (offsets[row].into(), offsets[row + 1].into())
747    }
748}
749
750impl ListArrayType for LargeListArray {
751    fn values(&self) -> &ArrayRef {
752        self.values()
753    }
754
755    fn value_offsets(&self, row: usize) -> (i64, i64) {
756        let offsets = self.value_offsets();
757        (offsets[row], offsets[row + 1])
758    }
759}
760
761impl ListArrayType for FixedSizeListArray {
762    fn values(&self) -> &ArrayRef {
763        self.values()
764    }
765
766    fn value_offsets(&self, row: usize) -> (i64, i64) {
767        let start = self.value_offset(row) as i64;
768        (start, start + self.value_length() as i64)
769    }
770}
771
772/// Unnest multiple list arrays according to the length array.
773fn unnest_list_arrays(
774    list_arrays: &[ArrayRef],
775    length_array: &PrimitiveArray<Int64Type>,
776    capacity: usize,
777) -> Result<Vec<ArrayRef>> {
778    let typed_arrays = list_arrays
779        .iter()
780        .map(|list_array| match list_array.data_type() {
781            DataType::List(_) => Ok(list_array.as_list::<i32>() as &dyn ListArrayType),
782            DataType::LargeList(_) => {
783                Ok(list_array.as_list::<i64>() as &dyn ListArrayType)
784            }
785            DataType::FixedSizeList(_, _) => {
786                Ok(list_array.as_fixed_size_list() as &dyn ListArrayType)
787            }
788            other => exec_err!("Invalid unnest datatype {other }"),
789        })
790        .collect::<Result<Vec<_>>>()?;
791
792    typed_arrays
793        .iter()
794        .map(|list_array| unnest_list_array(*list_array, length_array, capacity))
795        .collect::<Result<_>>()
796}
797
798/// Unnest a list array according the target length array.
799///
800/// Consider a list array like this:
801///
802/// ```ignore
803/// [1], [2, 3, 4], null, [5], [],
804/// ```
805///
806/// and the length array is:
807///
808/// ```ignore
809/// [2, 3, 2, 1, 2]
810/// ```
811///
812/// If the length of a certain list is less than the target length, pad with NULLs.
813/// So the unnested array will look like this:
814///
815/// ```ignore
816/// [1, null, 2, 3, 4, null, null, 5, null, null]
817/// ```
818///
819fn unnest_list_array(
820    list_array: &dyn ListArrayType,
821    length_array: &PrimitiveArray<Int64Type>,
822    capacity: usize,
823) -> Result<ArrayRef> {
824    let values = list_array.values();
825    let mut take_indices_builder = PrimitiveArray::<Int64Type>::builder(capacity);
826    for row in 0..list_array.len() {
827        let mut value_length = 0;
828        if !list_array.is_null(row) {
829            let (start, end) = list_array.value_offsets(row);
830            value_length = end - start;
831            for i in start..end {
832                take_indices_builder.append_value(i)
833            }
834        }
835        let target_length = length_array.value(row);
836        debug_assert!(
837            value_length <= target_length,
838            "value length is beyond the longest length"
839        );
840        // Pad with NULL values
841        for _ in value_length..target_length {
842            take_indices_builder.append_null();
843        }
844    }
845    Ok(kernels::take::take(
846        &values,
847        &take_indices_builder.finish(),
848        None,
849    )?)
850}
851
852/// Creates take indices that will be used to expand all columns except for the list type
853/// [`columns`](UnnestExec::list_column_indices) that is being unnested.
854/// Every column value needs to be repeated multiple times according to the length array.
855///
856/// If the length array looks like this:
857///
858/// ```ignore
859/// [2, 3, 1]
860/// ```
861/// Then [`create_take_indices`] will return an array like this
862///
863/// ```ignore
864/// [0, 0, 1, 1, 1, 2]
865/// ```
866///
867fn create_take_indices(
868    length_array: &PrimitiveArray<Int64Type>,
869    capacity: usize,
870) -> PrimitiveArray<Int64Type> {
871    // `find_longest_length()` guarantees this.
872    debug_assert!(
873        length_array.null_count() == 0,
874        "length array should not contain nulls"
875    );
876    let mut builder = PrimitiveArray::<Int64Type>::builder(capacity);
877    for (index, repeat) in length_array.iter().enumerate() {
878        // The length array should not contain nulls, so unwrap is safe
879        let repeat = repeat.unwrap();
880        (0..repeat).for_each(|_| builder.append_value(index as i64));
881    }
882    builder.finish()
883}
884
885/// Create a batch of arrays based on an input `batch` and a `indices` array.
886/// The `indices` array is used by the take kernel to repeat values in the arrays
887/// that are marked with `true` in the `repeat_mask`. Arrays marked with `false`
888/// in the `repeat_mask` will be replaced with arrays filled with nulls of the
889/// appropriate length.
890///
891/// For example if we have the following batch:
892///
893/// ```ignore
894/// c1: [1], null, [2, 3, 4], null, [5, 6]
895/// c2: 'a', 'b',  'c', null, 'd'
896/// ```
897///
898/// then the `unnested_list_arrays` contains the unnest column that will replace `c1` in
899/// the final batch if `preserve_nulls` is true:
900///
901/// ```ignore
902/// c1: 1, null, 2, 3, 4, null, 5, 6
903/// ```
904///
905/// And the `indices` array contains the indices that are used by `take` kernel to
906/// repeat the values in `c2`:
907///
908/// ```ignore
909/// 0, 1, 2, 2, 2, 3, 4, 4
910/// ```
911///
912/// so that the final batch will look like:
913///
914/// ```ignore
915/// c1: 1, null, 2, 3, 4, null, 5, 6
916/// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
917/// ```
918///
919/// The `repeat_mask` determines whether an array's values are repeated or replaced with nulls.
920/// For example, if the `repeat_mask` is:
921///
922/// ```ignore
923/// [true, false]
924/// ```
925///
926/// The final batch will look like:
927///
928/// ```ignore
929/// c1: 1, null, 2, 3, 4, null, 5, 6  // Repeated using `indices`
930/// c2: null, null, null, null, null, null, null, null  // Replaced with nulls
931///
932fn repeat_arrs_from_indices(
933    batch: &[ArrayRef],
934    indices: &PrimitiveArray<Int64Type>,
935    repeat_mask: &[bool],
936) -> Result<Vec<Arc<dyn Array>>> {
937    batch
938        .iter()
939        .zip(repeat_mask.iter())
940        .map(|(arr, &repeat)| {
941            if repeat {
942                Ok(kernels::take::take(arr, indices, None)?)
943            } else {
944                Ok(new_null_array(arr.data_type(), arr.len()))
945            }
946        })
947        .collect()
948}
949
950#[cfg(test)]
951mod tests {
952    use super::*;
953    use arrow::array::{
954        GenericListArray, NullBufferBuilder, OffsetSizeTrait, StringArray,
955    };
956    use arrow::buffer::{NullBuffer, OffsetBuffer};
957    use arrow::datatypes::{Field, Int32Type};
958    use datafusion_common::test_util::batches_to_string;
959    use insta::assert_snapshot;
960
961    // Create a GenericListArray with the following list values:
962    //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
963    fn make_generic_array<OffsetSize>() -> GenericListArray<OffsetSize>
964    where
965        OffsetSize: OffsetSizeTrait,
966    {
967        let mut values = vec![];
968        let mut offsets: Vec<OffsetSize> = vec![OffsetSize::zero()];
969        let mut valid = NullBufferBuilder::new(6);
970
971        // [A, B, C]
972        values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]);
973        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
974        valid.append_non_null();
975
976        // []
977        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
978        valid.append_non_null();
979
980        // NULL with non-zero value length
981        // Issue https://github.com/apache/datafusion/issues/9932
982        values.push(Some("?"));
983        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
984        valid.append_null();
985
986        // [D]
987        values.push(Some("D"));
988        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
989        valid.append_non_null();
990
991        // Another NULL with zero value length
992        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
993        valid.append_null();
994
995        // [NULL, F]
996        values.extend_from_slice(&[None, Some("F")]);
997        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
998        valid.append_non_null();
999
1000        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
1001        GenericListArray::<OffsetSize>::new(
1002            field,
1003            OffsetBuffer::new(offsets.into()),
1004            Arc::new(StringArray::from(values)),
1005            valid.finish(),
1006        )
1007    }
1008
1009    // Create a FixedSizeListArray with the following list values:
1010    //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
1011    fn make_fixed_list() -> FixedSizeListArray {
1012        let values = Arc::new(StringArray::from_iter([
1013            Some("A"),
1014            Some("B"),
1015            None,
1016            None,
1017            Some("C"),
1018            Some("D"),
1019            None,
1020            None,
1021            None,
1022            Some("F"),
1023            None,
1024            None,
1025        ]));
1026        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
1027        let valid = NullBuffer::from(vec![true, false, true, false, true, true]);
1028        FixedSizeListArray::new(field, 2, values, Some(valid))
1029    }
1030
1031    fn verify_unnest_list_array(
1032        list_array: &dyn ListArrayType,
1033        lengths: Vec<i64>,
1034        expected: Vec<Option<&str>>,
1035    ) -> Result<()> {
1036        let length_array = Int64Array::from(lengths);
1037        let unnested_array = unnest_list_array(list_array, &length_array, 3 * 6)?;
1038        let strs = unnested_array.as_string::<i32>().iter().collect::<Vec<_>>();
1039        assert_eq!(strs, expected);
1040        Ok(())
1041    }
1042
1043    #[test]
1044    fn test_build_batch_list_arr_recursive() -> Result<()> {
1045        // col1                             | col2
1046        // [[1,2,3],null,[4,5]]             | ['a','b']
1047        // [[7,8,9,10], null, [11,12,13]]   | ['c','d']
1048        // null                             | ['e']
1049        let list_arr1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1050            Some(vec![Some(1), Some(2), Some(3)]),
1051            None,
1052            Some(vec![Some(4), Some(5)]),
1053            Some(vec![Some(7), Some(8), Some(9), Some(10)]),
1054            None,
1055            Some(vec![Some(11), Some(12), Some(13)]),
1056        ]);
1057
1058        let list_arr1_ref = Arc::new(list_arr1) as ArrayRef;
1059        let offsets = OffsetBuffer::from_lengths([3, 3, 0]);
1060        let mut nulls = NullBufferBuilder::new(3);
1061        nulls.append_non_null();
1062        nulls.append_non_null();
1063        nulls.append_null();
1064        // list<list<int32>>
1065        let col1_field = Field::new_list_field(
1066            DataType::List(Arc::new(Field::new_list_field(
1067                list_arr1_ref.data_type().to_owned(),
1068                true,
1069            ))),
1070            true,
1071        );
1072        let col1 = ListArray::new(
1073            Arc::new(Field::new_list_field(
1074                list_arr1_ref.data_type().to_owned(),
1075                true,
1076            )),
1077            offsets,
1078            list_arr1_ref,
1079            nulls.finish(),
1080        );
1081
1082        let list_arr2 = StringArray::from(vec![
1083            Some("a"),
1084            Some("b"),
1085            Some("c"),
1086            Some("d"),
1087            Some("e"),
1088        ]);
1089
1090        let offsets = OffsetBuffer::from_lengths([2, 2, 1]);
1091        let mut nulls = NullBufferBuilder::new(3);
1092        nulls.append_n_non_nulls(3);
1093        let col2_field = Field::new(
1094            "col2",
1095            DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
1096            true,
1097        );
1098        let col2 = GenericListArray::<i32>::new(
1099            Arc::new(Field::new_list_field(DataType::Utf8, true)),
1100            OffsetBuffer::new(offsets.into()),
1101            Arc::new(list_arr2),
1102            nulls.finish(),
1103        );
1104        // convert col1 and col2 to a record batch
1105        let schema = Arc::new(Schema::new(vec![col1_field, col2_field]));
1106        let out_schema = Arc::new(Schema::new(vec![
1107            Field::new(
1108                "col1_unnest_placeholder_depth_1",
1109                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
1110                true,
1111            ),
1112            Field::new("col1_unnest_placeholder_depth_2", DataType::Int32, true),
1113            Field::new("col2_unnest_placeholder_depth_1", DataType::Utf8, true),
1114        ]));
1115        let batch = RecordBatch::try_new(
1116            Arc::clone(&schema),
1117            vec![Arc::new(col1) as ArrayRef, Arc::new(col2) as ArrayRef],
1118        )
1119        .unwrap();
1120        let list_type_columns = vec![
1121            ListUnnest {
1122                index_in_input_schema: 0,
1123                depth: 1,
1124            },
1125            ListUnnest {
1126                index_in_input_schema: 0,
1127                depth: 2,
1128            },
1129            ListUnnest {
1130                index_in_input_schema: 1,
1131                depth: 1,
1132            },
1133        ];
1134        let ret = build_batch(
1135            &batch,
1136            &out_schema,
1137            list_type_columns.as_ref(),
1138            &HashSet::default(),
1139            &UnnestOptions {
1140                preserve_nulls: true,
1141                recursions: vec![],
1142            },
1143        )?
1144        .unwrap();
1145
1146        assert_snapshot!(batches_to_string(&[ret]),
1147        @r###"
1148+---------------------------------+---------------------------------+---------------------------------+
1149| col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 |
1150+---------------------------------+---------------------------------+---------------------------------+
1151| [1, 2, 3]                       | 1                               | a                               |
1152|                                 | 2                               | b                               |
1153| [4, 5]                          | 3                               |                                 |
1154| [1, 2, 3]                       |                                 | a                               |
1155|                                 |                                 | b                               |
1156| [4, 5]                          |                                 |                                 |
1157| [1, 2, 3]                       | 4                               | a                               |
1158|                                 | 5                               | b                               |
1159| [4, 5]                          |                                 |                                 |
1160| [7, 8, 9, 10]                   | 7                               | c                               |
1161|                                 | 8                               | d                               |
1162| [11, 12, 13]                    | 9                               |                                 |
1163|                                 | 10                              |                                 |
1164| [7, 8, 9, 10]                   |                                 | c                               |
1165|                                 |                                 | d                               |
1166| [11, 12, 13]                    |                                 |                                 |
1167| [7, 8, 9, 10]                   | 11                              | c                               |
1168|                                 | 12                              | d                               |
1169| [11, 12, 13]                    | 13                              |                                 |
1170|                                 |                                 | e                               |
1171+---------------------------------+---------------------------------+---------------------------------+
1172        "###);
1173        Ok(())
1174    }
1175
1176    #[test]
1177    fn test_unnest_list_array() -> Result<()> {
1178        // [A, B, C], [], NULL, [D], NULL, [NULL, F]
1179        let list_array = make_generic_array::<i32>();
1180        verify_unnest_list_array(
1181            &list_array,
1182            vec![3, 2, 1, 2, 0, 3],
1183            vec![
1184                Some("A"),
1185                Some("B"),
1186                Some("C"),
1187                None,
1188                None,
1189                None,
1190                Some("D"),
1191                None,
1192                None,
1193                Some("F"),
1194                None,
1195            ],
1196        )?;
1197
1198        // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
1199        let list_array = make_fixed_list();
1200        verify_unnest_list_array(
1201            &list_array,
1202            vec![3, 1, 2, 0, 2, 3],
1203            vec![
1204                Some("A"),
1205                Some("B"),
1206                None,
1207                None,
1208                Some("C"),
1209                Some("D"),
1210                None,
1211                Some("F"),
1212                None,
1213                None,
1214                None,
1215            ],
1216        )?;
1217
1218        Ok(())
1219    }
1220
1221    fn verify_longest_length(
1222        list_arrays: &[ArrayRef],
1223        preserve_nulls: bool,
1224        expected: Vec<i64>,
1225    ) -> Result<()> {
1226        let options = UnnestOptions {
1227            preserve_nulls,
1228            recursions: vec![],
1229        };
1230        let longest_length = find_longest_length(list_arrays, &options)?;
1231        let expected_array = Int64Array::from(expected);
1232        assert_eq!(
1233            longest_length
1234                .as_any()
1235                .downcast_ref::<Int64Array>()
1236                .unwrap(),
1237            &expected_array
1238        );
1239        Ok(())
1240    }
1241
1242    #[test]
1243    fn test_longest_list_length() -> Result<()> {
1244        // Test with single ListArray
1245        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
1246        let list_array = Arc::new(make_generic_array::<i32>()) as ArrayRef;
1247        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
1248        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;
1249
1250        // Test with single LargeListArray
1251        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
1252        let list_array = Arc::new(make_generic_array::<i64>()) as ArrayRef;
1253        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
1254        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;
1255
1256        // Test with single FixedSizeListArray
1257        //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
1258        let list_array = Arc::new(make_fixed_list()) as ArrayRef;
1259        verify_longest_length(&[Arc::clone(&list_array)], false, vec![2, 0, 2, 0, 2, 2])?;
1260        verify_longest_length(&[Arc::clone(&list_array)], true, vec![2, 1, 2, 1, 2, 2])?;
1261
1262        // Test with multiple list arrays
1263        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
1264        //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
1265        let list1 = Arc::new(make_generic_array::<i32>()) as ArrayRef;
1266        let list2 = Arc::new(make_fixed_list()) as ArrayRef;
1267        let list_arrays = vec![Arc::clone(&list1), Arc::clone(&list2)];
1268        verify_longest_length(&list_arrays, false, vec![3, 0, 2, 1, 2, 2])?;
1269        verify_longest_length(&list_arrays, true, vec![3, 1, 2, 1, 2, 2])?;
1270
1271        Ok(())
1272    }
1273
1274    #[test]
1275    fn test_create_take_indices() -> Result<()> {
1276        let length_array = Int64Array::from(vec![2, 3, 1]);
1277        let take_indices = create_take_indices(&length_array, 6);
1278        let expected = Int64Array::from(vec![0, 0, 1, 1, 1, 2]);
1279        assert_eq!(take_indices, expected);
1280        Ok(())
1281    }
1282}