datafusion_physical_plan/
unnest.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Define a plan for unnesting values in columns that contain a list type.
19
20use std::cmp::{self, Ordering};
21use std::task::{ready, Poll};
22use std::{any::Any, sync::Arc};
23
24use super::metrics::{
25    self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
26    RecordOutput,
27};
28use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
29use crate::{
30    DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
31    SendableRecordBatchStream,
32};
33
34use arrow::array::{
35    new_null_array, Array, ArrayRef, AsArray, BooleanBufferBuilder, FixedSizeListArray,
36    Int64Array, LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray,
37};
38use arrow::compute::kernels::length::length;
39use arrow::compute::kernels::zip::zip;
40use arrow::compute::{cast, is_not_null, kernels, sum};
41use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
42use arrow::record_batch::RecordBatch;
43use arrow_ord::cmp::lt;
44use async_trait::async_trait;
45use datafusion_common::{
46    exec_datafusion_err, exec_err, internal_err, Constraints, HashMap, HashSet, Result,
47    UnnestOptions,
48};
49use datafusion_execution::TaskContext;
50use datafusion_physical_expr::equivalence::ProjectionMapping;
51use datafusion_physical_expr::expressions::Column;
52use datafusion_physical_expr::PhysicalExpr;
53use futures::{Stream, StreamExt};
54use log::trace;
55
/// Unnest the given columns (either with type struct or list)
/// For list unnesting, each row is vertically transformed into multiple rows
/// For struct unnesting, each column is horizontally transformed into multiple columns,
/// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m')
///
/// See [`UnnestOptions`] for more details and an example.
#[derive(Debug, Clone)]
pub struct UnnestExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// The schema once the unnest is applied
    schema: SchemaRef,
    /// List unnest operations to apply. Each [`ListUnnest`] pairs the index of
    /// a column in the input schema with an unnest depth, so the same input
    /// column may appear more than once with different depths.
    list_column_indices: Vec<ListUnnest>,
    /// Indices of the struct-typed columns in the input schema
    struct_column_indices: Vec<usize>,
    /// Options
    options: UnnestOptions,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: PlanProperties,
}
79
80impl UnnestExec {
81    /// Create a new [UnnestExec].
82    pub fn new(
83        input: Arc<dyn ExecutionPlan>,
84        list_column_indices: Vec<ListUnnest>,
85        struct_column_indices: Vec<usize>,
86        schema: SchemaRef,
87        options: UnnestOptions,
88    ) -> Result<Self> {
89        let cache = Self::compute_properties(
90            &input,
91            &list_column_indices,
92            &struct_column_indices,
93            Arc::clone(&schema),
94        )?;
95
96        Ok(UnnestExec {
97            input,
98            schema,
99            list_column_indices,
100            struct_column_indices,
101            options,
102            metrics: Default::default(),
103            cache,
104        })
105    }
106
107    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
108    fn compute_properties(
109        input: &Arc<dyn ExecutionPlan>,
110        list_column_indices: &[ListUnnest],
111        struct_column_indices: &[usize],
112        schema: SchemaRef,
113    ) -> Result<PlanProperties> {
114        // Find out which indices are not unnested, such that they can be copied over from the input plan
115        let input_schema = input.schema();
116        let mut unnested_indices = BooleanBufferBuilder::new(input_schema.fields().len());
117        unnested_indices.append_n(input_schema.fields().len(), false);
118        for list_unnest in list_column_indices {
119            unnested_indices.set_bit(list_unnest.index_in_input_schema, true);
120        }
121        for struct_unnest in struct_column_indices {
122            unnested_indices.set_bit(*struct_unnest, true)
123        }
124        let unnested_indices = unnested_indices.finish();
125        let non_unnested_indices: Vec<usize> = (0..input_schema.fields().len())
126            .filter(|idx| !unnested_indices.value(*idx))
127            .collect();
128
129        // Manually build projection mapping from non-unnested input columns to their positions in the output
130        let input_schema = input.schema();
131        let projection_mapping: ProjectionMapping = non_unnested_indices
132            .iter()
133            .map(|&input_idx| {
134                // Find what index the input column has in the output schema
135                let input_field = input_schema.field(input_idx);
136                let output_idx = schema
137                    .fields()
138                    .iter()
139                    .position(|output_field| output_field.name() == input_field.name())
140                    .ok_or_else(|| {
141                        exec_datafusion_err!(
142                            "Non-unnested column '{}' must exist in output schema",
143                            input_field.name()
144                        )
145                    })?;
146
147                let input_col = Arc::new(Column::new(input_field.name(), input_idx))
148                    as Arc<dyn PhysicalExpr>;
149                let target_col = Arc::new(Column::new(input_field.name(), output_idx))
150                    as Arc<dyn PhysicalExpr>;
151                // Use From<Vec<(Arc<dyn PhysicalExpr>, usize)>> for ProjectionTargets
152                let targets = vec![(target_col, output_idx)].into();
153                Ok((input_col, targets))
154            })
155            .collect::<Result<ProjectionMapping>>()?;
156
157        // Create the unnest's equivalence properties by copying the input plan's equivalence properties
158        // for the unaffected columns. Except for the constraints, which are removed entirely because
159        // the unnest operation invalidates any global uniqueness or primary-key constraints.
160        let input_eq_properties = input.equivalence_properties();
161        let eq_properties = input_eq_properties
162            .project(&projection_mapping, Arc::clone(&schema))
163            .with_constraints(Constraints::default());
164
165        // Output partitioning must use the projection mapping
166        let output_partitioning = input
167            .output_partitioning()
168            .project(&projection_mapping, &eq_properties);
169
170        Ok(PlanProperties::new(
171            eq_properties,
172            output_partitioning,
173            input.pipeline_behavior(),
174            input.boundedness(),
175        ))
176    }
177
178    /// Input execution plan
179    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
180        &self.input
181    }
182
183    /// Indices of the list-typed columns in the input schema
184    pub fn list_column_indices(&self) -> &[ListUnnest] {
185        &self.list_column_indices
186    }
187
188    /// Indices of the struct-typed columns in the input schema
189    pub fn struct_column_indices(&self) -> &[usize] {
190        &self.struct_column_indices
191    }
192
193    pub fn options(&self) -> &UnnestOptions {
194        &self.options
195    }
196}
197
198impl DisplayAs for UnnestExec {
199    fn fmt_as(
200        &self,
201        t: DisplayFormatType,
202        f: &mut std::fmt::Formatter,
203    ) -> std::fmt::Result {
204        match t {
205            DisplayFormatType::Default | DisplayFormatType::Verbose => {
206                write!(f, "UnnestExec")
207            }
208            DisplayFormatType::TreeRender => {
209                write!(f, "")
210            }
211        }
212    }
213}
214
215impl ExecutionPlan for UnnestExec {
216    fn name(&self) -> &'static str {
217        "UnnestExec"
218    }
219
220    fn as_any(&self) -> &dyn Any {
221        self
222    }
223
224    fn properties(&self) -> &PlanProperties {
225        &self.cache
226    }
227
228    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
229        vec![&self.input]
230    }
231
232    fn with_new_children(
233        self: Arc<Self>,
234        children: Vec<Arc<dyn ExecutionPlan>>,
235    ) -> Result<Arc<dyn ExecutionPlan>> {
236        Ok(Arc::new(UnnestExec::new(
237            Arc::clone(&children[0]),
238            self.list_column_indices.clone(),
239            self.struct_column_indices.clone(),
240            Arc::clone(&self.schema),
241            self.options.clone(),
242        )?))
243    }
244
245    fn required_input_distribution(&self) -> Vec<Distribution> {
246        vec![Distribution::UnspecifiedDistribution]
247    }
248
249    fn execute(
250        &self,
251        partition: usize,
252        context: Arc<TaskContext>,
253    ) -> Result<SendableRecordBatchStream> {
254        let input = self.input.execute(partition, context)?;
255        let metrics = UnnestMetrics::new(partition, &self.metrics);
256
257        Ok(Box::pin(UnnestStream {
258            input,
259            schema: Arc::clone(&self.schema),
260            list_type_columns: self.list_column_indices.clone(),
261            struct_column_indices: self.struct_column_indices.iter().copied().collect(),
262            options: self.options.clone(),
263            metrics,
264        }))
265    }
266
267    fn metrics(&self) -> Option<MetricsSet> {
268        Some(self.metrics.clone_inner())
269    }
270}
271
/// Metrics recorded while unnesting one partition of the input.
#[derive(Clone, Debug)]
struct UnnestMetrics {
    /// Execution metrics
    baseline_metrics: BaselineMetrics,
    /// Number of batches consumed
    input_batches: metrics::Count,
    /// Number of rows consumed
    input_rows: metrics::Count,
    /// Number of batches produced
    output_batches: metrics::Count,
}
283
284impl UnnestMetrics {
285    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
286        let input_batches =
287            MetricBuilder::new(metrics).counter("input_batches", partition);
288
289        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
290
291        let output_batches =
292            MetricBuilder::new(metrics).counter("output_batches", partition);
293
294        Self {
295            baseline_metrics: BaselineMetrics::new(metrics, partition),
296            input_batches,
297            input_rows,
298            output_batches,
299        }
300    }
301}
302
/// A stream that issues [RecordBatch]es with unnested column data.
struct UnnestStream {
    /// Input stream
    input: SendableRecordBatchStream,
    /// Unnested schema
    schema: Arc<Schema>,
    /// represents all unnest operations to be applied to the input (input index, depth)
    /// e.g unnest(col1),unnest(unnest(col1)) where col1 has index 1 in original input schema
    /// then list_type_columns = [ListUnnest{1,1},ListUnnest{1,2}]
    list_type_columns: Vec<ListUnnest>,
    /// Indices of the struct-typed columns to flatten, kept as a set for
    /// membership lookups
    struct_column_indices: HashSet<usize>,
    /// Options
    options: UnnestOptions,
    /// Metrics
    metrics: UnnestMetrics,
}
319
impl RecordBatchStream for UnnestStream {
    /// Returns the schema of the batches produced by this stream, i.e. the
    /// schema after unnesting.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
325
// NOTE(review): this impl contains no `async fn`, so `#[async_trait]` looks
// unnecessary here — confirm before removing it (and its import).
#[async_trait]
impl Stream for UnnestStream {
    type Item = Result<RecordBatch>;

    /// Polls for the next unnested batch by delegating to
    /// [`UnnestStream::poll_next_impl`].
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.poll_next_impl(cx)
    }
}
337
338impl UnnestStream {
339    /// Separate implementation function that unpins the [`UnnestStream`] so
340    /// that partial borrows work correctly
341    fn poll_next_impl(
342        &mut self,
343        cx: &mut std::task::Context<'_>,
344    ) -> Poll<Option<Result<RecordBatch>>> {
345        loop {
346            return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
347                Some(Ok(batch)) => {
348                    let elapsed_compute =
349                        self.metrics.baseline_metrics.elapsed_compute().clone();
350                    let timer = elapsed_compute.timer();
351                    self.metrics.input_batches.add(1);
352                    self.metrics.input_rows.add(batch.num_rows());
353                    let result = build_batch(
354                        &batch,
355                        &self.schema,
356                        &self.list_type_columns,
357                        &self.struct_column_indices,
358                        &self.options,
359                    )?;
360                    timer.done();
361                    let Some(result_batch) = result else {
362                        continue;
363                    };
364                    self.metrics.output_batches.add(1);
365                    (&result_batch).record_output(&self.metrics.baseline_metrics);
366
367                    // Empty record batches should not be emitted.
368                    // They need to be treated as  [`Option<RecordBatch>`]es and handled separately
369                    debug_assert!(result_batch.num_rows() > 0);
370                    Some(Ok(result_batch))
371                }
372                other => {
373                    trace!(
374                        "Processed {} probe-side input batches containing {} rows and \
375                        produced {} output batches containing {} rows in {}",
376                        self.metrics.input_batches,
377                        self.metrics.input_rows,
378                        self.metrics.output_batches,
379                        self.metrics.baseline_metrics.output_rows(),
380                        self.metrics.baseline_metrics.elapsed_compute(),
381                    );
382                    other
383                }
384            });
385        }
386    }
387}
388
389/// Given a set of struct column indices to flatten
390/// try converting the column in input into multiple subfield columns
391/// For example
392/// struct_col: [a: struct(item: int, name: string), b: int]
393/// with a batch
394/// {a: {item: 1, name: "a"}, b: 2},
395/// {a: {item: 3, name: "b"}, b: 4]
396/// will be converted into
397/// {a.item: 1, a.name: "a", b: 2},
398/// {a.item: 3, a.name: "b", b: 4}
399fn flatten_struct_cols(
400    input_batch: &[Arc<dyn Array>],
401    schema: &SchemaRef,
402    struct_column_indices: &HashSet<usize>,
403) -> Result<RecordBatch> {
404    // horizontal expansion because of struct unnest
405    let columns_expanded = input_batch
406        .iter()
407        .enumerate()
408        .map(|(idx, column_data)| match struct_column_indices.get(&idx) {
409            Some(_) => match column_data.data_type() {
410                DataType::Struct(_) => {
411                    let struct_arr =
412                        column_data.as_any().downcast_ref::<StructArray>().unwrap();
413                    Ok(struct_arr.columns().to_vec())
414                }
415                data_type => internal_err!(
416                    "expecting column {} from input plan to be a struct, got {:?}",
417                    idx,
418                    data_type
419                ),
420            },
421            None => Ok(vec![Arc::clone(column_data)]),
422        })
423        .collect::<Result<Vec<_>>>()?
424        .into_iter()
425        .flatten()
426        .collect();
427    Ok(RecordBatch::try_new(Arc::clone(schema), columns_expanded)?)
428}
429
/// Describes one list unnest operation: which input column to unnest and to
/// which depth, e.g. `unnest(unnest(col1))` with `col1` at input index 1 is
/// `ListUnnest { index_in_input_schema: 1, depth: 2 }`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct ListUnnest {
    /// Index of the column to unnest in the input schema
    pub index_in_input_schema: usize,
    /// Unnest depth (recursion level) applied to the column
    pub depth: usize,
}
435
436/// This function is used to execute the unnesting on multiple columns all at once, but
437/// one level at a time, and is called n times, where n is the highest recursion level among
438/// the unnest exprs in the query.
439///
440/// For example giving the following query:
441/// ```sql
442/// select unnest(colA, max_depth:=3) as P1, unnest(colA,max_depth:=2) as P2, unnest(colB, max_depth:=1) as P3 from temp;
443/// ```
444/// Then the total times this function being called is 3
445///
446/// It needs to be aware of which level the current unnesting is, because if there exists
447/// multiple unnesting on the same column, but with different recursion levels, say
448/// **unnest(colA, max_depth:=3)** and **unnest(colA, max_depth:=2)**, then the unnesting
449/// of expr **unnest(colA, max_depth:=3)** will start at level 3, while unnesting for expr
450/// **unnest(colA, max_depth:=2)** has to start at level 2
451///
452/// Set *colA* as a 3-dimension columns and *colB* as an array (1-dimension). As stated,
453/// this function is called with the descending order of recursion depth
454///
455/// Depth = 3
456/// - colA(3-dimension) unnest into temp column temp_P1(2_dimension) (unnesting of P1 starts
457///   from this level)
458/// - colA(3-dimension) having indices repeated by the unnesting operation above
459/// - colB(1-dimension) having indices repeated by the unnesting operation above
460///
461/// Depth = 2
462/// - temp_P1(2-dimension) unnest into temp column temp_P1(1-dimension)
463/// - colA(3-dimension) unnest into temp column temp_P2(2-dimension) (unnesting of P2 starts
464///   from this level)
465/// - colB(1-dimension) having indices repeated by the unnesting operation above
466///
467/// Depth = 1
468/// - temp_P1(1-dimension) unnest into P1
469/// - temp_P2(2-dimension) unnest into P2
470/// - colB(1-dimension) unnest into P3 (unnesting of P3 starts from this level)
471///
472/// The returned array will has the same size as the input batch
473/// and only contains original columns that are not being unnested.
474fn list_unnest_at_level(
475    batch: &[ArrayRef],
476    list_type_unnests: &[ListUnnest],
477    temp_unnested_arrs: &mut HashMap<ListUnnest, ArrayRef>,
478    level_to_unnest: usize,
479    options: &UnnestOptions,
480) -> Result<Option<Vec<ArrayRef>>> {
481    // Extract unnestable columns at this level
482    let (arrs_to_unnest, list_unnest_specs): (Vec<Arc<dyn Array>>, Vec<_>) =
483        list_type_unnests
484            .iter()
485            .filter_map(|unnesting| {
486                if level_to_unnest == unnesting.depth {
487                    return Some((
488                        Arc::clone(&batch[unnesting.index_in_input_schema]),
489                        *unnesting,
490                    ));
491                }
492                // This means the unnesting on this item has started at higher level
493                // and need to continue until depth reaches 1
494                if level_to_unnest < unnesting.depth {
495                    return Some((
496                        Arc::clone(temp_unnested_arrs.get(unnesting).unwrap()),
497                        *unnesting,
498                    ));
499                }
500                None
501            })
502            .unzip();
503
504    // Filter out so that list_arrays only contain column with the highest depth
505    // at the same time, during iteration remove this depth so next time we don't have to unnest them again
506    let longest_length = find_longest_length(&arrs_to_unnest, options)?;
507    let unnested_length = longest_length.as_primitive::<Int64Type>();
508    let total_length = if unnested_length.is_empty() {
509        0
510    } else {
511        sum(unnested_length).ok_or_else(|| {
512            exec_datafusion_err!("Failed to calculate the total unnested length")
513        })? as usize
514    };
515    if total_length == 0 {
516        return Ok(None);
517    }
518
519    // Unnest all the list arrays
520    let unnested_temp_arrays =
521        unnest_list_arrays(arrs_to_unnest.as_ref(), unnested_length, total_length)?;
522
523    // Create the take indices array for other columns
524    let take_indices = create_take_indices(unnested_length, total_length);
525    unnested_temp_arrays
526        .into_iter()
527        .zip(list_unnest_specs.iter())
528        .for_each(|(flatten_arr, unnesting)| {
529            temp_unnested_arrs.insert(*unnesting, flatten_arr);
530        });
531
532    let repeat_mask: Vec<bool> = batch
533        .iter()
534        .enumerate()
535        .map(|(i, _)| {
536            // Check if the column is needed in future levels (levels below the current one)
537            let needed_in_future_levels = list_type_unnests.iter().any(|unnesting| {
538                unnesting.index_in_input_schema == i && unnesting.depth < level_to_unnest
539            });
540
541            // Check if the column is involved in unnesting at any level
542            let is_involved_in_unnesting = list_type_unnests
543                .iter()
544                .any(|unnesting| unnesting.index_in_input_schema == i);
545
546            // Repeat columns needed in future levels or not unnested.
547            needed_in_future_levels || !is_involved_in_unnesting
548        })
549        .collect();
550
551    // Dimension of arrays in batch is untouched, but the values are repeated
552    // as the side effect of unnesting
553    let ret = repeat_arrs_from_indices(batch, &take_indices, &repeat_mask)?;
554
555    Ok(Some(ret))
556}
/// An unnested array together with the depth of the [`ListUnnest`] that
/// produced it.
struct UnnestingResult {
    /// The unnested (flattened) array
    arr: ArrayRef,
    /// Depth of the unnest operation that produced `arr`
    depth: usize,
}
561
562/// For each row in a `RecordBatch`, some list/struct columns need to be unnested.
563/// - For list columns: We will expand the values in each list into multiple rows,
564///   taking the longest length among these lists, and shorter lists are padded with NULLs.
565/// - For struct columns: We will expand the struct columns into multiple subfield columns.
566///
567/// For columns that don't need to be unnested, repeat their values until reaching the longest length.
568///
569/// Note: unnest has a big difference in behavior between Postgres and DuckDB
570///
571/// Take this example
572///
573/// 1. Postgres
574/// ```ignored
575/// create table temp (
576///     i integer[][][], j integer[]
577/// )
578/// insert into temp values ('{{{1,2},{3,4}},{{5,6},{7,8}}}', '{1,2}');
579/// select unnest(i), unnest(j) from temp;
580/// ```
581///
582/// Result
583/// ```text
584///     1   1
585///     2   2
586///     3
587///     4
588///     5
589///     6
590///     7
591///     8
592/// ```
593/// 2. DuckDB
594/// ```ignore
595///     create table temp (i integer[][][], j integer[]);
596///     insert into temp values ([[[1,2],[3,4]],[[5,6],[7,8]]], [1,2]);
597///     select unnest(i,recursive:=true), unnest(j,recursive:=true) from temp;
598/// ```
599/// Result:
600/// ```text
601///
602///     ┌────────────────────────────────────────────────┬────────────────────────────────────────────────┐
603///     │ unnest(i, "recursive" := CAST('t' AS BOOLEAN)) │ unnest(j, "recursive" := CAST('t' AS BOOLEAN)) │
604///     │                     int32                      │                     int32                      │
605///     ├────────────────────────────────────────────────┼────────────────────────────────────────────────┤
606///     │                                              1 │                                              1 │
607///     │                                              2 │                                              2 │
608///     │                                              3 │                                              1 │
609///     │                                              4 │                                              2 │
610///     │                                              5 │                                              1 │
611///     │                                              6 │                                              2 │
612///     │                                              7 │                                              1 │
613///     │                                              8 │                                              2 │
614///     └────────────────────────────────────────────────┴────────────────────────────────────────────────┘
615/// ```
616///
617/// The following implementation refer to DuckDB's implementation
618fn build_batch(
619    batch: &RecordBatch,
620    schema: &SchemaRef,
621    list_type_columns: &[ListUnnest],
622    struct_column_indices: &HashSet<usize>,
623    options: &UnnestOptions,
624) -> Result<Option<RecordBatch>> {
625    let transformed = match list_type_columns.len() {
626        0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
627        _ => {
628            let mut temp_unnested_result = HashMap::new();
629            let max_recursion = list_type_columns
630                .iter()
631                .fold(0, |highest_depth, ListUnnest { depth, .. }| {
632                    cmp::max(highest_depth, *depth)
633                });
634
635            // This arr always has the same column count with the input batch
636            let mut flatten_arrs = vec![];
637
638            // Original batch has the same columns
639            // All unnesting results are written to temp_batch
640            for depth in (1..=max_recursion).rev() {
641                let input = match depth == max_recursion {
642                    true => batch.columns(),
643                    false => &flatten_arrs,
644                };
645                let Some(temp_result) = list_unnest_at_level(
646                    input,
647                    list_type_columns,
648                    &mut temp_unnested_result,
649                    depth,
650                    options,
651                )?
652                else {
653                    return Ok(None);
654                };
655                flatten_arrs = temp_result;
656            }
657            let unnested_array_map: HashMap<usize, Vec<UnnestingResult>> =
658                temp_unnested_result.into_iter().fold(
659                    HashMap::new(),
660                    |mut acc,
661                     (
662                        ListUnnest {
663                            index_in_input_schema,
664                            depth,
665                        },
666                        flattened_array,
667                    )| {
668                        acc.entry(index_in_input_schema).or_default().push(
669                            UnnestingResult {
670                                arr: flattened_array,
671                                depth,
672                            },
673                        );
674                        acc
675                    },
676                );
677            let output_order: HashMap<ListUnnest, usize> = list_type_columns
678                .iter()
679                .enumerate()
680                .map(|(order, unnest_def)| (*unnest_def, order))
681                .collect();
682
683            // One original column may be unnested multiple times into separate columns
684            let mut multi_unnested_per_original_index = unnested_array_map
685                .into_iter()
686                .map(
687                    // Each item in unnested_columns is the result of unnesting the same input column
688                    // we need to sort them to conform with the original expression order
689                    // e.g unnest(unnest(col)) must goes before unnest(col)
690                    |(original_index, mut unnested_columns)| {
691                        unnested_columns.sort_by(
692                            |UnnestingResult { depth: depth1, .. },
693                             UnnestingResult { depth: depth2, .. }|
694                             -> Ordering {
695                                output_order
696                                    .get(&ListUnnest {
697                                        depth: *depth1,
698                                        index_in_input_schema: original_index,
699                                    })
700                                    .unwrap()
701                                    .cmp(
702                                        output_order
703                                            .get(&ListUnnest {
704                                                depth: *depth2,
705                                                index_in_input_schema: original_index,
706                                            })
707                                            .unwrap(),
708                                    )
709                            },
710                        );
711                        (
712                            original_index,
713                            unnested_columns
714                                .into_iter()
715                                .map(|result| result.arr)
716                                .collect::<Vec<_>>(),
717                        )
718                    },
719                )
720                .collect::<HashMap<_, _>>();
721
722            let ret = flatten_arrs
723                .into_iter()
724                .enumerate()
725                .flat_map(|(col_idx, arr)| {
726                    // Convert original column into its unnested version(s)
727                    // Plural because one column can be unnested with different recursion level
728                    // and into separate output columns
729                    match multi_unnested_per_original_index.remove(&col_idx) {
730                        Some(unnested_arrays) => unnested_arrays,
731                        None => vec![arr],
732                    }
733                })
734                .collect::<Vec<_>>();
735
736            flatten_struct_cols(&ret, schema, struct_column_indices)
737        }
738    }?;
739    Ok(Some(transformed))
740}
741
742/// Find the longest list length among the given list arrays for each row.
743///
744/// For example if we have the following two list arrays:
745///
746/// ```ignore
747/// l1: [1, 2, 3], null, [], [3]
748/// l2: [4,5], [], null, [6, 7]
749/// ```
750///
751/// If `preserve_nulls` is false, the longest length array will be:
752///
753/// ```ignore
754/// longest_length: [3, 0, 0, 2]
755/// ```
756///
757/// whereas if `preserve_nulls` is true, the longest length array will be:
758///
759///
760/// ```ignore
761/// longest_length: [3, 1, 1, 2]
762/// ```
763fn find_longest_length(
764    list_arrays: &[ArrayRef],
765    options: &UnnestOptions,
766) -> Result<ArrayRef> {
767    // The length of a NULL list
768    let null_length = if options.preserve_nulls {
769        Scalar::new(Int64Array::from_value(1, 1))
770    } else {
771        Scalar::new(Int64Array::from_value(0, 1))
772    };
773    let list_lengths: Vec<ArrayRef> = list_arrays
774        .iter()
775        .map(|list_array| {
776            let mut length_array = length(list_array)?;
777            // Make sure length arrays have the same type. Int64 is the most general one.
778            length_array = cast(&length_array, &DataType::Int64)?;
779            length_array =
780                zip(&is_not_null(&length_array)?, &length_array, &null_length)?;
781            Ok(length_array)
782        })
783        .collect::<Result<_>>()?;
784
785    let longest_length = list_lengths.iter().skip(1).try_fold(
786        Arc::clone(&list_lengths[0]),
787        |longest, current| {
788            let is_lt = lt(&longest, &current)?;
789            zip(&is_lt, &current, &longest)
790        },
791    )?;
792    Ok(longest_length)
793}
794
795/// Trait defining common methods used for unnesting, implemented by list array types.
796trait ListArrayType: Array {
797    /// Returns a reference to the values of this list.
798    fn values(&self) -> &ArrayRef;
799
800    /// Returns the start and end offset of the values for the given row.
801    fn value_offsets(&self, row: usize) -> (i64, i64);
802}
803
804impl ListArrayType for ListArray {
805    fn values(&self) -> &ArrayRef {
806        self.values()
807    }
808
809    fn value_offsets(&self, row: usize) -> (i64, i64) {
810        let offsets = self.value_offsets();
811        (offsets[row].into(), offsets[row + 1].into())
812    }
813}
814
815impl ListArrayType for LargeListArray {
816    fn values(&self) -> &ArrayRef {
817        self.values()
818    }
819
820    fn value_offsets(&self, row: usize) -> (i64, i64) {
821        let offsets = self.value_offsets();
822        (offsets[row], offsets[row + 1])
823    }
824}
825
826impl ListArrayType for FixedSizeListArray {
827    fn values(&self) -> &ArrayRef {
828        self.values()
829    }
830
831    fn value_offsets(&self, row: usize) -> (i64, i64) {
832        let start = self.value_offset(row) as i64;
833        (start, start + self.value_length() as i64)
834    }
835}
836
837/// Unnest multiple list arrays according to the length array.
838fn unnest_list_arrays(
839    list_arrays: &[ArrayRef],
840    length_array: &PrimitiveArray<Int64Type>,
841    capacity: usize,
842) -> Result<Vec<ArrayRef>> {
843    let typed_arrays = list_arrays
844        .iter()
845        .map(|list_array| match list_array.data_type() {
846            DataType::List(_) => Ok(list_array.as_list::<i32>() as &dyn ListArrayType),
847            DataType::LargeList(_) => {
848                Ok(list_array.as_list::<i64>() as &dyn ListArrayType)
849            }
850            DataType::FixedSizeList(_, _) => {
851                Ok(list_array.as_fixed_size_list() as &dyn ListArrayType)
852            }
853            other => exec_err!("Invalid unnest datatype {other }"),
854        })
855        .collect::<Result<Vec<_>>>()?;
856
857    typed_arrays
858        .iter()
859        .map(|list_array| unnest_list_array(*list_array, length_array, capacity))
860        .collect::<Result<_>>()
861}
862
863/// Unnest a list array according the target length array.
864///
865/// Consider a list array like this:
866///
867/// ```ignore
868/// [1], [2, 3, 4], null, [5], [],
869/// ```
870///
871/// and the length array is:
872///
873/// ```ignore
874/// [2, 3, 2, 1, 2]
875/// ```
876///
877/// If the length of a certain list is less than the target length, pad with NULLs.
878/// So the unnested array will look like this:
879///
880/// ```ignore
881/// [1, null, 2, 3, 4, null, null, 5, null, null]
882/// ```
883fn unnest_list_array(
884    list_array: &dyn ListArrayType,
885    length_array: &PrimitiveArray<Int64Type>,
886    capacity: usize,
887) -> Result<ArrayRef> {
888    let values = list_array.values();
889    let mut take_indices_builder = PrimitiveArray::<Int64Type>::builder(capacity);
890    for row in 0..list_array.len() {
891        let mut value_length = 0;
892        if !list_array.is_null(row) {
893            let (start, end) = list_array.value_offsets(row);
894            value_length = end - start;
895            for i in start..end {
896                take_indices_builder.append_value(i)
897            }
898        }
899        let target_length = length_array.value(row);
900        debug_assert!(
901            value_length <= target_length,
902            "value length is beyond the longest length"
903        );
904        // Pad with NULL values
905        for _ in value_length..target_length {
906            take_indices_builder.append_null();
907        }
908    }
909    Ok(kernels::take::take(
910        &values,
911        &take_indices_builder.finish(),
912        None,
913    )?)
914}
915
916/// Creates take indices that will be used to expand all columns except for the list type
917/// [`columns`](UnnestExec::list_column_indices) that is being unnested.
918/// Every column value needs to be repeated multiple times according to the length array.
919///
920/// If the length array looks like this:
921///
922/// ```ignore
923/// [2, 3, 1]
924/// ```
925/// Then [`create_take_indices`] will return an array like this
926///
927/// ```ignore
928/// [0, 0, 1, 1, 1, 2]
929/// ```
930fn create_take_indices(
931    length_array: &PrimitiveArray<Int64Type>,
932    capacity: usize,
933) -> PrimitiveArray<Int64Type> {
934    // `find_longest_length()` guarantees this.
935    debug_assert!(
936        length_array.null_count() == 0,
937        "length array should not contain nulls"
938    );
939    let mut builder = PrimitiveArray::<Int64Type>::builder(capacity);
940    for (index, repeat) in length_array.iter().enumerate() {
941        // The length array should not contain nulls, so unwrap is safe
942        let repeat = repeat.unwrap();
943        (0..repeat).for_each(|_| builder.append_value(index as i64));
944    }
945    builder.finish()
946}
947
948/// Create a batch of arrays based on an input `batch` and a `indices` array.
949/// The `indices` array is used by the take kernel to repeat values in the arrays
950/// that are marked with `true` in the `repeat_mask`. Arrays marked with `false`
951/// in the `repeat_mask` will be replaced with arrays filled with nulls of the
952/// appropriate length.
953///
954/// For example if we have the following batch:
955///
956/// ```ignore
957/// c1: [1], null, [2, 3, 4], null, [5, 6]
958/// c2: 'a', 'b',  'c', null, 'd'
959/// ```
960///
961/// then the `unnested_list_arrays` contains the unnest column that will replace `c1` in
962/// the final batch if `preserve_nulls` is true:
963///
964/// ```ignore
965/// c1: 1, null, 2, 3, 4, null, 5, 6
966/// ```
967///
968/// And the `indices` array contains the indices that are used by `take` kernel to
969/// repeat the values in `c2`:
970///
971/// ```ignore
972/// 0, 1, 2, 2, 2, 3, 4, 4
973/// ```
974///
975/// so that the final batch will look like:
976///
977/// ```ignore
978/// c1: 1, null, 2, 3, 4, null, 5, 6
979/// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
980/// ```
981///
982/// The `repeat_mask` determines whether an array's values are repeated or replaced with nulls.
983/// For example, if the `repeat_mask` is:
984///
985/// ```ignore
986/// [true, false]
987/// ```
988///
989/// The final batch will look like:
990///
991/// ```ignore
992/// c1: 1, null, 2, 3, 4, null, 5, 6  // Repeated using `indices`
993/// c2: null, null, null, null, null, null, null, null  // Replaced with nulls
994fn repeat_arrs_from_indices(
995    batch: &[ArrayRef],
996    indices: &PrimitiveArray<Int64Type>,
997    repeat_mask: &[bool],
998) -> Result<Vec<Arc<dyn Array>>> {
999    batch
1000        .iter()
1001        .zip(repeat_mask.iter())
1002        .map(|(arr, &repeat)| {
1003            if repeat {
1004                Ok(kernels::take::take(arr, indices, None)?)
1005            } else {
1006                Ok(new_null_array(arr.data_type(), arr.len()))
1007            }
1008        })
1009        .collect()
1010}
1011
/// Unit tests for the unnest helpers: list unnesting, longest-length
/// computation, take-index creation and recursive batch building.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        GenericListArray, NullBufferBuilder, OffsetSizeTrait, StringArray,
    };
    use arrow::buffer::{NullBuffer, OffsetBuffer};
    use arrow::datatypes::{Field, Int32Type};
    use datafusion_common::test_util::batches_to_string;
    use insta::assert_snapshot;

    // Create a GenericListArray with the following list values:
    //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
    fn make_generic_array<OffsetSize>() -> GenericListArray<OffsetSize>
    where
        OffsetSize: OffsetSizeTrait,
    {
        let mut values = vec![];
        let mut offsets: Vec<OffsetSize> = vec![OffsetSize::zero()];
        let mut valid = NullBufferBuilder::new(6);

        // [A, B, C]
        values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // []
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // NULL with non-zero value length
        // Issue https://github.com/apache/datafusion/issues/9932
        values.push(Some("?"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [D]
        values.push(Some("D"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // Another NULL with zero value length
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [NULL, F]
        values.extend_from_slice(&[None, Some("F")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        GenericListArray::<OffsetSize>::new(
            field,
            OffsetBuffer::new(offsets.into()),
            Arc::new(StringArray::from(values)),
            valid.finish(),
        )
    }

    // Create a FixedSizeListArray with the following list values:
    //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
    fn make_fixed_list() -> FixedSizeListArray {
        let values = Arc::new(StringArray::from_iter([
            Some("A"),
            Some("B"),
            None,
            None,
            Some("C"),
            Some("D"),
            None,
            None,
            None,
            Some("F"),
            None,
            None,
        ]));
        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        let valid = NullBuffer::from(vec![true, false, true, false, true, true]);
        FixedSizeListArray::new(field, 2, values, Some(valid))
    }

    // Unnest `list_array` with the given target `lengths` and compare the
    // resulting string values against `expected`.
    fn verify_unnest_list_array(
        list_array: &dyn ListArrayType,
        lengths: Vec<i64>,
        expected: Vec<Option<&str>>,
    ) -> Result<()> {
        let length_array = Int64Array::from(lengths);
        // `3 * 6` is only the builder's initial capacity.
        let unnested_array = unnest_list_array(list_array, &length_array, 3 * 6)?;
        let strs = unnested_array.as_string::<i32>().iter().collect::<Vec<_>>();
        assert_eq!(strs, expected);
        Ok(())
    }

    #[test]
    fn test_build_batch_list_arr_recursive() -> Result<()> {
        // col1                             | col2
        // [[1,2,3],null,[4,5]]             | ['a','b']
        // [[7,8,9,10], null, [11,12,13]]   | ['c','d']
        // null                             | ['e']
        let list_arr1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3)]),
            None,
            Some(vec![Some(4), Some(5)]),
            Some(vec![Some(7), Some(8), Some(9), Some(10)]),
            None,
            Some(vec![Some(11), Some(12), Some(13)]),
        ]);

        let list_arr1_ref = Arc::new(list_arr1) as ArrayRef;
        let offsets = OffsetBuffer::from_lengths([3, 3, 0]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_non_null();
        nulls.append_non_null();
        nulls.append_null();
        // list<list<int32>>
        // Named "col1" to match the table above and the way `col2_field` is built.
        let col1_field = Field::new(
            "col1",
            DataType::List(Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            ))),
            true,
        );
        let col1 = ListArray::new(
            Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            )),
            offsets,
            list_arr1_ref,
            nulls.finish(),
        );

        let list_arr2 = StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
            Some("d"),
            Some("e"),
        ]);

        let offsets = OffsetBuffer::from_lengths([2, 2, 1]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_n_non_nulls(3);
        let col2_field = Field::new(
            "col2",
            DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
            true,
        );
        // `offsets` is already an `OffsetBuffer`; pass it as-is, like `col1` does.
        let col2 = GenericListArray::<i32>::new(
            Arc::new(Field::new_list_field(DataType::Utf8, true)),
            offsets,
            Arc::new(list_arr2),
            nulls.finish(),
        );
        // convert col1 and col2 to a record batch
        let schema = Arc::new(Schema::new(vec![col1_field, col2_field]));
        let out_schema = Arc::new(Schema::new(vec![
            Field::new(
                "col1_unnest_placeholder_depth_1",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new("col1_unnest_placeholder_depth_2", DataType::Int32, true),
            Field::new("col2_unnest_placeholder_depth_1", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(col1) as ArrayRef, Arc::new(col2) as ArrayRef],
        )
        .unwrap();
        // col1 is unnested at depth 1 and depth 2, col2 at depth 1 only.
        let list_type_columns = vec![
            ListUnnest {
                index_in_input_schema: 0,
                depth: 1,
            },
            ListUnnest {
                index_in_input_schema: 0,
                depth: 2,
            },
            ListUnnest {
                index_in_input_schema: 1,
                depth: 1,
            },
        ];
        let ret = build_batch(
            &batch,
            &out_schema,
            list_type_columns.as_ref(),
            &HashSet::default(),
            &UnnestOptions {
                preserve_nulls: true,
                recursions: vec![],
            },
        )?
        .unwrap();

        assert_snapshot!(batches_to_string(&[ret]),
        @r###"
+---------------------------------+---------------------------------+---------------------------------+
| col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 |
+---------------------------------+---------------------------------+---------------------------------+
| [1, 2, 3]                       | 1                               | a                               |
|                                 | 2                               | b                               |
| [4, 5]                          | 3                               |                                 |
| [1, 2, 3]                       |                                 | a                               |
|                                 |                                 | b                               |
| [4, 5]                          |                                 |                                 |
| [1, 2, 3]                       | 4                               | a                               |
|                                 | 5                               | b                               |
| [4, 5]                          |                                 |                                 |
| [7, 8, 9, 10]                   | 7                               | c                               |
|                                 | 8                               | d                               |
| [11, 12, 13]                    | 9                               |                                 |
|                                 | 10                              |                                 |
| [7, 8, 9, 10]                   |                                 | c                               |
|                                 |                                 | d                               |
| [11, 12, 13]                    |                                 |                                 |
| [7, 8, 9, 10]                   | 11                              | c                               |
|                                 | 12                              | d                               |
| [11, 12, 13]                    | 13                              |                                 |
|                                 |                                 | e                               |
+---------------------------------+---------------------------------+---------------------------------+
        "###);
        Ok(())
    }

    #[test]
    fn test_unnest_list_array() -> Result<()> {
        // [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = make_generic_array::<i32>();
        verify_unnest_list_array(
            &list_array,
            vec![3, 2, 1, 2, 0, 3],
            vec![
                Some("A"),
                Some("B"),
                Some("C"),
                None,
                None,
                None,
                Some("D"),
                None,
                None,
                Some("F"),
                None,
            ],
        )?;

        // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list_array = make_fixed_list();
        verify_unnest_list_array(
            &list_array,
            vec![3, 1, 2, 0, 2, 3],
            vec![
                Some("A"),
                Some("B"),
                None,
                None,
                Some("C"),
                Some("D"),
                None,
                Some("F"),
                None,
                None,
                None,
            ],
        )?;

        Ok(())
    }

    // Compute the longest length over `list_arrays` with the given
    // `preserve_nulls` setting and compare it against `expected`.
    fn verify_longest_length(
        list_arrays: &[ArrayRef],
        preserve_nulls: bool,
        expected: Vec<i64>,
    ) -> Result<()> {
        let options = UnnestOptions {
            preserve_nulls,
            recursions: vec![],
        };
        let longest_length = find_longest_length(list_arrays, &options)?;
        let expected_array = Int64Array::from(expected);
        assert_eq!(
            longest_length.as_primitive::<Int64Type>(),
            &expected_array
        );
        Ok(())
    }

    #[test]
    fn test_longest_list_length() -> Result<()> {
        // Test with single ListArray
        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Test with single LargeListArray
        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = Arc::new(make_generic_array::<i64>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Test with single FixedSizeListArray
        //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list_array = Arc::new(make_fixed_list()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![2, 0, 2, 0, 2, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![2, 1, 2, 1, 2, 2])?;

        // Test with multiple list arrays
        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
        //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list1 = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        let list2 = Arc::new(make_fixed_list()) as ArrayRef;
        let list_arrays = vec![Arc::clone(&list1), Arc::clone(&list2)];
        verify_longest_length(&list_arrays, false, vec![3, 0, 2, 1, 2, 2])?;
        verify_longest_length(&list_arrays, true, vec![3, 1, 2, 1, 2, 2])?;

        Ok(())
    }

    #[test]
    fn test_create_take_indices() -> Result<()> {
        let length_array = Int64Array::from(vec![2, 3, 1]);
        let take_indices = create_take_indices(&length_array, 6);
        let expected = Int64Array::from(vec![0, 0, 1, 1, 1, 2]);
        assert_eq!(take_indices, expected);
        Ok(())
    }
}