Skip to main content

datafusion_physical_plan/
unnest.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Define a plan for unnesting values in columns that contain a list type.
19
20use std::cmp::{self, Ordering};
21use std::task::{Poll, ready};
22use std::{any::Any, sync::Arc};
23
24use super::metrics::{
25    self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
26    RecordOutput,
27};
28use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
29use crate::{
30    DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
31    SendableRecordBatchStream, check_if_same_properties,
32};
33
34use arrow::array::{
35    Array, ArrayRef, AsArray, BooleanBufferBuilder, FixedSizeListArray, Int64Array,
36    LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray, new_null_array,
37};
38use arrow::compute::kernels::length::length;
39use arrow::compute::kernels::zip::zip;
40use arrow::compute::{cast, is_not_null, kernels, sum};
41use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
42use arrow::record_batch::RecordBatch;
43use arrow_ord::cmp::lt;
44use async_trait::async_trait;
45use datafusion_common::{
46    Constraints, HashMap, HashSet, Result, UnnestOptions, exec_datafusion_err, exec_err,
47    internal_err,
48};
49use datafusion_execution::TaskContext;
50use datafusion_physical_expr::PhysicalExpr;
51use datafusion_physical_expr::equivalence::ProjectionMapping;
52use datafusion_physical_expr::expressions::Column;
53use futures::{Stream, StreamExt};
54use log::trace;
55
/// Unnest the given columns (either with type struct or list)
/// For list unnesting, each row is vertically transformed into multiple rows
/// For struct unnesting, each column is horizontally transformed into multiple columns,
/// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m')
///
/// See [`UnnestOptions`] for more details and an example.
#[derive(Debug, Clone)]
pub struct UnnestExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// The schema once the unnest is applied
    schema: SchemaRef,
    /// Indices of the list-typed columns in the input schema, each paired with
    /// the recursion depth at which that column is unnested
    list_column_indices: Vec<ListUnnest>,
    /// Indices of the struct-typed columns in the input schema
    struct_column_indices: Vec<usize>,
    /// Options (e.g. `preserve_nulls`, which decides whether NULL lists yield a row)
    options: UnnestOptions,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: Arc<PlanProperties>,
}
79
80impl UnnestExec {
81    /// Create a new [UnnestExec].
82    pub fn new(
83        input: Arc<dyn ExecutionPlan>,
84        list_column_indices: Vec<ListUnnest>,
85        struct_column_indices: Vec<usize>,
86        schema: SchemaRef,
87        options: UnnestOptions,
88    ) -> Result<Self> {
89        let cache = Self::compute_properties(
90            &input,
91            &list_column_indices,
92            &struct_column_indices,
93            &schema,
94        )?;
95
96        Ok(UnnestExec {
97            input,
98            schema,
99            list_column_indices,
100            struct_column_indices,
101            options,
102            metrics: Default::default(),
103            cache: Arc::new(cache),
104        })
105    }
106
107    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
108    fn compute_properties(
109        input: &Arc<dyn ExecutionPlan>,
110        list_column_indices: &[ListUnnest],
111        struct_column_indices: &[usize],
112        schema: &SchemaRef,
113    ) -> Result<PlanProperties> {
114        // Find out which indices are not unnested, such that they can be copied over from the input plan
115        let input_schema = input.schema();
116        let mut unnested_indices = BooleanBufferBuilder::new(input_schema.fields().len());
117        unnested_indices.append_n(input_schema.fields().len(), false);
118        for list_unnest in list_column_indices {
119            unnested_indices.set_bit(list_unnest.index_in_input_schema, true);
120        }
121        for struct_unnest in struct_column_indices {
122            unnested_indices.set_bit(*struct_unnest, true)
123        }
124        let unnested_indices = unnested_indices.finish();
125        let non_unnested_indices: Vec<usize> = (0..input_schema.fields().len())
126            .filter(|idx| !unnested_indices.value(*idx))
127            .collect();
128
129        // Manually build projection mapping from non-unnested input columns to their positions in the output
130        let input_schema = input.schema();
131        let projection_mapping: ProjectionMapping = non_unnested_indices
132            .iter()
133            .map(|&input_idx| {
134                // Find what index the input column has in the output schema
135                let input_field = input_schema.field(input_idx);
136                let output_idx = schema
137                    .fields()
138                    .iter()
139                    .position(|output_field| output_field.name() == input_field.name())
140                    .ok_or_else(|| {
141                        exec_datafusion_err!(
142                            "Non-unnested column '{}' must exist in output schema",
143                            input_field.name()
144                        )
145                    })?;
146
147                let input_col = Arc::new(Column::new(input_field.name(), input_idx))
148                    as Arc<dyn PhysicalExpr>;
149                let target_col = Arc::new(Column::new(input_field.name(), output_idx))
150                    as Arc<dyn PhysicalExpr>;
151                // Use From<Vec<(Arc<dyn PhysicalExpr>, usize)>> for ProjectionTargets
152                let targets = vec![(target_col, output_idx)].into();
153                Ok((input_col, targets))
154            })
155            .collect::<Result<ProjectionMapping>>()?;
156
157        // Create the unnest's equivalence properties by copying the input plan's equivalence properties
158        // for the unaffected columns. Except for the constraints, which are removed entirely because
159        // the unnest operation invalidates any global uniqueness or primary-key constraints.
160        let input_eq_properties = input.equivalence_properties();
161        let eq_properties = input_eq_properties
162            .project(&projection_mapping, Arc::clone(schema))
163            .with_constraints(Constraints::default());
164
165        // Output partitioning must use the projection mapping
166        let output_partitioning = input
167            .output_partitioning()
168            .project(&projection_mapping, &eq_properties);
169
170        Ok(PlanProperties::new(
171            eq_properties,
172            output_partitioning,
173            input.pipeline_behavior(),
174            input.boundedness(),
175        ))
176    }
177
178    /// Input execution plan
179    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
180        &self.input
181    }
182
183    /// Indices of the list-typed columns in the input schema
184    pub fn list_column_indices(&self) -> &[ListUnnest] {
185        &self.list_column_indices
186    }
187
188    /// Indices of the struct-typed columns in the input schema
189    pub fn struct_column_indices(&self) -> &[usize] {
190        &self.struct_column_indices
191    }
192
193    pub fn options(&self) -> &UnnestOptions {
194        &self.options
195    }
196
197    fn with_new_children_and_same_properties(
198        &self,
199        mut children: Vec<Arc<dyn ExecutionPlan>>,
200    ) -> Self {
201        Self {
202            input: children.swap_remove(0),
203            metrics: ExecutionPlanMetricsSet::new(),
204            ..Self::clone(self)
205        }
206    }
207}
208
209impl DisplayAs for UnnestExec {
210    fn fmt_as(
211        &self,
212        t: DisplayFormatType,
213        f: &mut std::fmt::Formatter,
214    ) -> std::fmt::Result {
215        match t {
216            DisplayFormatType::Default | DisplayFormatType::Verbose => {
217                write!(f, "UnnestExec")
218            }
219            DisplayFormatType::TreeRender => {
220                write!(f, "")
221            }
222        }
223    }
224}
225
226impl ExecutionPlan for UnnestExec {
227    fn name(&self) -> &'static str {
228        "UnnestExec"
229    }
230
231    fn as_any(&self) -> &dyn Any {
232        self
233    }
234
235    fn properties(&self) -> &Arc<PlanProperties> {
236        &self.cache
237    }
238
239    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
240        vec![&self.input]
241    }
242
243    fn with_new_children(
244        self: Arc<Self>,
245        mut children: Vec<Arc<dyn ExecutionPlan>>,
246    ) -> Result<Arc<dyn ExecutionPlan>> {
247        check_if_same_properties!(self, children);
248        Ok(Arc::new(UnnestExec::new(
249            children.swap_remove(0),
250            self.list_column_indices.clone(),
251            self.struct_column_indices.clone(),
252            Arc::clone(&self.schema),
253            self.options.clone(),
254        )?))
255    }
256
257    fn required_input_distribution(&self) -> Vec<Distribution> {
258        vec![Distribution::UnspecifiedDistribution]
259    }
260
261    fn execute(
262        &self,
263        partition: usize,
264        context: Arc<TaskContext>,
265    ) -> Result<SendableRecordBatchStream> {
266        let input = self.input.execute(partition, context)?;
267        let metrics = UnnestMetrics::new(partition, &self.metrics);
268
269        Ok(Box::pin(UnnestStream {
270            input,
271            schema: Arc::clone(&self.schema),
272            list_type_columns: self.list_column_indices.clone(),
273            struct_column_indices: self.struct_column_indices.iter().copied().collect(),
274            options: self.options.clone(),
275            metrics,
276        }))
277    }
278
279    fn metrics(&self) -> Option<MetricsSet> {
280        Some(self.metrics.clone_inner())
281    }
282}
283
/// Per-partition metrics for [`UnnestStream`].
#[derive(Clone, Debug)]
struct UnnestMetrics {
    /// Execution metrics (elapsed compute, output batches/rows)
    baseline_metrics: BaselineMetrics,
    /// Number of batches consumed from the input stream
    input_batches: metrics::Count,
    /// Number of rows consumed from the input stream
    input_rows: metrics::Count,
}
293
294impl UnnestMetrics {
295    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
296        let input_batches =
297            MetricBuilder::new(metrics).counter("input_batches", partition);
298
299        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
300
301        Self {
302            baseline_metrics: BaselineMetrics::new(metrics, partition),
303            input_batches,
304            input_rows,
305        }
306    }
307}
308
/// A stream that issues [RecordBatch]es with unnested column data.
struct UnnestStream {
    /// Input stream
    input: SendableRecordBatchStream,
    /// Unnested schema
    schema: Arc<Schema>,
    /// Represents all unnest operations to be applied to the input (input index, depth)
    /// e.g unnest(col1),unnest(unnest(col1)) where col1 has index 1 in original input schema
    /// then list_type_columns = [ListUnnest{1,1},ListUnnest{1,2}]
    list_type_columns: Vec<ListUnnest>,
    /// Indices of struct-typed input columns that are flattened horizontally
    struct_column_indices: HashSet<usize>,
    /// Options
    options: UnnestOptions,
    /// Metrics
    metrics: UnnestMetrics,
}
325
impl RecordBatchStream for UnnestStream {
    /// Schema of the batches this stream yields (the post-unnest schema).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
331
332#[async_trait]
333impl Stream for UnnestStream {
334    type Item = Result<RecordBatch>;
335
336    fn poll_next(
337        mut self: std::pin::Pin<&mut Self>,
338        cx: &mut std::task::Context<'_>,
339    ) -> Poll<Option<Self::Item>> {
340        self.poll_next_impl(cx)
341    }
342}
343
impl UnnestStream {
    /// Separate implementation function that unpins the [`UnnestStream`] so
    /// that partial borrows work correctly
    fn poll_next_impl(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        // Loop so input batches that unnest to zero rows can be skipped
        // without yielding an empty batch to the caller.
        loop {
            return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    // Time only the unnest computation, not the input polling.
                    let elapsed_compute =
                        self.metrics.baseline_metrics.elapsed_compute().clone();
                    let timer = elapsed_compute.timer();
                    self.metrics.input_batches.add(1);
                    self.metrics.input_rows.add(batch.num_rows());
                    let result = build_batch(
                        &batch,
                        &self.schema,
                        &self.list_type_columns,
                        &self.struct_column_indices,
                        &self.options,
                    )?;
                    timer.done();
                    // `None` signals the batch unnested to zero rows; poll the
                    // input again for the next batch.
                    let Some(result_batch) = result else {
                        continue;
                    };
                    (&result_batch).record_output(&self.metrics.baseline_metrics);

                    // Empty record batches should not be emitted.
                    // They need to be treated as [`Option<RecordBatch>`]es and handled separately
                    debug_assert!(result_batch.num_rows() > 0);
                    Some(Ok(result_batch))
                }
                other => {
                    // Input exhausted (None) or errored: log summary stats and pass through.
                    trace!(
                        "Processed {} probe-side input batches containing {} rows and \
                        produced {} output batches containing {} rows in {}",
                        self.metrics.input_batches,
                        self.metrics.input_rows,
                        self.metrics.baseline_metrics.output_batches(),
                        self.metrics.baseline_metrics.output_rows(),
                        self.metrics.baseline_metrics.elapsed_compute(),
                    );
                    other
                }
            });
        }
    }
}
393
394/// Given a set of struct column indices to flatten
395/// try converting the column in input into multiple subfield columns
396/// For example
397/// struct_col: [a: struct(item: int, name: string), b: int]
398/// with a batch
399/// {a: {item: 1, name: "a"}, b: 2},
400/// {a: {item: 3, name: "b"}, b: 4]
401/// will be converted into
402/// {a.item: 1, a.name: "a", b: 2},
403/// {a.item: 3, a.name: "b", b: 4}
404fn flatten_struct_cols(
405    input_batch: &[Arc<dyn Array>],
406    schema: &SchemaRef,
407    struct_column_indices: &HashSet<usize>,
408) -> Result<RecordBatch> {
409    // horizontal expansion because of struct unnest
410    let columns_expanded = input_batch
411        .iter()
412        .enumerate()
413        .map(|(idx, column_data)| match struct_column_indices.get(&idx) {
414            Some(_) => match column_data.data_type() {
415                DataType::Struct(_) => {
416                    let struct_arr =
417                        column_data.as_any().downcast_ref::<StructArray>().unwrap();
418                    Ok(struct_arr.columns().to_vec())
419                }
420                data_type => internal_err!(
421                    "expecting column {} from input plan to be a struct, got {:?}",
422                    idx,
423                    data_type
424                ),
425            },
426            None => Ok(vec![Arc::clone(column_data)]),
427        })
428        .collect::<Result<Vec<_>>>()?
429        .into_iter()
430        .flatten()
431        .collect();
432    Ok(RecordBatch::try_new(Arc::clone(schema), columns_expanded)?)
433}
434
/// Identifies a single list-unnest operation: which input column to unnest
/// and the recursion level at which it is unnested.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct ListUnnest {
    /// Index of the column in the input schema
    pub index_in_input_schema: usize,
    /// Recursion level of this unnest (see [`list_unnest_at_level`] for semantics)
    pub depth: usize,
}
440
/// This function is used to execute the unnesting on multiple columns all at once, but
/// one level at a time, and is called n times, where n is the highest recursion level among
/// the unnest exprs in the query.
///
/// For example giving the following query:
/// ```sql
/// select unnest(colA, max_depth:=3) as P1, unnest(colA,max_depth:=2) as P2, unnest(colB, max_depth:=1) as P3 from temp;
/// ```
/// Then the total times this function being called is 3
///
/// It needs to be aware of which level the current unnesting is, because if there exists
/// multiple unnesting on the same column, but with different recursion levels, say
/// **unnest(colA, max_depth:=3)** and **unnest(colA, max_depth:=2)**, then the unnesting
/// of expr **unnest(colA, max_depth:=3)** will start at level 3, while unnesting for expr
/// **unnest(colA, max_depth:=2)** has to start at level 2
///
/// Set *colA* as a 3-dimension columns and *colB* as an array (1-dimension). As stated,
/// this function is called with the descending order of recursion depth
///
/// Depth = 3
/// - colA(3-dimension) unnest into temp column temp_P1(2_dimension) (unnesting of P1 starts
///   from this level)
/// - colA(3-dimension) having indices repeated by the unnesting operation above
/// - colB(1-dimension) having indices repeated by the unnesting operation above
///
/// Depth = 2
/// - temp_P1(2-dimension) unnest into temp column temp_P1(1-dimension)
/// - colA(3-dimension) unnest into temp column temp_P2(2-dimension) (unnesting of P2 starts
///   from this level)
/// - colB(1-dimension) having indices repeated by the unnesting operation above
///
/// Depth = 1
/// - temp_P1(1-dimension) unnest into P1
/// - temp_P2(2-dimension) unnest into P2
/// - colB(1-dimension) unnest into P3 (unnesting of P3 starts from this level)
///
/// The returned array will have the same size as the input batch
/// and only contains original columns that are not being unnested.
///
/// Returns `Ok(None)` when the total unnested length at this level is zero,
/// meaning no rows would be produced for this input batch.
fn list_unnest_at_level(
    batch: &[ArrayRef],
    list_type_unnests: &[ListUnnest],
    temp_unnested_arrs: &mut HashMap<ListUnnest, ArrayRef>,
    level_to_unnest: usize,
    options: &UnnestOptions,
) -> Result<Option<Vec<ArrayRef>>> {
    // Extract unnestable columns at this level
    let (arrs_to_unnest, list_unnest_specs): (Vec<Arc<dyn Array>>, Vec<_>) =
        list_type_unnests
            .iter()
            .filter_map(|unnesting| {
                // Unnesting that starts exactly at this level reads from the original batch
                if level_to_unnest == unnesting.depth {
                    return Some((
                        Arc::clone(&batch[unnesting.index_in_input_schema]),
                        *unnesting,
                    ));
                }
                // This means the unnesting on this item has started at higher level
                // and need to continue until depth reaches 1
                if level_to_unnest < unnesting.depth {
                    return Some((
                        Arc::clone(temp_unnested_arrs.get(unnesting).unwrap()),
                        *unnesting,
                    ));
                }
                None
            })
            .unzip();

    // Filter out so that list_arrays only contain column with the highest depth
    // at the same time, during iteration remove this depth so next time we don't have to unnest them again
    let longest_length = find_longest_length(&arrs_to_unnest, options)?;
    let unnested_length = longest_length.as_primitive::<Int64Type>();
    let total_length = if unnested_length.is_empty() {
        0
    } else {
        sum(unnested_length).ok_or_else(|| {
            exec_datafusion_err!("Failed to calculate the total unnested length")
        })? as usize
    };
    // Nothing to emit for this batch at this level
    if total_length == 0 {
        return Ok(None);
    }

    // Unnest all the list arrays
    let unnested_temp_arrays =
        unnest_list_arrays(arrs_to_unnest.as_ref(), unnested_length, total_length)?;

    // Create the take indices array for other columns
    let take_indices = create_take_indices(unnested_length, total_length);
    // Stash each flattened array keyed by its (column, depth) spec so deeper
    // levels (and the final assembly in `build_batch`) can pick it up.
    unnested_temp_arrays
        .into_iter()
        .zip(list_unnest_specs.iter())
        .for_each(|(flatten_arr, unnesting)| {
            temp_unnested_arrs.insert(*unnesting, flatten_arr);
        });

    let repeat_mask: Vec<bool> = batch
        .iter()
        .enumerate()
        .map(|(i, _)| {
            // Check if the column is needed in future levels (levels below the current one)
            let needed_in_future_levels = list_type_unnests.iter().any(|unnesting| {
                unnesting.index_in_input_schema == i && unnesting.depth < level_to_unnest
            });

            // Check if the column is involved in unnesting at any level
            let is_involved_in_unnesting = list_type_unnests
                .iter()
                .any(|unnesting| unnesting.index_in_input_schema == i);

            // Repeat columns needed in future levels or not unnested.
            needed_in_future_levels || !is_involved_in_unnesting
        })
        .collect();

    // Dimension of arrays in batch is untouched, but the values are repeated
    // as the side effect of unnesting
    let ret = repeat_arrs_from_indices(batch, &take_indices, &repeat_mask)?;

    Ok(Some(ret))
}
/// Intermediate result of unnesting one input column at one recursion depth,
/// used by [`build_batch`] to assemble the final output column order.
struct UnnestingResult {
    /// The flattened array produced for this (column, depth) pair
    arr: ArrayRef,
    /// The recursion depth this result originated from
    depth: usize,
}
566
/// For each row in a `RecordBatch`, some list/struct columns need to be unnested.
/// - For list columns: We will expand the values in each list into multiple rows,
///   taking the longest length among these lists, and shorter lists are padded with NULLs.
/// - For struct columns: We will expand the struct columns into multiple subfield columns.
///
/// For columns that don't need to be unnested, repeat their values until reaching the longest length.
///
/// Returns `Ok(None)` when the unnest produces zero rows for this batch.
///
/// Note: unnest has a big difference in behavior between Postgres and DuckDB
///
/// Take this example
///
/// 1. Postgres
/// ```ignored
/// create table temp (
///     i integer[][][], j integer[]
/// )
/// insert into temp values ('{{{1,2},{3,4}},{{5,6},{7,8}}}', '{1,2}');
/// select unnest(i), unnest(j) from temp;
/// ```
///
/// Result
/// ```text
///     1   1
///     2   2
///     3
///     4
///     5
///     6
///     7
///     8
/// ```
/// 2. DuckDB
/// ```ignore
///     create table temp (i integer[][][], j integer[]);
///     insert into temp values ([[[1,2],[3,4]],[[5,6],[7,8]]], [1,2]);
///     select unnest(i,recursive:=true), unnest(j,recursive:=true) from temp;
/// ```
/// Result:
/// ```text
///
///     ┌────────────────────────────────────────────────┬────────────────────────────────────────────────┐
///     │ unnest(i, "recursive" := CAST('t' AS BOOLEAN)) │ unnest(j, "recursive" := CAST('t' AS BOOLEAN)) │
///     │                     int32                      │                     int32                      │
///     ├────────────────────────────────────────────────┼────────────────────────────────────────────────┤
///     │                                              1 │                                              1 │
///     │                                              2 │                                              2 │
///     │                                              3 │                                              1 │
///     │                                              4 │                                              2 │
///     │                                              5 │                                              1 │
///     │                                              6 │                                              2 │
///     │                                              7 │                                              1 │
///     │                                              8 │                                              2 │
///     └────────────────────────────────────────────────┴────────────────────────────────────────────────┘
/// ```
///
/// The following implementation refers to DuckDB's implementation
fn build_batch(
    batch: &RecordBatch,
    schema: &SchemaRef,
    list_type_columns: &[ListUnnest],
    struct_column_indices: &HashSet<usize>,
    options: &UnnestOptions,
) -> Result<Option<RecordBatch>> {
    let transformed = match list_type_columns.len() {
        // No list unnesting: only struct flattening (if any) is needed.
        0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
        _ => {
            let mut temp_unnested_result = HashMap::new();
            // Highest depth among the unnest exprs = number of per-level passes.
            let max_recursion = list_type_columns
                .iter()
                .fold(0, |highest_depth, ListUnnest { depth, .. }| {
                    cmp::max(highest_depth, *depth)
                });

            // This arr always has the same column count with the input batch
            let mut flatten_arrs = vec![];

            // Original batch has the same columns
            // All unnesting results are written to temp_batch
            for depth in (1..=max_recursion).rev() {
                let input = match depth == max_recursion {
                    true => batch.columns(),
                    false => &flatten_arrs,
                };
                let Some(temp_result) = list_unnest_at_level(
                    input,
                    list_type_columns,
                    &mut temp_unnested_result,
                    depth,
                    options,
                )?
                else {
                    return Ok(None);
                };
                flatten_arrs = temp_result;
            }
            // Group the per-(column, depth) results by original column index.
            let unnested_array_map: HashMap<usize, Vec<UnnestingResult>> =
                temp_unnested_result.into_iter().fold(
                    HashMap::new(),
                    |mut acc,
                     (
                        ListUnnest {
                            index_in_input_schema,
                            depth,
                        },
                        flattened_array,
                    )| {
                        acc.entry(index_in_input_schema).or_default().push(
                            UnnestingResult {
                                arr: flattened_array,
                                depth,
                            },
                        );
                        acc
                    },
                );
            // Position of each unnest expr in the original projection list.
            let output_order: HashMap<ListUnnest, usize> = list_type_columns
                .iter()
                .enumerate()
                .map(|(order, unnest_def)| (*unnest_def, order))
                .collect();

            // One original column may be unnested multiple times into separate columns
            let mut multi_unnested_per_original_index = unnested_array_map
                .into_iter()
                .map(
                    // Each item in unnested_columns is the result of unnesting the same input column
                    // we need to sort them to conform with the original expression order
                    // e.g unnest(unnest(col)) must goes before unnest(col)
                    |(original_index, mut unnested_columns)| {
                        unnested_columns.sort_by(
                            |UnnestingResult { depth: depth1, .. },
                             UnnestingResult { depth: depth2, .. }|
                             -> Ordering {
                                output_order
                                    .get(&ListUnnest {
                                        depth: *depth1,
                                        index_in_input_schema: original_index,
                                    })
                                    .unwrap()
                                    .cmp(
                                        output_order
                                            .get(&ListUnnest {
                                                depth: *depth2,
                                                index_in_input_schema: original_index,
                                            })
                                            .unwrap(),
                                    )
                            },
                        );
                        (
                            original_index,
                            unnested_columns
                                .into_iter()
                                .map(|result| result.arr)
                                .collect::<Vec<_>>(),
                        )
                    },
                )
                .collect::<HashMap<_, _>>();

            let ret = flatten_arrs
                .into_iter()
                .enumerate()
                .flat_map(|(col_idx, arr)| {
                    // Convert original column into its unnested version(s)
                    // Plural because one column can be unnested with different recursion level
                    // and into separate output columns
                    match multi_unnested_per_original_index.remove(&col_idx) {
                        Some(unnested_arrays) => unnested_arrays,
                        None => vec![arr],
                    }
                })
                .collect::<Vec<_>>();

            flatten_struct_cols(&ret, schema, struct_column_indices)
        }
    }?;
    Ok(Some(transformed))
}
746
747/// Find the longest list length among the given list arrays for each row.
748///
749/// For example if we have the following two list arrays:
750///
751/// ```ignore
752/// l1: [1, 2, 3], null, [], [3]
753/// l2: [4,5], [], null, [6, 7]
754/// ```
755///
756/// If `preserve_nulls` is false, the longest length array will be:
757///
758/// ```ignore
759/// longest_length: [3, 0, 0, 2]
760/// ```
761///
762/// whereas if `preserve_nulls` is true, the longest length array will be:
763///
764///
765/// ```ignore
766/// longest_length: [3, 1, 1, 2]
767/// ```
768fn find_longest_length(
769    list_arrays: &[ArrayRef],
770    options: &UnnestOptions,
771) -> Result<ArrayRef> {
772    // The length of a NULL list
773    let null_length = if options.preserve_nulls {
774        Scalar::new(Int64Array::from_value(1, 1))
775    } else {
776        Scalar::new(Int64Array::from_value(0, 1))
777    };
778    let list_lengths: Vec<ArrayRef> = list_arrays
779        .iter()
780        .map(|list_array| {
781            let mut length_array = length(list_array)?;
782            // Make sure length arrays have the same type. Int64 is the most general one.
783            length_array = cast(&length_array, &DataType::Int64)?;
784            length_array =
785                zip(&is_not_null(&length_array)?, &length_array, &null_length)?;
786            Ok(length_array)
787        })
788        .collect::<Result<_>>()?;
789
790    let longest_length = list_lengths.iter().skip(1).try_fold(
791        Arc::clone(&list_lengths[0]),
792        |longest, current| {
793            let is_lt = lt(&longest, &current)?;
794            zip(&is_lt, &current, &longest)
795        },
796    )?;
797    Ok(longest_length)
798}
799
/// Trait defining common methods used for unnesting, implemented by list array types.
///
/// Implemented below for [`ListArray`], [`LargeListArray`], and
/// [`FixedSizeListArray`], giving the unnest kernels a uniform view over the
/// three list layouts.
trait ListArrayType: Array {
    /// Returns a reference to the values of this list.
    fn values(&self) -> &ArrayRef;

    /// Returns the start and end offset of the values for the given row.
    ///
    /// Offsets are widened to `i64` regardless of the underlying offset
    /// width, so callers can treat all list layouts identically.
    fn value_offsets(&self, row: usize) -> (i64, i64);
}
808
809impl ListArrayType for ListArray {
810    fn values(&self) -> &ArrayRef {
811        self.values()
812    }
813
814    fn value_offsets(&self, row: usize) -> (i64, i64) {
815        let offsets = self.value_offsets();
816        (offsets[row].into(), offsets[row + 1].into())
817    }
818}
819
820impl ListArrayType for LargeListArray {
821    fn values(&self) -> &ArrayRef {
822        self.values()
823    }
824
825    fn value_offsets(&self, row: usize) -> (i64, i64) {
826        let offsets = self.value_offsets();
827        (offsets[row], offsets[row + 1])
828    }
829}
830
831impl ListArrayType for FixedSizeListArray {
832    fn values(&self) -> &ArrayRef {
833        self.values()
834    }
835
836    fn value_offsets(&self, row: usize) -> (i64, i64) {
837        let start = self.value_offset(row) as i64;
838        (start, start + self.value_length() as i64)
839    }
840}
841
842/// Unnest multiple list arrays according to the length array.
843fn unnest_list_arrays(
844    list_arrays: &[ArrayRef],
845    length_array: &PrimitiveArray<Int64Type>,
846    capacity: usize,
847) -> Result<Vec<ArrayRef>> {
848    let typed_arrays = list_arrays
849        .iter()
850        .map(|list_array| match list_array.data_type() {
851            DataType::List(_) => Ok(list_array.as_list::<i32>() as &dyn ListArrayType),
852            DataType::LargeList(_) => {
853                Ok(list_array.as_list::<i64>() as &dyn ListArrayType)
854            }
855            DataType::FixedSizeList(_, _) => {
856                Ok(list_array.as_fixed_size_list() as &dyn ListArrayType)
857            }
858            other => exec_err!("Invalid unnest datatype {other }"),
859        })
860        .collect::<Result<Vec<_>>>()?;
861
862    typed_arrays
863        .iter()
864        .map(|list_array| unnest_list_array(*list_array, length_array, capacity))
865        .collect::<Result<_>>()
866}
867
868/// Unnest a list array according the target length array.
869///
870/// Consider a list array like this:
871///
872/// ```ignore
873/// [1], [2, 3, 4], null, [5], [],
874/// ```
875///
876/// and the length array is:
877///
878/// ```ignore
879/// [2, 3, 2, 1, 2]
880/// ```
881///
882/// If the length of a certain list is less than the target length, pad with NULLs.
883/// So the unnested array will look like this:
884///
885/// ```ignore
886/// [1, null, 2, 3, 4, null, null, 5, null, null]
887/// ```
888fn unnest_list_array(
889    list_array: &dyn ListArrayType,
890    length_array: &PrimitiveArray<Int64Type>,
891    capacity: usize,
892) -> Result<ArrayRef> {
893    let values = list_array.values();
894    let mut take_indices_builder = PrimitiveArray::<Int64Type>::builder(capacity);
895    for row in 0..list_array.len() {
896        let mut value_length = 0;
897        if !list_array.is_null(row) {
898            let (start, end) = list_array.value_offsets(row);
899            value_length = end - start;
900            for i in start..end {
901                take_indices_builder.append_value(i)
902            }
903        }
904        let target_length = length_array.value(row);
905        debug_assert!(
906            value_length <= target_length,
907            "value length is beyond the longest length"
908        );
909        // Pad with NULL values
910        for _ in value_length..target_length {
911            take_indices_builder.append_null();
912        }
913    }
914    Ok(kernels::take::take(
915        &values,
916        &take_indices_builder.finish(),
917        None,
918    )?)
919}
920
921/// Creates take indices that will be used to expand all columns except for the list type
922/// [`columns`](UnnestExec::list_column_indices) that is being unnested.
923/// Every column value needs to be repeated multiple times according to the length array.
924///
925/// If the length array looks like this:
926///
927/// ```ignore
928/// [2, 3, 1]
929/// ```
930/// Then [`create_take_indices`] will return an array like this
931///
932/// ```ignore
933/// [0, 0, 1, 1, 1, 2]
934/// ```
935fn create_take_indices(
936    length_array: &PrimitiveArray<Int64Type>,
937    capacity: usize,
938) -> PrimitiveArray<Int64Type> {
939    // `find_longest_length()` guarantees this.
940    debug_assert!(
941        length_array.null_count() == 0,
942        "length array should not contain nulls"
943    );
944    let mut builder = PrimitiveArray::<Int64Type>::builder(capacity);
945    for (index, repeat) in length_array.iter().enumerate() {
946        // The length array should not contain nulls, so unwrap is safe
947        let repeat = repeat.unwrap();
948        (0..repeat).for_each(|_| builder.append_value(index as i64));
949    }
950    builder.finish()
951}
952
953/// Create a batch of arrays based on an input `batch` and a `indices` array.
954/// The `indices` array is used by the take kernel to repeat values in the arrays
955/// that are marked with `true` in the `repeat_mask`. Arrays marked with `false`
956/// in the `repeat_mask` will be replaced with arrays filled with nulls of the
957/// appropriate length.
958///
959/// For example if we have the following batch:
960///
961/// ```ignore
962/// c1: [1], null, [2, 3, 4], null, [5, 6]
963/// c2: 'a', 'b',  'c', null, 'd'
964/// ```
965///
966/// then the `unnested_list_arrays` contains the unnest column that will replace `c1` in
967/// the final batch if `preserve_nulls` is true:
968///
969/// ```ignore
970/// c1: 1, null, 2, 3, 4, null, 5, 6
971/// ```
972///
973/// And the `indices` array contains the indices that are used by `take` kernel to
974/// repeat the values in `c2`:
975///
976/// ```ignore
977/// 0, 1, 2, 2, 2, 3, 4, 4
978/// ```
979///
980/// so that the final batch will look like:
981///
982/// ```ignore
983/// c1: 1, null, 2, 3, 4, null, 5, 6
984/// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
985/// ```
986///
987/// The `repeat_mask` determines whether an array's values are repeated or replaced with nulls.
988/// For example, if the `repeat_mask` is:
989///
990/// ```ignore
991/// [true, false]
992/// ```
993///
994/// The final batch will look like:
995///
996/// ```ignore
997/// c1: 1, null, 2, 3, 4, null, 5, 6  // Repeated using `indices`
998/// c2: null, null, null, null, null, null, null, null  // Replaced with nulls
999fn repeat_arrs_from_indices(
1000    batch: &[ArrayRef],
1001    indices: &PrimitiveArray<Int64Type>,
1002    repeat_mask: &[bool],
1003) -> Result<Vec<Arc<dyn Array>>> {
1004    batch
1005        .iter()
1006        .zip(repeat_mask.iter())
1007        .map(|(arr, &repeat)| {
1008            if repeat {
1009                Ok(kernels::take::take(arr, indices, None)?)
1010            } else {
1011                Ok(new_null_array(arr.data_type(), arr.len()))
1012            }
1013        })
1014        .collect()
1015}
1016
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        GenericListArray, NullBufferBuilder, OffsetSizeTrait, StringArray,
    };
    use arrow::buffer::{NullBuffer, OffsetBuffer};
    use arrow::datatypes::{Field, Int32Type};
    use datafusion_common::test_util::batches_to_string;
    use insta::assert_snapshot;

    // Create a GenericListArray with the following list values:
    //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
    // Generic over the offset width so the same fixture covers both
    // `ListArray` (i32) and `LargeListArray` (i64).
    fn make_generic_array<OffsetSize>() -> GenericListArray<OffsetSize>
    where
        OffsetSize: OffsetSizeTrait,
    {
        let mut values = vec![];
        let mut offsets: Vec<OffsetSize> = vec![OffsetSize::zero()];
        let mut valid = NullBufferBuilder::new(6);

        // [A, B, C]
        values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // []
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // NULL with non-zero value length
        // Issue https://github.com/apache/datafusion/issues/9932
        values.push(Some("?"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [D]
        values.push(Some("D"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // Another NULL with zero value length
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [NULL, F]
        values.extend_from_slice(&[None, Some("F")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        GenericListArray::<OffsetSize>::new(
            field,
            OffsetBuffer::new(offsets.into()),
            Arc::new(StringArray::from(values)),
            valid.finish(),
        )
    }

    // Create a FixedSizeListArray with the following list values:
    //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
    fn make_fixed_list() -> FixedSizeListArray {
        let values = Arc::new(StringArray::from_iter([
            Some("A"),
            Some("B"),
            None,
            None,
            Some("C"),
            Some("D"),
            None,
            None,
            None,
            Some("F"),
            None,
            None,
        ]));
        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        let valid = NullBuffer::from(vec![true, false, true, false, true, true]);
        FixedSizeListArray::new(field, 2, values, Some(valid))
    }

    // Unnest `list_array` with the given per-row target `lengths` and compare
    // the flattened string values against `expected`.
    fn verify_unnest_list_array(
        list_array: &dyn ListArrayType,
        lengths: Vec<i64>,
        expected: Vec<Option<&str>>,
    ) -> Result<()> {
        let length_array = Int64Array::from(lengths);
        // Capacity hint: 6 rows with at most 3 values each.
        let unnested_array = unnest_list_array(list_array, &length_array, 3 * 6)?;
        let strs = unnested_array.as_string::<i32>().iter().collect::<Vec<_>>();
        assert_eq!(strs, expected);
        Ok(())
    }

    #[test]
    fn test_build_batch_list_arr_recursive() -> Result<()> {
        // col1                             | col2
        // [[1,2,3],null,[4,5]]             | ['a','b']
        // [[7,8,9,10], null, [11,12,13]]   | ['c','d']
        // null                             | ['e']
        let list_arr1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3)]),
            None,
            Some(vec![Some(4), Some(5)]),
            Some(vec![Some(7), Some(8), Some(9), Some(10)]),
            None,
            Some(vec![Some(11), Some(12), Some(13)]),
        ]);

        let list_arr1_ref = Arc::new(list_arr1) as ArrayRef;
        let offsets = OffsetBuffer::from_lengths([3, 3, 0]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_non_null();
        nulls.append_non_null();
        nulls.append_null();
        // list<list<int32>>
        let col1_field = Field::new_list_field(
            DataType::List(Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            ))),
            true,
        );
        let col1 = ListArray::new(
            Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            )),
            offsets,
            list_arr1_ref,
            nulls.finish(),
        );

        let list_arr2 = StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
            Some("d"),
            Some("e"),
        ]);

        let offsets = OffsetBuffer::from_lengths([2, 2, 1]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_n_non_nulls(3);
        let col2_field = Field::new(
            "col2",
            DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
            true,
        );
        let col2 = GenericListArray::<i32>::new(
            Arc::new(Field::new_list_field(DataType::Utf8, true)),
            OffsetBuffer::new(offsets.into()),
            Arc::new(list_arr2),
            nulls.finish(),
        );
        // convert col1 and col2 to a record batch
        let schema = Arc::new(Schema::new(vec![col1_field, col2_field]));
        let out_schema = Arc::new(Schema::new(vec![
            Field::new(
                "col1_unnest_placeholder_depth_1",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new("col1_unnest_placeholder_depth_2", DataType::Int32, true),
            Field::new("col2_unnest_placeholder_depth_1", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(col1) as ArrayRef, Arc::new(col2) as ArrayRef],
        )
        .unwrap();
        // Unnest col1 at two recursion depths and col2 at depth 1.
        let list_type_columns = vec![
            ListUnnest {
                index_in_input_schema: 0,
                depth: 1,
            },
            ListUnnest {
                index_in_input_schema: 0,
                depth: 2,
            },
            ListUnnest {
                index_in_input_schema: 1,
                depth: 1,
            },
        ];
        let ret = build_batch(
            &batch,
            &out_schema,
            list_type_columns.as_ref(),
            &HashSet::default(),
            &UnnestOptions {
                preserve_nulls: true,
                recursions: vec![],
            },
        )?
        .unwrap();

        assert_snapshot!(batches_to_string(&[ret]),
        @r"
        +---------------------------------+---------------------------------+---------------------------------+
        | col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 |
        +---------------------------------+---------------------------------+---------------------------------+
        | [1, 2, 3]                       | 1                               | a                               |
        |                                 | 2                               | b                               |
        | [4, 5]                          | 3                               |                                 |
        | [1, 2, 3]                       |                                 | a                               |
        |                                 |                                 | b                               |
        | [4, 5]                          |                                 |                                 |
        | [1, 2, 3]                       | 4                               | a                               |
        |                                 | 5                               | b                               |
        | [4, 5]                          |                                 |                                 |
        | [7, 8, 9, 10]                   | 7                               | c                               |
        |                                 | 8                               | d                               |
        | [11, 12, 13]                    | 9                               |                                 |
        |                                 | 10                              |                                 |
        | [7, 8, 9, 10]                   |                                 | c                               |
        |                                 |                                 | d                               |
        | [11, 12, 13]                    |                                 |                                 |
        | [7, 8, 9, 10]                   | 11                              | c                               |
        |                                 | 12                              | d                               |
        | [11, 12, 13]                    | 13                              |                                 |
        |                                 |                                 | e                               |
        +---------------------------------+---------------------------------+---------------------------------+
        ");
        Ok(())
    }

    #[test]
    fn test_unnest_list_array() -> Result<()> {
        // [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = make_generic_array::<i32>();
        verify_unnest_list_array(
            &list_array,
            vec![3, 2, 1, 2, 0, 3],
            vec![
                Some("A"),
                Some("B"),
                Some("C"),
                None,
                None,
                None,
                Some("D"),
                None,
                None,
                Some("F"),
                None,
            ],
        )?;

        // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list_array = make_fixed_list();
        verify_unnest_list_array(
            &list_array,
            vec![3, 1, 2, 0, 2, 3],
            vec![
                Some("A"),
                Some("B"),
                None,
                None,
                Some("C"),
                Some("D"),
                None,
                Some("F"),
                None,
                None,
                None,
            ],
        )?;

        Ok(())
    }

    // Run `find_longest_length` over `list_arrays` and compare the resulting
    // per-row maximum lengths against `expected`.
    fn verify_longest_length(
        list_arrays: &[ArrayRef],
        preserve_nulls: bool,
        expected: Vec<i64>,
    ) -> Result<()> {
        let options = UnnestOptions {
            preserve_nulls,
            recursions: vec![],
        };
        let longest_length = find_longest_length(list_arrays, &options)?;
        let expected_array = Int64Array::from(expected);
        assert_eq!(
            longest_length
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap(),
            &expected_array
        );
        Ok(())
    }

    #[test]
    fn test_longest_list_length() -> Result<()> {
        // Test with single ListArray
        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Test with single LargeListArray
        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = Arc::new(make_generic_array::<i64>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Test with single FixedSizeListArray
        //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list_array = Arc::new(make_fixed_list()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![2, 0, 2, 0, 2, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![2, 1, 2, 1, 2, 2])?;

        // Test with multiple list arrays
        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
        //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list1 = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        let list2 = Arc::new(make_fixed_list()) as ArrayRef;
        let list_arrays = vec![Arc::clone(&list1), Arc::clone(&list2)];
        verify_longest_length(&list_arrays, false, vec![3, 0, 2, 1, 2, 2])?;
        verify_longest_length(&list_arrays, true, vec![3, 1, 2, 1, 2, 2])?;

        Ok(())
    }

    #[test]
    fn test_create_take_indices() -> Result<()> {
        // Each row index is repeated `length_array[row]` times.
        let length_array = Int64Array::from(vec![2, 3, 1]);
        let take_indices = create_take_indices(&length_array, 6);
        let expected = Int64Array::from(vec![0, 0, 1, 1, 1, 2]);
        assert_eq!(take_indices, expected);
        Ok(())
    }
}