Skip to main content

datafusion_physical_plan/
unnest.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Define a plan for unnesting values in columns that contain a list type.
19
20use std::cmp::{self, Ordering};
21use std::task::{Poll, ready};
22use std::{any::Any, sync::Arc};
23
24use super::metrics::{
25    self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
26    RecordOutput,
27};
28use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
29use crate::{
30    DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
31    SendableRecordBatchStream,
32};
33
34use arrow::array::{
35    Array, ArrayRef, AsArray, BooleanBufferBuilder, FixedSizeListArray, Int64Array,
36    LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray, new_null_array,
37};
38use arrow::compute::kernels::length::length;
39use arrow::compute::kernels::zip::zip;
40use arrow::compute::{cast, is_not_null, kernels, sum};
41use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
42use arrow::record_batch::RecordBatch;
43use arrow_ord::cmp::lt;
44use async_trait::async_trait;
45use datafusion_common::{
46    Constraints, HashMap, HashSet, Result, UnnestOptions, exec_datafusion_err, exec_err,
47    internal_err,
48};
49use datafusion_execution::TaskContext;
50use datafusion_physical_expr::PhysicalExpr;
51use datafusion_physical_expr::equivalence::ProjectionMapping;
52use datafusion_physical_expr::expressions::Column;
53use futures::{Stream, StreamExt};
54use log::trace;
55
/// Unnest the given columns (either with type struct or list)
/// For list unnesting, each row is vertically transformed into multiple rows
/// For struct unnesting, each column is horizontally transformed into multiple columns,
/// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m')
///
/// See [`UnnestOptions`] for more details and an example.
#[derive(Debug, Clone)]
pub struct UnnestExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// The schema once the unnest is applied
    schema: SchemaRef,
    /// Indices of the list-typed columns in the input schema, each paired
    /// with the recursion depth to unnest to (see [`ListUnnest`])
    list_column_indices: Vec<ListUnnest>,
    /// Indices of the struct-typed columns in the input schema
    struct_column_indices: Vec<usize>,
    /// Options controlling unnest behavior (e.g. whether NULL lists are preserved)
    options: UnnestOptions,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: PlanProperties,
}
79
80impl UnnestExec {
81    /// Create a new [UnnestExec].
82    pub fn new(
83        input: Arc<dyn ExecutionPlan>,
84        list_column_indices: Vec<ListUnnest>,
85        struct_column_indices: Vec<usize>,
86        schema: SchemaRef,
87        options: UnnestOptions,
88    ) -> Result<Self> {
89        let cache = Self::compute_properties(
90            &input,
91            &list_column_indices,
92            &struct_column_indices,
93            &schema,
94        )?;
95
96        Ok(UnnestExec {
97            input,
98            schema,
99            list_column_indices,
100            struct_column_indices,
101            options,
102            metrics: Default::default(),
103            cache,
104        })
105    }
106
107    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
108    fn compute_properties(
109        input: &Arc<dyn ExecutionPlan>,
110        list_column_indices: &[ListUnnest],
111        struct_column_indices: &[usize],
112        schema: &SchemaRef,
113    ) -> Result<PlanProperties> {
114        // Find out which indices are not unnested, such that they can be copied over from the input plan
115        let input_schema = input.schema();
116        let mut unnested_indices = BooleanBufferBuilder::new(input_schema.fields().len());
117        unnested_indices.append_n(input_schema.fields().len(), false);
118        for list_unnest in list_column_indices {
119            unnested_indices.set_bit(list_unnest.index_in_input_schema, true);
120        }
121        for struct_unnest in struct_column_indices {
122            unnested_indices.set_bit(*struct_unnest, true)
123        }
124        let unnested_indices = unnested_indices.finish();
125        let non_unnested_indices: Vec<usize> = (0..input_schema.fields().len())
126            .filter(|idx| !unnested_indices.value(*idx))
127            .collect();
128
129        // Manually build projection mapping from non-unnested input columns to their positions in the output
130        let input_schema = input.schema();
131        let projection_mapping: ProjectionMapping = non_unnested_indices
132            .iter()
133            .map(|&input_idx| {
134                // Find what index the input column has in the output schema
135                let input_field = input_schema.field(input_idx);
136                let output_idx = schema
137                    .fields()
138                    .iter()
139                    .position(|output_field| output_field.name() == input_field.name())
140                    .ok_or_else(|| {
141                        exec_datafusion_err!(
142                            "Non-unnested column '{}' must exist in output schema",
143                            input_field.name()
144                        )
145                    })?;
146
147                let input_col = Arc::new(Column::new(input_field.name(), input_idx))
148                    as Arc<dyn PhysicalExpr>;
149                let target_col = Arc::new(Column::new(input_field.name(), output_idx))
150                    as Arc<dyn PhysicalExpr>;
151                // Use From<Vec<(Arc<dyn PhysicalExpr>, usize)>> for ProjectionTargets
152                let targets = vec![(target_col, output_idx)].into();
153                Ok((input_col, targets))
154            })
155            .collect::<Result<ProjectionMapping>>()?;
156
157        // Create the unnest's equivalence properties by copying the input plan's equivalence properties
158        // for the unaffected columns. Except for the constraints, which are removed entirely because
159        // the unnest operation invalidates any global uniqueness or primary-key constraints.
160        let input_eq_properties = input.equivalence_properties();
161        let eq_properties = input_eq_properties
162            .project(&projection_mapping, Arc::clone(schema))
163            .with_constraints(Constraints::default());
164
165        // Output partitioning must use the projection mapping
166        let output_partitioning = input
167            .output_partitioning()
168            .project(&projection_mapping, &eq_properties);
169
170        Ok(PlanProperties::new(
171            eq_properties,
172            output_partitioning,
173            input.pipeline_behavior(),
174            input.boundedness(),
175        ))
176    }
177
178    /// Input execution plan
179    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
180        &self.input
181    }
182
183    /// Indices of the list-typed columns in the input schema
184    pub fn list_column_indices(&self) -> &[ListUnnest] {
185        &self.list_column_indices
186    }
187
188    /// Indices of the struct-typed columns in the input schema
189    pub fn struct_column_indices(&self) -> &[usize] {
190        &self.struct_column_indices
191    }
192
193    pub fn options(&self) -> &UnnestOptions {
194        &self.options
195    }
196}
197
198impl DisplayAs for UnnestExec {
199    fn fmt_as(
200        &self,
201        t: DisplayFormatType,
202        f: &mut std::fmt::Formatter,
203    ) -> std::fmt::Result {
204        match t {
205            DisplayFormatType::Default | DisplayFormatType::Verbose => {
206                write!(f, "UnnestExec")
207            }
208            DisplayFormatType::TreeRender => {
209                write!(f, "")
210            }
211        }
212    }
213}
214
impl ExecutionPlan for UnnestExec {
    fn name(&self) -> &'static str {
        "UnnestExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        // Properties were computed once in `new` and cached.
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Rebuild the node (and its cached properties) around the new child.
        Ok(Arc::new(UnnestExec::new(
            Arc::clone(&children[0]),
            self.list_column_indices.clone(),
            self.struct_column_indices.clone(),
            Arc::clone(&self.schema),
            self.options.clone(),
        )?))
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        // Unnest is row-local; any input distribution works.
        vec![Distribution::UnspecifiedDistribution]
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let input = self.input.execute(partition, context)?;
        let metrics = UnnestMetrics::new(partition, &self.metrics);

        Ok(Box::pin(UnnestStream {
            input,
            schema: Arc::clone(&self.schema),
            list_type_columns: self.list_column_indices.clone(),
            struct_column_indices: self.struct_column_indices.iter().copied().collect(),
            options: self.options.clone(),
            metrics,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
271
/// Metrics recorded by a single [`UnnestStream`] partition.
#[derive(Clone, Debug)]
struct UnnestMetrics {
    /// Execution metrics (elapsed compute time, output batches/rows)
    baseline_metrics: BaselineMetrics,
    /// Number of batches consumed
    input_batches: metrics::Count,
    /// Number of rows consumed
    input_rows: metrics::Count,
}
281
282impl UnnestMetrics {
283    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
284        let input_batches =
285            MetricBuilder::new(metrics).counter("input_batches", partition);
286
287        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
288
289        Self {
290            baseline_metrics: BaselineMetrics::new(metrics, partition),
291            input_batches,
292            input_rows,
293        }
294    }
295}
296
/// A stream that issues [RecordBatch]es with unnested column data.
struct UnnestStream {
    /// Input stream
    input: SendableRecordBatchStream,
    /// Unnested schema
    schema: Arc<Schema>,
    /// represents all unnest operations to be applied to the input (input index, depth)
    /// e.g unnest(col1),unnest(unnest(col1)) where col1 has index 1 in original input schema
    /// then list_type_columns = [ListUnnest{1,1},ListUnnest{1,2}]
    list_type_columns: Vec<ListUnnest>,
    /// Indices of struct-typed input columns to flatten into subfield columns
    struct_column_indices: HashSet<usize>,
    /// Options
    options: UnnestOptions,
    /// Metrics
    metrics: UnnestMetrics,
}
313
impl RecordBatchStream for UnnestStream {
    /// Returns the unnested (output) schema of this stream.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
319
320#[async_trait]
321impl Stream for UnnestStream {
322    type Item = Result<RecordBatch>;
323
324    fn poll_next(
325        mut self: std::pin::Pin<&mut Self>,
326        cx: &mut std::task::Context<'_>,
327    ) -> Poll<Option<Self::Item>> {
328        self.poll_next_impl(cx)
329    }
330}
331
impl UnnestStream {
    /// Separate implementation function that unpins the [`UnnestStream`] so
    /// that partial borrows work correctly
    ///
    /// Polls the input stream, unnests each batch via [`build_batch`], and
    /// skips (rather than emits) batches that unnest to zero rows.
    fn poll_next_impl(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    // Time only the unnesting work, not the upstream poll.
                    let elapsed_compute =
                        self.metrics.baseline_metrics.elapsed_compute().clone();
                    let timer = elapsed_compute.timer();
                    self.metrics.input_batches.add(1);
                    self.metrics.input_rows.add(batch.num_rows());
                    let result = build_batch(
                        &batch,
                        &self.schema,
                        &self.list_type_columns,
                        &self.struct_column_indices,
                        &self.options,
                    )?;
                    timer.done();
                    // `None` means the batch produced no output rows: loop and
                    // poll the input again instead of emitting an empty batch.
                    let Some(result_batch) = result else {
                        continue;
                    };
                    (&result_batch).record_output(&self.metrics.baseline_metrics);

                    // Empty record batches should not be emitted.
                    // They need to be treated as  [`Option<RecordBatch>`]es and handled separately
                    debug_assert!(result_batch.num_rows() > 0);
                    Some(Ok(result_batch))
                }
                other => {
                    // End-of-stream (None) or an error: log summary stats and
                    // forward the item unchanged.
                    trace!(
                        "Processed {} probe-side input batches containing {} rows and \
                        produced {} output batches containing {} rows in {}",
                        self.metrics.input_batches,
                        self.metrics.input_rows,
                        self.metrics.baseline_metrics.output_batches(),
                        self.metrics.baseline_metrics.output_rows(),
                        self.metrics.baseline_metrics.elapsed_compute(),
                    );
                    other
                }
            });
        }
    }
}
381
382/// Given a set of struct column indices to flatten
383/// try converting the column in input into multiple subfield columns
384/// For example
385/// struct_col: [a: struct(item: int, name: string), b: int]
386/// with a batch
387/// {a: {item: 1, name: "a"}, b: 2},
388/// {a: {item: 3, name: "b"}, b: 4]
389/// will be converted into
390/// {a.item: 1, a.name: "a", b: 2},
391/// {a.item: 3, a.name: "b", b: 4}
392fn flatten_struct_cols(
393    input_batch: &[Arc<dyn Array>],
394    schema: &SchemaRef,
395    struct_column_indices: &HashSet<usize>,
396) -> Result<RecordBatch> {
397    // horizontal expansion because of struct unnest
398    let columns_expanded = input_batch
399        .iter()
400        .enumerate()
401        .map(|(idx, column_data)| match struct_column_indices.get(&idx) {
402            Some(_) => match column_data.data_type() {
403                DataType::Struct(_) => {
404                    let struct_arr =
405                        column_data.as_any().downcast_ref::<StructArray>().unwrap();
406                    Ok(struct_arr.columns().to_vec())
407                }
408                data_type => internal_err!(
409                    "expecting column {} from input plan to be a struct, got {:?}",
410                    idx,
411                    data_type
412                ),
413            },
414            None => Ok(vec![Arc::clone(column_data)]),
415        })
416        .collect::<Result<Vec<_>>>()?
417        .into_iter()
418        .flatten()
419        .collect();
420    Ok(RecordBatch::try_new(Arc::clone(schema), columns_expanded)?)
421}
422
/// A single list-unnest instruction: which input column to unnest and at
/// which recursion depth the unnesting starts.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct ListUnnest {
    /// Index of the list-typed column in the input schema
    pub index_in_input_schema: usize,
    /// Recursion depth of this unnest expression (1 = a single unnest level)
    pub depth: usize,
}
428
429/// This function is used to execute the unnesting on multiple columns all at once, but
430/// one level at a time, and is called n times, where n is the highest recursion level among
431/// the unnest exprs in the query.
432///
433/// For example giving the following query:
434/// ```sql
435/// select unnest(colA, max_depth:=3) as P1, unnest(colA,max_depth:=2) as P2, unnest(colB, max_depth:=1) as P3 from temp;
436/// ```
437/// Then the total times this function being called is 3
438///
439/// It needs to be aware of which level the current unnesting is, because if there exists
440/// multiple unnesting on the same column, but with different recursion levels, say
441/// **unnest(colA, max_depth:=3)** and **unnest(colA, max_depth:=2)**, then the unnesting
442/// of expr **unnest(colA, max_depth:=3)** will start at level 3, while unnesting for expr
443/// **unnest(colA, max_depth:=2)** has to start at level 2
444///
445/// Set *colA* as a 3-dimension columns and *colB* as an array (1-dimension). As stated,
446/// this function is called with the descending order of recursion depth
447///
448/// Depth = 3
449/// - colA(3-dimension) unnest into temp column temp_P1(2_dimension) (unnesting of P1 starts
450///   from this level)
451/// - colA(3-dimension) having indices repeated by the unnesting operation above
452/// - colB(1-dimension) having indices repeated by the unnesting operation above
453///
454/// Depth = 2
455/// - temp_P1(2-dimension) unnest into temp column temp_P1(1-dimension)
456/// - colA(3-dimension) unnest into temp column temp_P2(2-dimension) (unnesting of P2 starts
457///   from this level)
458/// - colB(1-dimension) having indices repeated by the unnesting operation above
459///
460/// Depth = 1
461/// - temp_P1(1-dimension) unnest into P1
462/// - temp_P2(2-dimension) unnest into P2
463/// - colB(1-dimension) unnest into P3 (unnesting of P3 starts from this level)
464///
465/// The returned array will has the same size as the input batch
466/// and only contains original columns that are not being unnested.
fn list_unnest_at_level(
    batch: &[ArrayRef],
    list_type_unnests: &[ListUnnest],
    temp_unnested_arrs: &mut HashMap<ListUnnest, ArrayRef>,
    level_to_unnest: usize,
    options: &UnnestOptions,
) -> Result<Option<Vec<ArrayRef>>> {
    // Extract unnestable columns at this level
    let (arrs_to_unnest, list_unnest_specs): (Vec<Arc<dyn Array>>, Vec<_>) =
        list_type_unnests
            .iter()
            .filter_map(|unnesting| {
                if level_to_unnest == unnesting.depth {
                    return Some((
                        Arc::clone(&batch[unnesting.index_in_input_schema]),
                        *unnesting,
                    ));
                }
                // This means the unnesting on this item has started at higher level
                // and need to continue until depth reaches 1
                if level_to_unnest < unnesting.depth {
                    return Some((
                        Arc::clone(temp_unnested_arrs.get(unnesting).unwrap()),
                        *unnesting,
                    ));
                }
                None
            })
            .unzip();

    // Compute the per-row output length (the longest list length per row,
    // honoring `preserve_nulls`), then the total number of output rows.
    let longest_length = find_longest_length(&arrs_to_unnest, options)?;
    let unnested_length = longest_length.as_primitive::<Int64Type>();
    let total_length = if unnested_length.is_empty() {
        0
    } else {
        sum(unnested_length).ok_or_else(|| {
            exec_datafusion_err!("Failed to calculate the total unnested length")
        })? as usize
    };
    // Nothing to emit for this batch at this level.
    if total_length == 0 {
        return Ok(None);
    }

    // Unnest all the list arrays
    let unnested_temp_arrays =
        unnest_list_arrays(arrs_to_unnest.as_ref(), unnested_length, total_length)?;

    // Create the take indices array for other columns
    let take_indices = create_take_indices(unnested_length, total_length);
    // Stash each flattened array keyed by its (column, depth) spec so deeper
    // levels (and the final assembly) can pick them up.
    unnested_temp_arrays
        .into_iter()
        .zip(list_unnest_specs.iter())
        .for_each(|(flatten_arr, unnesting)| {
            temp_unnested_arrs.insert(*unnesting, flatten_arr);
        });

    let repeat_mask: Vec<bool> = batch
        .iter()
        .enumerate()
        .map(|(i, _)| {
            // Check if the column is needed in future levels (levels below the current one)
            let needed_in_future_levels = list_type_unnests.iter().any(|unnesting| {
                unnesting.index_in_input_schema == i && unnesting.depth < level_to_unnest
            });

            // Check if the column is involved in unnesting at any level
            let is_involved_in_unnesting = list_type_unnests
                .iter()
                .any(|unnesting| unnesting.index_in_input_schema == i);

            // Repeat columns needed in future levels or not unnested.
            needed_in_future_levels || !is_involved_in_unnesting
        })
        .collect();

    // Dimension of arrays in batch is untouched, but the values are repeated
    // as the side effect of unnesting
    let ret = repeat_arrs_from_indices(batch, &take_indices, &repeat_mask)?;

    Ok(Some(ret))
}
/// The flattened array produced by unnesting one input column at one depth.
struct UnnestingResult {
    /// The unnested (flattened) array
    arr: ArrayRef,
    /// The recursion depth at which `arr` was produced
    depth: usize,
}
554
555/// For each row in a `RecordBatch`, some list/struct columns need to be unnested.
556/// - For list columns: We will expand the values in each list into multiple rows,
557///   taking the longest length among these lists, and shorter lists are padded with NULLs.
558/// - For struct columns: We will expand the struct columns into multiple subfield columns.
559///
560/// For columns that don't need to be unnested, repeat their values until reaching the longest length.
561///
562/// Note: unnest has a big difference in behavior between Postgres and DuckDB
563///
564/// Take this example
565///
566/// 1. Postgres
567/// ```ignored
568/// create table temp (
569///     i integer[][][], j integer[]
570/// )
571/// insert into temp values ('{{{1,2},{3,4}},{{5,6},{7,8}}}', '{1,2}');
572/// select unnest(i), unnest(j) from temp;
573/// ```
574///
575/// Result
576/// ```text
577///     1   1
578///     2   2
579///     3
580///     4
581///     5
582///     6
583///     7
584///     8
585/// ```
586/// 2. DuckDB
587/// ```ignore
588///     create table temp (i integer[][][], j integer[]);
589///     insert into temp values ([[[1,2],[3,4]],[[5,6],[7,8]]], [1,2]);
590///     select unnest(i,recursive:=true), unnest(j,recursive:=true) from temp;
591/// ```
592/// Result:
593/// ```text
594///
595///     ┌────────────────────────────────────────────────┬────────────────────────────────────────────────┐
596///     │ unnest(i, "recursive" := CAST('t' AS BOOLEAN)) │ unnest(j, "recursive" := CAST('t' AS BOOLEAN)) │
597///     │                     int32                      │                     int32                      │
598///     ├────────────────────────────────────────────────┼────────────────────────────────────────────────┤
599///     │                                              1 │                                              1 │
600///     │                                              2 │                                              2 │
601///     │                                              3 │                                              1 │
602///     │                                              4 │                                              2 │
603///     │                                              5 │                                              1 │
604///     │                                              6 │                                              2 │
605///     │                                              7 │                                              1 │
606///     │                                              8 │                                              2 │
607///     └────────────────────────────────────────────────┴────────────────────────────────────────────────┘
608/// ```
609///
610/// The following implementation refer to DuckDB's implementation
fn build_batch(
    batch: &RecordBatch,
    schema: &SchemaRef,
    list_type_columns: &[ListUnnest],
    struct_column_indices: &HashSet<usize>,
    options: &UnnestOptions,
) -> Result<Option<RecordBatch>> {
    let transformed = match list_type_columns.len() {
        // No list unnesting: only struct columns need flattening.
        0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
        _ => {
            // Intermediate results keyed by (column, depth), filled in by
            // `list_unnest_at_level` as each level is processed.
            let mut temp_unnested_result = HashMap::new();
            let max_recursion = list_type_columns
                .iter()
                .fold(0, |highest_depth, ListUnnest { depth, .. }| {
                    cmp::max(highest_depth, *depth)
                });

            // This arr always has the same column count with the input batch
            let mut flatten_arrs = vec![];

            // Original batch has the same columns
            // All unnesting results are written to temp_batch
            // Process levels from the deepest down to 1 (see fn-level docs
            // on `list_unnest_at_level`).
            for depth in (1..=max_recursion).rev() {
                let input = match depth == max_recursion {
                    true => batch.columns(),
                    false => &flatten_arrs,
                };
                let Some(temp_result) = list_unnest_at_level(
                    input,
                    list_type_columns,
                    &mut temp_unnested_result,
                    depth,
                    options,
                )?
                else {
                    // A level produced zero rows: the whole batch unnests to nothing.
                    return Ok(None);
                };
                flatten_arrs = temp_result;
            }
            // Group the per-(column, depth) results by original column index.
            let unnested_array_map: HashMap<usize, Vec<UnnestingResult>> =
                temp_unnested_result.into_iter().fold(
                    HashMap::new(),
                    |mut acc,
                     (
                        ListUnnest {
                            index_in_input_schema,
                            depth,
                        },
                        flattened_array,
                    )| {
                        acc.entry(index_in_input_schema).or_default().push(
                            UnnestingResult {
                                arr: flattened_array,
                                depth,
                            },
                        );
                        acc
                    },
                );
            // Position of each unnest expression in the original projection order.
            let output_order: HashMap<ListUnnest, usize> = list_type_columns
                .iter()
                .enumerate()
                .map(|(order, unnest_def)| (*unnest_def, order))
                .collect();

            // One original column may be unnested multiple times into separate columns
            let mut multi_unnested_per_original_index = unnested_array_map
                .into_iter()
                .map(
                    // Each item in unnested_columns is the result of unnesting the same input column
                    // we need to sort them to conform with the original expression order
                    // e.g unnest(unnest(col)) must goes before unnest(col)
                    |(original_index, mut unnested_columns)| {
                        unnested_columns.sort_by(
                            |UnnestingResult { depth: depth1, .. },
                             UnnestingResult { depth: depth2, .. }|
                             -> Ordering {
                                output_order
                                    .get(&ListUnnest {
                                        depth: *depth1,
                                        index_in_input_schema: original_index,
                                    })
                                    .unwrap()
                                    .cmp(
                                        output_order
                                            .get(&ListUnnest {
                                                depth: *depth2,
                                                index_in_input_schema: original_index,
                                            })
                                            .unwrap(),
                                    )
                            },
                        );
                        (
                            original_index,
                            unnested_columns
                                .into_iter()
                                .map(|result| result.arr)
                                .collect::<Vec<_>>(),
                        )
                    },
                )
                .collect::<HashMap<_, _>>();

            let ret = flatten_arrs
                .into_iter()
                .enumerate()
                .flat_map(|(col_idx, arr)| {
                    // Convert original column into its unnested version(s)
                    // Plural because one column can be unnested with different recursion level
                    // and into separate output columns
                    match multi_unnested_per_original_index.remove(&col_idx) {
                        Some(unnested_arrays) => unnested_arrays,
                        None => vec![arr],
                    }
                })
                .collect::<Vec<_>>();

            // Finally flatten any struct columns horizontally.
            flatten_struct_cols(&ret, schema, struct_column_indices)
        }
    }?;
    Ok(Some(transformed))
}
734
735/// Find the longest list length among the given list arrays for each row.
736///
737/// For example if we have the following two list arrays:
738///
739/// ```ignore
740/// l1: [1, 2, 3], null, [], [3]
741/// l2: [4,5], [], null, [6, 7]
742/// ```
743///
744/// If `preserve_nulls` is false, the longest length array will be:
745///
746/// ```ignore
747/// longest_length: [3, 0, 0, 2]
748/// ```
749///
750/// whereas if `preserve_nulls` is true, the longest length array will be:
751///
752///
753/// ```ignore
754/// longest_length: [3, 1, 1, 2]
755/// ```
756fn find_longest_length(
757    list_arrays: &[ArrayRef],
758    options: &UnnestOptions,
759) -> Result<ArrayRef> {
760    // The length of a NULL list
761    let null_length = if options.preserve_nulls {
762        Scalar::new(Int64Array::from_value(1, 1))
763    } else {
764        Scalar::new(Int64Array::from_value(0, 1))
765    };
766    let list_lengths: Vec<ArrayRef> = list_arrays
767        .iter()
768        .map(|list_array| {
769            let mut length_array = length(list_array)?;
770            // Make sure length arrays have the same type. Int64 is the most general one.
771            length_array = cast(&length_array, &DataType::Int64)?;
772            length_array =
773                zip(&is_not_null(&length_array)?, &length_array, &null_length)?;
774            Ok(length_array)
775        })
776        .collect::<Result<_>>()?;
777
778    let longest_length = list_lengths.iter().skip(1).try_fold(
779        Arc::clone(&list_lengths[0]),
780        |longest, current| {
781            let is_lt = lt(&longest, &current)?;
782            zip(&is_lt, &current, &longest)
783        },
784    )?;
785    Ok(longest_length)
786}
787
/// Trait defining common methods used for unnesting, implemented by list array types.
///
/// Implemented below for `ListArray` (i32 offsets), `LargeListArray`
/// (i64 offsets) and `FixedSizeListArray`, so the unnest code can handle
/// all three uniformly via `&dyn ListArrayType`.
trait ListArrayType: Array {
    /// Returns a reference to the values of this list.
    fn values(&self) -> &ArrayRef;

    /// Returns the start and end offset of the values for the given row.
    /// The end offset is exclusive (callers iterate `start..end`).
    fn value_offsets(&self, row: usize) -> (i64, i64);
}
796
797impl ListArrayType for ListArray {
798    fn values(&self) -> &ArrayRef {
799        self.values()
800    }
801
802    fn value_offsets(&self, row: usize) -> (i64, i64) {
803        let offsets = self.value_offsets();
804        (offsets[row].into(), offsets[row + 1].into())
805    }
806}
807
808impl ListArrayType for LargeListArray {
809    fn values(&self) -> &ArrayRef {
810        self.values()
811    }
812
813    fn value_offsets(&self, row: usize) -> (i64, i64) {
814        let offsets = self.value_offsets();
815        (offsets[row], offsets[row + 1])
816    }
817}
818
819impl ListArrayType for FixedSizeListArray {
820    fn values(&self) -> &ArrayRef {
821        self.values()
822    }
823
824    fn value_offsets(&self, row: usize) -> (i64, i64) {
825        let start = self.value_offset(row) as i64;
826        (start, start + self.value_length() as i64)
827    }
828}
829
830/// Unnest multiple list arrays according to the length array.
831fn unnest_list_arrays(
832    list_arrays: &[ArrayRef],
833    length_array: &PrimitiveArray<Int64Type>,
834    capacity: usize,
835) -> Result<Vec<ArrayRef>> {
836    let typed_arrays = list_arrays
837        .iter()
838        .map(|list_array| match list_array.data_type() {
839            DataType::List(_) => Ok(list_array.as_list::<i32>() as &dyn ListArrayType),
840            DataType::LargeList(_) => {
841                Ok(list_array.as_list::<i64>() as &dyn ListArrayType)
842            }
843            DataType::FixedSizeList(_, _) => {
844                Ok(list_array.as_fixed_size_list() as &dyn ListArrayType)
845            }
846            other => exec_err!("Invalid unnest datatype {other }"),
847        })
848        .collect::<Result<Vec<_>>>()?;
849
850    typed_arrays
851        .iter()
852        .map(|list_array| unnest_list_array(*list_array, length_array, capacity))
853        .collect::<Result<_>>()
854}
855
856/// Unnest a list array according the target length array.
857///
858/// Consider a list array like this:
859///
860/// ```ignore
861/// [1], [2, 3, 4], null, [5], [],
862/// ```
863///
864/// and the length array is:
865///
866/// ```ignore
867/// [2, 3, 2, 1, 2]
868/// ```
869///
870/// If the length of a certain list is less than the target length, pad with NULLs.
871/// So the unnested array will look like this:
872///
873/// ```ignore
874/// [1, null, 2, 3, 4, null, null, 5, null, null]
875/// ```
876fn unnest_list_array(
877    list_array: &dyn ListArrayType,
878    length_array: &PrimitiveArray<Int64Type>,
879    capacity: usize,
880) -> Result<ArrayRef> {
881    let values = list_array.values();
882    let mut take_indices_builder = PrimitiveArray::<Int64Type>::builder(capacity);
883    for row in 0..list_array.len() {
884        let mut value_length = 0;
885        if !list_array.is_null(row) {
886            let (start, end) = list_array.value_offsets(row);
887            value_length = end - start;
888            for i in start..end {
889                take_indices_builder.append_value(i)
890            }
891        }
892        let target_length = length_array.value(row);
893        debug_assert!(
894            value_length <= target_length,
895            "value length is beyond the longest length"
896        );
897        // Pad with NULL values
898        for _ in value_length..target_length {
899            take_indices_builder.append_null();
900        }
901    }
902    Ok(kernels::take::take(
903        &values,
904        &take_indices_builder.finish(),
905        None,
906    )?)
907}
908
909/// Creates take indices that will be used to expand all columns except for the list type
910/// [`columns`](UnnestExec::list_column_indices) that is being unnested.
911/// Every column value needs to be repeated multiple times according to the length array.
912///
913/// If the length array looks like this:
914///
915/// ```ignore
916/// [2, 3, 1]
917/// ```
918/// Then [`create_take_indices`] will return an array like this
919///
920/// ```ignore
921/// [0, 0, 1, 1, 1, 2]
922/// ```
923fn create_take_indices(
924    length_array: &PrimitiveArray<Int64Type>,
925    capacity: usize,
926) -> PrimitiveArray<Int64Type> {
927    // `find_longest_length()` guarantees this.
928    debug_assert!(
929        length_array.null_count() == 0,
930        "length array should not contain nulls"
931    );
932    let mut builder = PrimitiveArray::<Int64Type>::builder(capacity);
933    for (index, repeat) in length_array.iter().enumerate() {
934        // The length array should not contain nulls, so unwrap is safe
935        let repeat = repeat.unwrap();
936        (0..repeat).for_each(|_| builder.append_value(index as i64));
937    }
938    builder.finish()
939}
940
941/// Create a batch of arrays based on an input `batch` and a `indices` array.
942/// The `indices` array is used by the take kernel to repeat values in the arrays
943/// that are marked with `true` in the `repeat_mask`. Arrays marked with `false`
944/// in the `repeat_mask` will be replaced with arrays filled with nulls of the
945/// appropriate length.
946///
947/// For example if we have the following batch:
948///
949/// ```ignore
950/// c1: [1], null, [2, 3, 4], null, [5, 6]
951/// c2: 'a', 'b',  'c', null, 'd'
952/// ```
953///
954/// then the `unnested_list_arrays` contains the unnest column that will replace `c1` in
955/// the final batch if `preserve_nulls` is true:
956///
957/// ```ignore
958/// c1: 1, null, 2, 3, 4, null, 5, 6
959/// ```
960///
961/// And the `indices` array contains the indices that are used by `take` kernel to
962/// repeat the values in `c2`:
963///
964/// ```ignore
965/// 0, 1, 2, 2, 2, 3, 4, 4
966/// ```
967///
968/// so that the final batch will look like:
969///
970/// ```ignore
971/// c1: 1, null, 2, 3, 4, null, 5, 6
972/// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
973/// ```
974///
975/// The `repeat_mask` determines whether an array's values are repeated or replaced with nulls.
976/// For example, if the `repeat_mask` is:
977///
978/// ```ignore
979/// [true, false]
980/// ```
981///
982/// The final batch will look like:
983///
984/// ```ignore
985/// c1: 1, null, 2, 3, 4, null, 5, 6  // Repeated using `indices`
986/// c2: null, null, null, null, null, null, null, null  // Replaced with nulls
987fn repeat_arrs_from_indices(
988    batch: &[ArrayRef],
989    indices: &PrimitiveArray<Int64Type>,
990    repeat_mask: &[bool],
991) -> Result<Vec<Arc<dyn Array>>> {
992    batch
993        .iter()
994        .zip(repeat_mask.iter())
995        .map(|(arr, &repeat)| {
996            if repeat {
997                Ok(kernels::take::take(arr, indices, None)?)
998            } else {
999                Ok(new_null_array(arr.data_type(), arr.len()))
1000            }
1001        })
1002        .collect()
1003}
1004
/// Unit tests for the unnest helper functions defined above.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        GenericListArray, NullBufferBuilder, OffsetSizeTrait, StringArray,
    };
    use arrow::buffer::{NullBuffer, OffsetBuffer};
    use arrow::datatypes::{Field, Int32Type};
    use datafusion_common::test_util::batches_to_string;
    use insta::assert_snapshot;

    // Create a GenericListArray with the following list values:
    //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
    fn make_generic_array<OffsetSize>() -> GenericListArray<OffsetSize>
    where
        OffsetSize: OffsetSizeTrait,
    {
        let mut values = vec![];
        let mut offsets: Vec<OffsetSize> = vec![OffsetSize::zero()];
        let mut valid = NullBufferBuilder::new(6);

        // [A, B, C]
        values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // []
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // NULL with non-zero value length
        // Issue https://github.com/apache/datafusion/issues/9932
        values.push(Some("?"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [D]
        values.push(Some("D"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // Another NULL with zero value length
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [NULL, F]
        values.extend_from_slice(&[None, Some("F")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        GenericListArray::<OffsetSize>::new(
            field,
            OffsetBuffer::new(offsets.into()),
            Arc::new(StringArray::from(values)),
            valid.finish(),
        )
    }

    // Create a FixedSizeListArray with the following list values:
    //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
    fn make_fixed_list() -> FixedSizeListArray {
        let values = Arc::new(StringArray::from_iter([
            Some("A"),
            Some("B"),
            None,
            None,
            Some("C"),
            Some("D"),
            None,
            None,
            None,
            Some("F"),
            None,
            None,
        ]));
        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        let valid = NullBuffer::from(vec![true, false, true, false, true, true]);
        FixedSizeListArray::new(field, 2, values, Some(valid))
    }

    // Unnest `list_array` with the given per-row target `lengths` and assert
    // that the flattened string values equal `expected`.
    fn verify_unnest_list_array(
        list_array: &dyn ListArrayType,
        lengths: Vec<i64>,
        expected: Vec<Option<&str>>,
    ) -> Result<()> {
        let length_array = Int64Array::from(lengths);
        let unnested_array = unnest_list_array(list_array, &length_array, 3 * 6)?;
        let strs = unnested_array.as_string::<i32>().iter().collect::<Vec<_>>();
        assert_eq!(strs, expected);
        Ok(())
    }

    #[test]
    fn test_build_batch_list_arr_recursive() -> Result<()> {
        // col1                             | col2
        // [[1,2,3],null,[4,5]]             | ['a','b']
        // [[7,8,9,10], null, [11,12,13]]   | ['c','d']
        // null                             | ['e']
        let list_arr1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3)]),
            None,
            Some(vec![Some(4), Some(5)]),
            Some(vec![Some(7), Some(8), Some(9), Some(10)]),
            None,
            Some(vec![Some(11), Some(12), Some(13)]),
        ]);

        let list_arr1_ref = Arc::new(list_arr1) as ArrayRef;
        let offsets = OffsetBuffer::from_lengths([3, 3, 0]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_non_null();
        nulls.append_non_null();
        nulls.append_null();
        // list<list<int32>>
        let col1_field = Field::new_list_field(
            DataType::List(Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            ))),
            true,
        );
        let col1 = ListArray::new(
            Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            )),
            offsets,
            list_arr1_ref,
            nulls.finish(),
        );

        let list_arr2 = StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
            Some("d"),
            Some("e"),
        ]);

        let offsets = OffsetBuffer::from_lengths([2, 2, 1]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_n_non_nulls(3);
        let col2_field = Field::new(
            "col2",
            DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
            true,
        );
        let col2 = GenericListArray::<i32>::new(
            Arc::new(Field::new_list_field(DataType::Utf8, true)),
            OffsetBuffer::new(offsets.into()),
            Arc::new(list_arr2),
            nulls.finish(),
        );
        // convert col1 and col2 to a record batch
        let schema = Arc::new(Schema::new(vec![col1_field, col2_field]));
        let out_schema = Arc::new(Schema::new(vec![
            Field::new(
                "col1_unnest_placeholder_depth_1",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new("col1_unnest_placeholder_depth_2", DataType::Int32, true),
            Field::new("col2_unnest_placeholder_depth_1", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(col1) as ArrayRef, Arc::new(col2) as ArrayRef],
        )
        .unwrap();
        // col1 is unnested at two different depths into two output columns;
        // col2 only at depth 1.
        let list_type_columns = vec![
            ListUnnest {
                index_in_input_schema: 0,
                depth: 1,
            },
            ListUnnest {
                index_in_input_schema: 0,
                depth: 2,
            },
            ListUnnest {
                index_in_input_schema: 1,
                depth: 1,
            },
        ];
        let ret = build_batch(
            &batch,
            &out_schema,
            list_type_columns.as_ref(),
            &HashSet::default(),
            &UnnestOptions {
                preserve_nulls: true,
                recursions: vec![],
            },
        )?
        .unwrap();

        assert_snapshot!(batches_to_string(&[ret]),
        @r"
        +---------------------------------+---------------------------------+---------------------------------+
        | col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 |
        +---------------------------------+---------------------------------+---------------------------------+
        | [1, 2, 3]                       | 1                               | a                               |
        |                                 | 2                               | b                               |
        | [4, 5]                          | 3                               |                                 |
        | [1, 2, 3]                       |                                 | a                               |
        |                                 |                                 | b                               |
        | [4, 5]                          |                                 |                                 |
        | [1, 2, 3]                       | 4                               | a                               |
        |                                 | 5                               | b                               |
        | [4, 5]                          |                                 |                                 |
        | [7, 8, 9, 10]                   | 7                               | c                               |
        |                                 | 8                               | d                               |
        | [11, 12, 13]                    | 9                               |                                 |
        |                                 | 10                              |                                 |
        | [7, 8, 9, 10]                   |                                 | c                               |
        |                                 |                                 | d                               |
        | [11, 12, 13]                    |                                 |                                 |
        | [7, 8, 9, 10]                   | 11                              | c                               |
        |                                 | 12                              | d                               |
        | [11, 12, 13]                    | 13                              |                                 |
        |                                 |                                 | e                               |
        +---------------------------------+---------------------------------+---------------------------------+
        ");
        Ok(())
    }

    #[test]
    fn test_unnest_list_array() -> Result<()> {
        // [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = make_generic_array::<i32>();
        verify_unnest_list_array(
            &list_array,
            vec![3, 2, 1, 2, 0, 3],
            vec![
                Some("A"),
                Some("B"),
                Some("C"),
                None,
                None,
                None,
                Some("D"),
                None,
                None,
                Some("F"),
                None,
            ],
        )?;

        // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list_array = make_fixed_list();
        verify_unnest_list_array(
            &list_array,
            vec![3, 1, 2, 0, 2, 3],
            vec![
                Some("A"),
                Some("B"),
                None,
                None,
                Some("C"),
                Some("D"),
                None,
                Some("F"),
                None,
                None,
                None,
            ],
        )?;

        Ok(())
    }

    // Compute the per-row longest length over `list_arrays` (with the given
    // `preserve_nulls` setting) and assert it equals `expected`.
    fn verify_longest_length(
        list_arrays: &[ArrayRef],
        preserve_nulls: bool,
        expected: Vec<i64>,
    ) -> Result<()> {
        let options = UnnestOptions {
            preserve_nulls,
            recursions: vec![],
        };
        let longest_length = find_longest_length(list_arrays, &options)?;
        let expected_array = Int64Array::from(expected);
        assert_eq!(
            longest_length
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap(),
            &expected_array
        );
        Ok(())
    }

    #[test]
    fn test_longest_list_length() -> Result<()> {
        // Test with single ListArray
        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Test with single LargeListArray
        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = Arc::new(make_generic_array::<i64>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Test with single FixedSizeListArray
        //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list_array = Arc::new(make_fixed_list()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![2, 0, 2, 0, 2, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![2, 1, 2, 1, 2, 2])?;

        // Test with multiple list arrays
        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
        //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list1 = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        let list2 = Arc::new(make_fixed_list()) as ArrayRef;
        let list_arrays = vec![Arc::clone(&list1), Arc::clone(&list2)];
        verify_longest_length(&list_arrays, false, vec![3, 0, 2, 1, 2, 2])?;
        verify_longest_length(&list_arrays, true, vec![3, 1, 2, 1, 2, 2])?;

        Ok(())
    }

    #[test]
    fn test_create_take_indices() -> Result<()> {
        let length_array = Int64Array::from(vec![2, 3, 1]);
        let take_indices = create_take_indices(&length_array, 6);
        let expected = Int64Array::from(vec![0, 0, 1, 1, 1, 2]);
        assert_eq!(take_indices, expected);
        Ok(())
    }
}