Skip to main content

datafusion_physical_plan/
unnest.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Define a plan for unnesting values in columns that contain a list type.
19
20use std::cmp::{self, Ordering};
21use std::sync::Arc;
22use std::task::{Poll, ready};
23
24use super::metrics::{
25    self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory,
26    MetricsSet, RecordOutput,
27};
28use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
29use crate::stream::EmptyRecordBatchStream;
30use crate::{
31    DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
32    SendableRecordBatchStream, check_if_same_properties,
33};
34
35use arrow::array::{
36    Array, ArrayRef, AsArray, BooleanBufferBuilder, FixedSizeListArray, Int64Array,
37    LargeListArray, LargeListViewArray, ListArray, ListViewArray, PrimitiveArray, Scalar,
38    StructArray, new_null_array,
39};
40use arrow::compute::kernels::length::length;
41use arrow::compute::kernels::zip::zip;
42use arrow::compute::{cast, is_not_null, kernels, sum};
43use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
44use arrow::record_batch::RecordBatch;
45use arrow_ord::cmp::lt;
46use async_trait::async_trait;
47use datafusion_common::{
48    Constraints, HashMap, HashSet, Result, UnnestOptions, exec_datafusion_err, exec_err,
49    internal_err,
50};
51use datafusion_execution::TaskContext;
52use datafusion_physical_expr::PhysicalExpr;
53use datafusion_physical_expr::equivalence::ProjectionMapping;
54use datafusion_physical_expr::expressions::Column;
55use futures::{Stream, StreamExt};
56use log::trace;
57
58/// Unnest the given columns (either with type struct or list)
59/// For list unnesting, each row is vertically transformed into multiple rows
60/// For struct unnesting, each column is horizontally transformed into multiple columns,
61/// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m')
62///
63/// See [`UnnestOptions`] for more details and an example.
64#[derive(Debug, Clone)]
65pub struct UnnestExec {
66    /// Input execution plan
67    input: Arc<dyn ExecutionPlan>,
68    /// The schema once the unnest is applied
69    schema: SchemaRef,
70    /// Indices of the list-typed columns in the input schema
71    list_column_indices: Vec<ListUnnest>,
72    /// Indices of the struct-typed columns in the input schema
73    struct_column_indices: Vec<usize>,
74    /// Options
75    options: UnnestOptions,
76    /// Execution metrics
77    metrics: ExecutionPlanMetricsSet,
78    /// Cache holding plan properties like equivalences, output partitioning etc.
79    cache: Arc<PlanProperties>,
80}
81
82impl UnnestExec {
83    /// Create a new [UnnestExec].
84    pub fn new(
85        input: Arc<dyn ExecutionPlan>,
86        list_column_indices: Vec<ListUnnest>,
87        struct_column_indices: Vec<usize>,
88        schema: SchemaRef,
89        options: UnnestOptions,
90    ) -> Result<Self> {
91        let cache = Self::compute_properties(
92            &input,
93            &list_column_indices,
94            &struct_column_indices,
95            &schema,
96        )?;
97
98        Ok(UnnestExec {
99            input,
100            schema,
101            list_column_indices,
102            struct_column_indices,
103            options,
104            metrics: Default::default(),
105            cache: Arc::new(cache),
106        })
107    }
108
109    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
110    fn compute_properties(
111        input: &Arc<dyn ExecutionPlan>,
112        list_column_indices: &[ListUnnest],
113        struct_column_indices: &[usize],
114        schema: &SchemaRef,
115    ) -> Result<PlanProperties> {
116        // Find out which indices are not unnested, such that they can be copied over from the input plan
117        let input_schema = input.schema();
118        let mut unnested_indices = BooleanBufferBuilder::new(input_schema.fields().len());
119        unnested_indices.append_n(input_schema.fields().len(), false);
120        for list_unnest in list_column_indices {
121            unnested_indices.set_bit(list_unnest.index_in_input_schema, true);
122        }
123        for struct_unnest in struct_column_indices {
124            unnested_indices.set_bit(*struct_unnest, true)
125        }
126        let unnested_indices = unnested_indices.finish();
127        let non_unnested_indices: Vec<usize> = (0..input_schema.fields().len())
128            .filter(|idx| !unnested_indices.value(*idx))
129            .collect();
130
131        // Manually build projection mapping from non-unnested input columns to their positions in the output
132        let input_schema = input.schema();
133        let projection_mapping: ProjectionMapping = non_unnested_indices
134            .iter()
135            .map(|&input_idx| {
136                // Find what index the input column has in the output schema
137                let input_field = input_schema.field(input_idx);
138                let output_idx = schema
139                    .fields()
140                    .iter()
141                    .position(|output_field| output_field.name() == input_field.name())
142                    .ok_or_else(|| {
143                        exec_datafusion_err!(
144                            "Non-unnested column '{}' must exist in output schema",
145                            input_field.name()
146                        )
147                    })?;
148
149                let input_col = Arc::new(Column::new(input_field.name(), input_idx))
150                    as Arc<dyn PhysicalExpr>;
151                let target_col = Arc::new(Column::new(input_field.name(), output_idx))
152                    as Arc<dyn PhysicalExpr>;
153                // Use From<Vec<(Arc<dyn PhysicalExpr>, usize)>> for ProjectionTargets
154                let targets = vec![(target_col, output_idx)].into();
155                Ok((input_col, targets))
156            })
157            .collect::<Result<ProjectionMapping>>()?;
158
159        // Create the unnest's equivalence properties by copying the input plan's equivalence properties
160        // for the unaffected columns. Except for the constraints, which are removed entirely because
161        // the unnest operation invalidates any global uniqueness or primary-key constraints.
162        let input_eq_properties = input.equivalence_properties();
163        let eq_properties = input_eq_properties
164            .project(&projection_mapping, Arc::clone(schema))
165            .with_constraints(Constraints::default());
166
167        // Output partitioning must use the projection mapping
168        let output_partitioning = input
169            .output_partitioning()
170            .project(&projection_mapping, &eq_properties);
171
172        Ok(PlanProperties::new(
173            eq_properties,
174            output_partitioning,
175            input.pipeline_behavior(),
176            input.boundedness(),
177        ))
178    }
179
180    /// Input execution plan
181    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
182        &self.input
183    }
184
185    /// Indices of the list-typed columns in the input schema
186    pub fn list_column_indices(&self) -> &[ListUnnest] {
187        &self.list_column_indices
188    }
189
190    /// Indices of the struct-typed columns in the input schema
191    pub fn struct_column_indices(&self) -> &[usize] {
192        &self.struct_column_indices
193    }
194
195    pub fn options(&self) -> &UnnestOptions {
196        &self.options
197    }
198
199    fn with_new_children_and_same_properties(
200        &self,
201        mut children: Vec<Arc<dyn ExecutionPlan>>,
202    ) -> Self {
203        Self {
204            input: children.swap_remove(0),
205            metrics: ExecutionPlanMetricsSet::new(),
206            ..Self::clone(self)
207        }
208    }
209}
210
211impl DisplayAs for UnnestExec {
212    fn fmt_as(
213        &self,
214        t: DisplayFormatType,
215        f: &mut std::fmt::Formatter,
216    ) -> std::fmt::Result {
217        match t {
218            DisplayFormatType::Default | DisplayFormatType::Verbose => {
219                write!(f, "UnnestExec")
220            }
221            DisplayFormatType::TreeRender => {
222                write!(f, "")
223            }
224        }
225    }
226}
227
228impl ExecutionPlan for UnnestExec {
229    fn name(&self) -> &'static str {
230        "UnnestExec"
231    }
232
233    fn properties(&self) -> &Arc<PlanProperties> {
234        &self.cache
235    }
236
237    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
238        vec![&self.input]
239    }
240
241    fn with_new_children(
242        self: Arc<Self>,
243        mut children: Vec<Arc<dyn ExecutionPlan>>,
244    ) -> Result<Arc<dyn ExecutionPlan>> {
245        check_if_same_properties!(self, children);
246        Ok(Arc::new(UnnestExec::new(
247            children.swap_remove(0),
248            self.list_column_indices.clone(),
249            self.struct_column_indices.clone(),
250            Arc::clone(&self.schema),
251            self.options.clone(),
252        )?))
253    }
254
255    fn required_input_distribution(&self) -> Vec<Distribution> {
256        vec![Distribution::UnspecifiedDistribution]
257    }
258
259    fn execute(
260        &self,
261        partition: usize,
262        context: Arc<TaskContext>,
263    ) -> Result<SendableRecordBatchStream> {
264        let input = self.input.execute(partition, context)?;
265        let metrics = UnnestMetrics::new(partition, &self.metrics);
266
267        Ok(Box::pin(UnnestStream {
268            input,
269            schema: Arc::clone(&self.schema),
270            list_type_columns: self.list_column_indices.clone(),
271            struct_column_indices: self.struct_column_indices.iter().copied().collect(),
272            options: self.options.clone(),
273            metrics,
274        }))
275    }
276
277    fn metrics(&self) -> Option<MetricsSet> {
278        Some(self.metrics.clone_inner())
279    }
280}
281
282#[derive(Clone, Debug)]
283struct UnnestMetrics {
284    /// Execution metrics
285    baseline_metrics: BaselineMetrics,
286    /// Number of batches consumed
287    input_batches: metrics::Count,
288    /// Number of rows consumed
289    input_rows: metrics::Count,
290}
291
292impl UnnestMetrics {
293    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
294        let input_batches = MetricBuilder::new(metrics)
295            .with_category(MetricCategory::Rows)
296            .counter("input_batches", partition);
297
298        let input_rows = MetricBuilder::new(metrics)
299            .with_category(MetricCategory::Rows)
300            .counter("input_rows", partition);
301
302        Self {
303            baseline_metrics: BaselineMetrics::new(metrics, partition),
304            input_batches,
305            input_rows,
306        }
307    }
308}
309
310/// A stream that issues [RecordBatch]es with unnested column data.
311struct UnnestStream {
312    /// Input stream
313    input: SendableRecordBatchStream,
314    /// Unnested schema
315    schema: Arc<Schema>,
316    /// represents all unnest operations to be applied to the input (input index, depth)
317    /// e.g unnest(col1),unnest(unnest(col1)) where col1 has index 1 in original input schema
318    /// then list_type_columns = [ListUnnest{1,1},ListUnnest{1,2}]
319    list_type_columns: Vec<ListUnnest>,
320    struct_column_indices: HashSet<usize>,
321    /// Options
322    options: UnnestOptions,
323    /// Metrics
324    metrics: UnnestMetrics,
325}
326
327impl RecordBatchStream for UnnestStream {
328    fn schema(&self) -> SchemaRef {
329        Arc::clone(&self.schema)
330    }
331}
332
333#[async_trait]
334impl Stream for UnnestStream {
335    type Item = Result<RecordBatch>;
336
337    fn poll_next(
338        mut self: std::pin::Pin<&mut Self>,
339        cx: &mut std::task::Context<'_>,
340    ) -> Poll<Option<Self::Item>> {
341        self.poll_next_impl(cx)
342    }
343}
344
345impl UnnestStream {
346    /// Separate implementation function that unpins the [`UnnestStream`] so
347    /// that partial borrows work correctly
348    fn poll_next_impl(
349        &mut self,
350        cx: &mut std::task::Context<'_>,
351    ) -> Poll<Option<Result<RecordBatch>>> {
352        loop {
353            return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
354                Some(Ok(batch)) => {
355                    let elapsed_compute =
356                        self.metrics.baseline_metrics.elapsed_compute().clone();
357                    let timer = elapsed_compute.timer();
358                    self.metrics.input_batches.add(1);
359                    self.metrics.input_rows.add(batch.num_rows());
360                    let result = build_batch(
361                        &batch,
362                        &self.schema,
363                        &self.list_type_columns,
364                        &self.struct_column_indices,
365                        &self.options,
366                    )?;
367                    timer.done();
368                    let Some(result_batch) = result else {
369                        continue;
370                    };
371                    (&result_batch).record_output(&self.metrics.baseline_metrics);
372
373                    // Empty record batches should not be emitted.
374                    // They need to be treated as  [`Option<RecordBatch>`]es and handled separately
375                    debug_assert!(result_batch.num_rows() > 0);
376                    Some(Ok(result_batch))
377                }
378                // If the stream is depleted or returned an error, log the finish message:
379                other => {
380                    trace!(
381                        "Processed {} probe-side input batches containing {} rows and \
382                        produced {} output batches containing {} rows in {}",
383                        self.metrics.input_batches,
384                        self.metrics.input_rows,
385                        self.metrics.baseline_metrics.output_batches(),
386                        self.metrics.baseline_metrics.output_rows(),
387                        self.metrics.baseline_metrics.elapsed_compute(),
388                    );
389
390                    // In the non-error case, i.e., input is simply depleted:
391                    if other.is_none() {
392                        // Release the input pipeline's resources.
393                        let input_schema = self.input.schema();
394                        self.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
395                    }
396
397                    other
398                }
399            });
400        }
401    }
402}
403
404/// Given a set of struct column indices to flatten
405/// try converting the column in input into multiple subfield columns
406/// For example
407/// struct_col: [a: struct(item: int, name: string), b: int]
408/// with a batch
409/// {a: {item: 1, name: "a"}, b: 2},
410/// {a: {item: 3, name: "b"}, b: 4]
411/// will be converted into
412/// {a.item: 1, a.name: "a", b: 2},
413/// {a.item: 3, a.name: "b", b: 4}
414fn flatten_struct_cols(
415    input_batch: &[Arc<dyn Array>],
416    schema: &SchemaRef,
417    struct_column_indices: &HashSet<usize>,
418) -> Result<RecordBatch> {
419    // horizontal expansion because of struct unnest
420    let columns_expanded = input_batch
421        .iter()
422        .enumerate()
423        .map(|(idx, column_data)| match struct_column_indices.get(&idx) {
424            Some(_) => match column_data.data_type() {
425                DataType::Struct(_) => {
426                    let struct_arr =
427                        column_data.as_any().downcast_ref::<StructArray>().unwrap();
428                    Ok(struct_arr.columns().to_vec())
429                }
430                data_type => internal_err!(
431                    "expecting column {idx} from input plan to be a struct, got {data_type}"
432                ),
433            },
434            None => Ok(vec![Arc::clone(column_data)]),
435        })
436        .collect::<Result<Vec<_>>>()?
437        .into_iter()
438        .flatten()
439        .collect();
440    Ok(RecordBatch::try_new(Arc::clone(schema), columns_expanded)?)
441}
442
/// One list unnest operation: which input column to unnest and to what
/// recursion depth.
///
/// For example, with the column at index 1, `unnest(col)` is
/// `ListUnnest { index_in_input_schema: 1, depth: 1 }` and
/// `unnest(unnest(col))` is `ListUnnest { index_in_input_schema: 1, depth: 2 }`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct ListUnnest {
    /// Position of the list column in the input schema
    pub index_in_input_schema: usize,
    /// Number of list levels to unnest
    pub depth: usize,
}
448
449/// This function is used to execute the unnesting on multiple columns all at once, but
450/// one level at a time, and is called n times, where n is the highest recursion level among
451/// the unnest exprs in the query.
452///
453/// For example giving the following query:
454/// ```sql
455/// select unnest(colA, max_depth:=3) as P1, unnest(colA,max_depth:=2) as P2, unnest(colB, max_depth:=1) as P3 from temp;
456/// ```
457/// Then the total times this function being called is 3
458///
459/// It needs to be aware of which level the current unnesting is, because if there exists
460/// multiple unnesting on the same column, but with different recursion levels, say
461/// **unnest(colA, max_depth:=3)** and **unnest(colA, max_depth:=2)**, then the unnesting
462/// of expr **unnest(colA, max_depth:=3)** will start at level 3, while unnesting for expr
463/// **unnest(colA, max_depth:=2)** has to start at level 2
464///
465/// Set *colA* as a 3-dimension columns and *colB* as an array (1-dimension). As stated,
466/// this function is called with the descending order of recursion depth
467///
468/// Depth = 3
469/// - colA(3-dimension) unnest into temp column temp_P1(2_dimension) (unnesting of P1 starts
470///   from this level)
471/// - colA(3-dimension) having indices repeated by the unnesting operation above
472/// - colB(1-dimension) having indices repeated by the unnesting operation above
473///
474/// Depth = 2
475/// - temp_P1(2-dimension) unnest into temp column temp_P1(1-dimension)
476/// - colA(3-dimension) unnest into temp column temp_P2(2-dimension) (unnesting of P2 starts
477///   from this level)
478/// - colB(1-dimension) having indices repeated by the unnesting operation above
479///
480/// Depth = 1
481/// - temp_P1(1-dimension) unnest into P1
482/// - temp_P2(2-dimension) unnest into P2
483/// - colB(1-dimension) unnest into P3 (unnesting of P3 starts from this level)
484///
485/// The returned array will has the same size as the input batch
486/// and only contains original columns that are not being unnested.
487fn list_unnest_at_level(
488    batch: &[ArrayRef],
489    list_type_unnests: &[ListUnnest],
490    temp_unnested_arrs: &mut HashMap<ListUnnest, ArrayRef>,
491    level_to_unnest: usize,
492    options: &UnnestOptions,
493) -> Result<Option<Vec<ArrayRef>>> {
494    // Extract unnestable columns at this level
495    let (arrs_to_unnest, list_unnest_specs): (Vec<Arc<dyn Array>>, Vec<_>) =
496        list_type_unnests
497            .iter()
498            .filter_map(|unnesting| {
499                if level_to_unnest == unnesting.depth {
500                    return Some((
501                        Arc::clone(&batch[unnesting.index_in_input_schema]),
502                        *unnesting,
503                    ));
504                }
505                // This means the unnesting on this item has started at higher level
506                // and need to continue until depth reaches 1
507                if level_to_unnest < unnesting.depth {
508                    return Some((
509                        Arc::clone(temp_unnested_arrs.get(unnesting).unwrap()),
510                        *unnesting,
511                    ));
512                }
513                None
514            })
515            .unzip();
516
517    // Filter out so that list_arrays only contain column with the highest depth
518    // at the same time, during iteration remove this depth so next time we don't have to unnest them again
519    let longest_length = find_longest_length(&arrs_to_unnest, options)?;
520    let unnested_length = longest_length.as_primitive::<Int64Type>();
521    let total_length = if unnested_length.is_empty() {
522        0
523    } else {
524        sum(unnested_length).ok_or_else(|| {
525            exec_datafusion_err!("Failed to calculate the total unnested length")
526        })? as usize
527    };
528    if total_length == 0 {
529        return Ok(None);
530    }
531
532    // Unnest all the list arrays
533    let unnested_temp_arrays =
534        unnest_list_arrays(arrs_to_unnest.as_ref(), unnested_length, total_length)?;
535
536    // Create the take indices array for other columns
537    let take_indices = create_take_indices(unnested_length, total_length);
538    unnested_temp_arrays
539        .into_iter()
540        .zip(list_unnest_specs.iter())
541        .for_each(|(flatten_arr, unnesting)| {
542            temp_unnested_arrs.insert(*unnesting, flatten_arr);
543        });
544
545    let repeat_mask: Vec<bool> = batch
546        .iter()
547        .enumerate()
548        .map(|(i, _)| {
549            // Check if the column is needed in future levels (levels below the current one)
550            let needed_in_future_levels = list_type_unnests.iter().any(|unnesting| {
551                unnesting.index_in_input_schema == i && unnesting.depth < level_to_unnest
552            });
553
554            // Check if the column is involved in unnesting at any level
555            let is_involved_in_unnesting = list_type_unnests
556                .iter()
557                .any(|unnesting| unnesting.index_in_input_schema == i);
558
559            // Repeat columns needed in future levels or not unnested.
560            needed_in_future_levels || !is_involved_in_unnesting
561        })
562        .collect();
563
564    // Dimension of arrays in batch is untouched, but the values are repeated
565    // as the side effect of unnesting
566    let ret = repeat_arrs_from_indices(batch, &take_indices, &repeat_mask)?;
567
568    Ok(Some(ret))
569}
570struct UnnestingResult {
571    arr: ArrayRef,
572    depth: usize,
573}
574
575/// For each row in a `RecordBatch`, some list/struct columns need to be unnested.
576/// - For list columns: We will expand the values in each list into multiple rows,
577///   taking the longest length among these lists, and shorter lists are padded with NULLs.
578/// - For struct columns: We will expand the struct columns into multiple subfield columns.
579///
580/// For columns that don't need to be unnested, repeat their values until reaching the longest length.
581///
582/// Note: unnest has a big difference in behavior between Postgres and DuckDB
583///
584/// Take this example
585///
586/// 1. Postgres
587/// ```ignored
588/// create table temp (
589///     i integer[][][], j integer[]
590/// )
591/// insert into temp values ('{{{1,2},{3,4}},{{5,6},{7,8}}}', '{1,2}');
592/// select unnest(i), unnest(j) from temp;
593/// ```
594///
595/// Result
596/// ```text
597///     1   1
598///     2   2
599///     3
600///     4
601///     5
602///     6
603///     7
604///     8
605/// ```
606/// 2. DuckDB
607/// ```ignore
608///     create table temp (i integer[][][], j integer[]);
609///     insert into temp values ([[[1,2],[3,4]],[[5,6],[7,8]]], [1,2]);
610///     select unnest(i,recursive:=true), unnest(j,recursive:=true) from temp;
611/// ```
612/// Result:
613/// ```text
614///
615///     ┌────────────────────────────────────────────────┬────────────────────────────────────────────────┐
616///     │ unnest(i, "recursive" := CAST('t' AS BOOLEAN)) │ unnest(j, "recursive" := CAST('t' AS BOOLEAN)) │
617///     │                     int32                      │                     int32                      │
618///     ├────────────────────────────────────────────────┼────────────────────────────────────────────────┤
619///     │                                              1 │                                              1 │
620///     │                                              2 │                                              2 │
621///     │                                              3 │                                              1 │
622///     │                                              4 │                                              2 │
623///     │                                              5 │                                              1 │
624///     │                                              6 │                                              2 │
625///     │                                              7 │                                              1 │
626///     │                                              8 │                                              2 │
627///     └────────────────────────────────────────────────┴────────────────────────────────────────────────┘
628/// ```
629///
630/// The following implementation refer to DuckDB's implementation
631fn build_batch(
632    batch: &RecordBatch,
633    schema: &SchemaRef,
634    list_type_columns: &[ListUnnest],
635    struct_column_indices: &HashSet<usize>,
636    options: &UnnestOptions,
637) -> Result<Option<RecordBatch>> {
638    let transformed = match list_type_columns.len() {
639        0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
640        _ => {
641            let mut temp_unnested_result = HashMap::new();
642            let max_recursion = list_type_columns
643                .iter()
644                .fold(0, |highest_depth, ListUnnest { depth, .. }| {
645                    cmp::max(highest_depth, *depth)
646                });
647
648            // This arr always has the same column count with the input batch
649            let mut flatten_arrs = vec![];
650
651            // Original batch has the same columns
652            // All unnesting results are written to temp_batch
653            for depth in (1..=max_recursion).rev() {
654                let input = match depth == max_recursion {
655                    true => batch.columns(),
656                    false => &flatten_arrs,
657                };
658                let Some(temp_result) = list_unnest_at_level(
659                    input,
660                    list_type_columns,
661                    &mut temp_unnested_result,
662                    depth,
663                    options,
664                )?
665                else {
666                    return Ok(None);
667                };
668                flatten_arrs = temp_result;
669            }
670            let unnested_array_map: HashMap<usize, Vec<UnnestingResult>> =
671                temp_unnested_result.into_iter().fold(
672                    HashMap::new(),
673                    |mut acc,
674                     (
675                        ListUnnest {
676                            index_in_input_schema,
677                            depth,
678                        },
679                        flattened_array,
680                    )| {
681                        acc.entry(index_in_input_schema).or_default().push(
682                            UnnestingResult {
683                                arr: flattened_array,
684                                depth,
685                            },
686                        );
687                        acc
688                    },
689                );
690            let output_order: HashMap<ListUnnest, usize> = list_type_columns
691                .iter()
692                .enumerate()
693                .map(|(order, unnest_def)| (*unnest_def, order))
694                .collect();
695
696            // One original column may be unnested multiple times into separate columns
697            let mut multi_unnested_per_original_index = unnested_array_map
698                .into_iter()
699                .map(
700                    // Each item in unnested_columns is the result of unnesting the same input column
701                    // we need to sort them to conform with the original expression order
702                    // e.g unnest(unnest(col)) must goes before unnest(col)
703                    |(original_index, mut unnested_columns)| {
704                        unnested_columns.sort_by(
705                            |UnnestingResult { depth: depth1, .. },
706                             UnnestingResult { depth: depth2, .. }|
707                             -> Ordering {
708                                output_order
709                                    .get(&ListUnnest {
710                                        depth: *depth1,
711                                        index_in_input_schema: original_index,
712                                    })
713                                    .unwrap()
714                                    .cmp(
715                                        output_order
716                                            .get(&ListUnnest {
717                                                depth: *depth2,
718                                                index_in_input_schema: original_index,
719                                            })
720                                            .unwrap(),
721                                    )
722                            },
723                        );
724                        (
725                            original_index,
726                            unnested_columns
727                                .into_iter()
728                                .map(|result| result.arr)
729                                .collect::<Vec<_>>(),
730                        )
731                    },
732                )
733                .collect::<HashMap<_, _>>();
734
735            let ret = flatten_arrs
736                .into_iter()
737                .enumerate()
738                .flat_map(|(col_idx, arr)| {
739                    // Convert original column into its unnested version(s)
740                    // Plural because one column can be unnested with different recursion level
741                    // and into separate output columns
742                    match multi_unnested_per_original_index.remove(&col_idx) {
743                        Some(unnested_arrays) => unnested_arrays,
744                        None => vec![arr],
745                    }
746                })
747                .collect::<Vec<_>>();
748
749            flatten_struct_cols(&ret, schema, struct_column_indices)
750        }
751    }?;
752    Ok(Some(transformed))
753}
754
755/// Find the longest list length among the given list arrays for each row.
756///
757/// For example if we have the following two list arrays:
758///
759/// ```ignore
760/// l1: [1, 2, 3], null, [], [3]
761/// l2: [4,5], [], null, [6, 7]
762/// ```
763///
764/// If `preserve_nulls` is false, the longest length array will be:
765///
766/// ```ignore
767/// longest_length: [3, 0, 0, 2]
768/// ```
769///
770/// whereas if `preserve_nulls` is true, the longest length array will be:
771///
772///
773/// ```ignore
774/// longest_length: [3, 1, 1, 2]
775/// ```
776fn find_longest_length(
777    list_arrays: &[ArrayRef],
778    options: &UnnestOptions,
779) -> Result<ArrayRef> {
780    // The length of a NULL list
781    let null_length = if options.preserve_nulls {
782        Scalar::new(Int64Array::from_value(1, 1))
783    } else {
784        Scalar::new(Int64Array::from_value(0, 1))
785    };
786    let list_lengths: Vec<ArrayRef> = list_arrays
787        .iter()
788        .map(|list_array| {
789            let mut length_array = length(list_array)?;
790            // Make sure length arrays have the same type. Int64 is the most general one.
791            length_array = cast(&length_array, &DataType::Int64)?;
792            length_array =
793                zip(&is_not_null(&length_array)?, &length_array, &null_length)?;
794            Ok(length_array)
795        })
796        .collect::<Result<_>>()?;
797
798    let longest_length = list_lengths.iter().skip(1).try_fold(
799        Arc::clone(&list_lengths[0]),
800        |longest, current| {
801            let is_lt = lt(&longest, &current)?;
802            zip(&is_lt, &current, &longest)
803        },
804    )?;
805    Ok(longest_length)
806}
807
808/// Trait defining common methods used for unnesting, implemented by list array types.
809trait ListArrayType: Array {
810    /// Returns a reference to the values of this list.
811    fn values(&self) -> &ArrayRef;
812
813    /// Returns the start and end offset of the values for the given row.
814    fn value_offsets(&self, row: usize) -> (i64, i64);
815}
816
817impl ListArrayType for ListArray {
818    fn values(&self) -> &ArrayRef {
819        self.values()
820    }
821
822    fn value_offsets(&self, row: usize) -> (i64, i64) {
823        let offsets = self.value_offsets();
824        (offsets[row].into(), offsets[row + 1].into())
825    }
826}
827
828impl ListArrayType for LargeListArray {
829    fn values(&self) -> &ArrayRef {
830        self.values()
831    }
832
833    fn value_offsets(&self, row: usize) -> (i64, i64) {
834        let offsets = self.value_offsets();
835        (offsets[row], offsets[row + 1])
836    }
837}
838
839impl ListArrayType for FixedSizeListArray {
840    fn values(&self) -> &ArrayRef {
841        self.values()
842    }
843
844    fn value_offsets(&self, row: usize) -> (i64, i64) {
845        let start = self.value_offset(row) as i64;
846        (start, start + self.value_length() as i64)
847    }
848}
849
850impl ListArrayType for ListViewArray {
851    fn values(&self) -> &ArrayRef {
852        self.values()
853    }
854
855    fn value_offsets(&self, row: usize) -> (i64, i64) {
856        let offset = self.value_offsets()[row] as i64;
857        let size = self.value_sizes()[row] as i64;
858        (offset, offset + size)
859    }
860}
861
862impl ListArrayType for LargeListViewArray {
863    fn values(&self) -> &ArrayRef {
864        self.values()
865    }
866
867    fn value_offsets(&self, row: usize) -> (i64, i64) {
868        let offset = self.value_offsets()[row];
869        let size = self.value_sizes()[row];
870        (offset, offset + size)
871    }
872}
873
874/// Unnest multiple list arrays according to the length array.
875fn unnest_list_arrays(
876    list_arrays: &[ArrayRef],
877    length_array: &PrimitiveArray<Int64Type>,
878    capacity: usize,
879) -> Result<Vec<ArrayRef>> {
880    let typed_arrays = list_arrays
881        .iter()
882        .map(|list_array| match list_array.data_type() {
883            DataType::List(_) => Ok(list_array.as_list::<i32>() as &dyn ListArrayType),
884            DataType::LargeList(_) => {
885                Ok(list_array.as_list::<i64>() as &dyn ListArrayType)
886            }
887            DataType::FixedSizeList(_, _) => {
888                Ok(list_array.as_fixed_size_list() as &dyn ListArrayType)
889            }
890            DataType::ListView(_) => {
891                Ok(list_array.as_list_view::<i32>() as &dyn ListArrayType)
892            }
893            DataType::LargeListView(_) => {
894                Ok(list_array.as_list_view::<i64>() as &dyn ListArrayType)
895            }
896            other => exec_err!("Invalid unnest datatype {other }"),
897        })
898        .collect::<Result<Vec<_>>>()?;
899
900    typed_arrays
901        .iter()
902        .map(|list_array| unnest_list_array(*list_array, length_array, capacity))
903        .collect::<Result<_>>()
904}
905
906/// Unnest a list array according the target length array.
907///
908/// Consider a list array like this:
909///
910/// ```ignore
911/// [1], [2, 3, 4], null, [5], [],
912/// ```
913///
914/// and the length array is:
915///
916/// ```ignore
917/// [2, 3, 2, 1, 2]
918/// ```
919///
920/// If the length of a certain list is less than the target length, pad with NULLs.
921/// So the unnested array will look like this:
922///
923/// ```ignore
924/// [1, null, 2, 3, 4, null, null, 5, null, null]
925/// ```
926fn unnest_list_array(
927    list_array: &dyn ListArrayType,
928    length_array: &PrimitiveArray<Int64Type>,
929    capacity: usize,
930) -> Result<ArrayRef> {
931    let values = list_array.values();
932    let mut take_indices_builder = PrimitiveArray::<Int64Type>::builder(capacity);
933    for row in 0..list_array.len() {
934        let mut value_length = 0;
935        if !list_array.is_null(row) {
936            let (start, end) = list_array.value_offsets(row);
937            value_length = end - start;
938            for i in start..end {
939                take_indices_builder.append_value(i)
940            }
941        }
942        let target_length = length_array.value(row);
943        debug_assert!(
944            value_length <= target_length,
945            "value length is beyond the longest length"
946        );
947        // Pad with NULL values
948        for _ in value_length..target_length {
949            take_indices_builder.append_null();
950        }
951    }
952    Ok(kernels::take::take(
953        &values,
954        &take_indices_builder.finish(),
955        None,
956    )?)
957}
958
959/// Creates take indices that will be used to expand all columns except for the list type
960/// [`columns`](UnnestExec::list_column_indices) that is being unnested.
961/// Every column value needs to be repeated multiple times according to the length array.
962///
963/// If the length array looks like this:
964///
965/// ```ignore
966/// [2, 3, 1]
967/// ```
968/// Then [`create_take_indices`] will return an array like this
969///
970/// ```ignore
971/// [0, 0, 1, 1, 1, 2]
972/// ```
973fn create_take_indices(
974    length_array: &PrimitiveArray<Int64Type>,
975    capacity: usize,
976) -> PrimitiveArray<Int64Type> {
977    // `find_longest_length()` guarantees this.
978    debug_assert!(
979        length_array.null_count() == 0,
980        "length array should not contain nulls"
981    );
982    let mut builder = PrimitiveArray::<Int64Type>::builder(capacity);
983    for (index, repeat) in length_array.iter().enumerate() {
984        // The length array should not contain nulls, so unwrap is safe
985        let repeat = repeat.unwrap();
986        (0..repeat).for_each(|_| builder.append_value(index as i64));
987    }
988    builder.finish()
989}
990
991/// Create a batch of arrays based on an input `batch` and a `indices` array.
992/// The `indices` array is used by the take kernel to repeat values in the arrays
993/// that are marked with `true` in the `repeat_mask`. Arrays marked with `false`
994/// in the `repeat_mask` will be replaced with arrays filled with nulls of the
995/// appropriate length.
996///
997/// For example if we have the following batch:
998///
999/// ```ignore
1000/// c1: [1], null, [2, 3, 4], null, [5, 6]
1001/// c2: 'a', 'b',  'c', null, 'd'
1002/// ```
1003///
1004/// then the `unnested_list_arrays` contains the unnest column that will replace `c1` in
1005/// the final batch if `preserve_nulls` is true:
1006///
1007/// ```ignore
1008/// c1: 1, null, 2, 3, 4, null, 5, 6
1009/// ```
1010///
1011/// And the `indices` array contains the indices that are used by `take` kernel to
1012/// repeat the values in `c2`:
1013///
1014/// ```ignore
1015/// 0, 1, 2, 2, 2, 3, 4, 4
1016/// ```
1017///
1018/// so that the final batch will look like:
1019///
1020/// ```ignore
1021/// c1: 1, null, 2, 3, 4, null, 5, 6
1022/// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
1023/// ```
1024///
1025/// The `repeat_mask` determines whether an array's values are repeated or replaced with nulls.
1026/// For example, if the `repeat_mask` is:
1027///
1028/// ```ignore
1029/// [true, false]
1030/// ```
1031///
1032/// The final batch will look like:
1033///
1034/// ```ignore
1035/// c1: 1, null, 2, 3, 4, null, 5, 6  // Repeated using `indices`
1036/// c2: null, null, null, null, null, null, null, null  // Replaced with nulls
1037fn repeat_arrs_from_indices(
1038    batch: &[ArrayRef],
1039    indices: &PrimitiveArray<Int64Type>,
1040    repeat_mask: &[bool],
1041) -> Result<Vec<Arc<dyn Array>>> {
1042    batch
1043        .iter()
1044        .zip(repeat_mask.iter())
1045        .map(|(arr, &repeat)| {
1046            if repeat {
1047                Ok(kernels::take::take(arr, indices, None)?)
1048            } else {
1049                Ok(new_null_array(arr.data_type(), arr.len()))
1050            }
1051        })
1052        .collect()
1053}
1054
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        GenericListArray, NullBufferBuilder, OffsetSizeTrait, StringArray,
    };
    use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow::datatypes::{Field, Int32Type};
    use datafusion_common::test_util::batches_to_string;
    use insta::assert_snapshot;

    // Create a GenericListArray with the following list values:
    //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
    fn make_generic_array<OffsetSize>() -> GenericListArray<OffsetSize>
    where
        OffsetSize: OffsetSizeTrait,
    {
        let mut values = vec![];
        let mut offsets: Vec<OffsetSize> = vec![OffsetSize::zero()];
        let mut valid = NullBufferBuilder::new(6);

        // [A, B, C]
        values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // []
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // NULL with non-zero value length
        // Issue https://github.com/apache/datafusion/issues/9932
        values.push(Some("?"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [D]
        values.push(Some("D"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // Another NULL with zero value length
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [NULL, F]
        values.extend_from_slice(&[None, Some("F")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        GenericListArray::<OffsetSize>::new(
            field,
            OffsetBuffer::new(offsets.into()),
            Arc::new(StringArray::from(values)),
            valid.finish(),
        )
    }

    // Create a FixedSizeListArray with the following list values:
    //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
    fn make_fixed_list() -> FixedSizeListArray {
        let values = Arc::new(StringArray::from_iter([
            Some("A"),
            Some("B"),
            None,
            None,
            Some("C"),
            Some("D"),
            None,
            None,
            None,
            Some("F"),
            None,
            None,
        ]));
        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        let valid = NullBuffer::from(vec![true, false, true, false, true, true]);
        FixedSizeListArray::new(field, 2, values, Some(valid))
    }

    // Create a ListViewArray with the same logical list values as
    // `make_generic_array`:
    //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
    // The child values are stored out of order, and the first NULL row points at
    // a non-empty range, so the view offsets are neither monotonic nor contiguous.
    fn make_list_view() -> ListViewArray {
        let values = Arc::new(StringArray::from(vec![
            Some("D"),
            None,
            Some("F"),
            Some("A"),
            Some("B"),
            Some("C"),
            Some("?"),
        ]));
        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        // One (offset, size) pair per row.
        let offsets: ScalarBuffer<i32> = ScalarBuffer::from(vec![3, 0, 6, 0, 0, 1]);
        let sizes: ScalarBuffer<i32> = ScalarBuffer::from(vec![3, 0, 1, 1, 0, 2]);
        let valid = NullBuffer::from(vec![true, true, false, true, false, true]);
        ListViewArray::new(field, offsets, sizes, values, Some(valid))
    }

    fn verify_unnest_list_array(
        list_array: &dyn ListArrayType,
        lengths: Vec<i64>,
        expected: Vec<Option<&str>>,
    ) -> Result<()> {
        let length_array = Int64Array::from(lengths);
        let unnested_array = unnest_list_array(list_array, &length_array, 3 * 6)?;
        let strs = unnested_array.as_string::<i32>().iter().collect::<Vec<_>>();
        assert_eq!(strs, expected);
        Ok(())
    }

    #[test]
    fn test_build_batch_list_arr_recursive() -> Result<()> {
        // col1                             | col2
        // [[1,2,3],null,[4,5]]             | ['a','b']
        // [[7,8,9,10], null, [11,12,13]]   | ['c','d']
        // null                             | ['e']
        let list_arr1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3)]),
            None,
            Some(vec![Some(4), Some(5)]),
            Some(vec![Some(7), Some(8), Some(9), Some(10)]),
            None,
            Some(vec![Some(11), Some(12), Some(13)]),
        ]);

        let list_arr1_ref = Arc::new(list_arr1) as ArrayRef;
        let offsets = OffsetBuffer::from_lengths([3, 3, 0]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_non_null();
        nulls.append_non_null();
        nulls.append_null();
        // list<list<int32>>
        let col1_field = Field::new_list_field(
            DataType::List(Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            ))),
            true,
        );
        let col1 = ListArray::new(
            Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            )),
            offsets,
            list_arr1_ref,
            nulls.finish(),
        );

        let list_arr2 = StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
            Some("d"),
            Some("e"),
        ]);

        let offsets = OffsetBuffer::from_lengths([2, 2, 1]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_n_non_nulls(3);
        let col2_field = Field::new(
            "col2",
            DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
            true,
        );
        let col2 = GenericListArray::<i32>::new(
            Arc::new(Field::new_list_field(DataType::Utf8, true)),
            offsets,
            Arc::new(list_arr2),
            nulls.finish(),
        );
        // convert col1 and col2 to a record batch
        let schema = Arc::new(Schema::new(vec![col1_field, col2_field]));
        let out_schema = Arc::new(Schema::new(vec![
            Field::new(
                "col1_unnest_placeholder_depth_1",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new("col1_unnest_placeholder_depth_2", DataType::Int32, true),
            Field::new("col2_unnest_placeholder_depth_1", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(col1) as ArrayRef, Arc::new(col2) as ArrayRef],
        )
        .unwrap();
        let list_type_columns = vec![
            ListUnnest {
                index_in_input_schema: 0,
                depth: 1,
            },
            ListUnnest {
                index_in_input_schema: 0,
                depth: 2,
            },
            ListUnnest {
                index_in_input_schema: 1,
                depth: 1,
            },
        ];
        let ret = build_batch(
            &batch,
            &out_schema,
            list_type_columns.as_ref(),
            &HashSet::default(),
            &UnnestOptions {
                preserve_nulls: true,
                recursions: vec![],
            },
        )?
        .unwrap();

        assert_snapshot!(batches_to_string(&[ret]),
        @r"
        +---------------------------------+---------------------------------+---------------------------------+
        | col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 |
        +---------------------------------+---------------------------------+---------------------------------+
        | [1, 2, 3]                       | 1                               | a                               |
        |                                 | 2                               | b                               |
        | [4, 5]                          | 3                               |                                 |
        | [1, 2, 3]                       |                                 | a                               |
        |                                 |                                 | b                               |
        | [4, 5]                          |                                 |                                 |
        | [1, 2, 3]                       | 4                               | a                               |
        |                                 | 5                               | b                               |
        | [4, 5]                          |                                 |                                 |
        | [7, 8, 9, 10]                   | 7                               | c                               |
        |                                 | 8                               | d                               |
        | [11, 12, 13]                    | 9                               |                                 |
        |                                 | 10                              |                                 |
        | [7, 8, 9, 10]                   |                                 | c                               |
        |                                 |                                 | d                               |
        | [11, 12, 13]                    |                                 |                                 |
        | [7, 8, 9, 10]                   | 11                              | c                               |
        |                                 | 12                              | d                               |
        | [11, 12, 13]                    | 13                              |                                 |
        |                                 |                                 | e                               |
        +---------------------------------+---------------------------------+---------------------------------+
        ");
        Ok(())
    }

    #[test]
    fn test_unnest_list_array() -> Result<()> {
        // Each variable-size list type below holds the same logical values
        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
        // and is unnested against the same target lengths.
        let lengths = vec![3, 2, 1, 2, 0, 3];
        let expected = vec![
            Some("A"),
            Some("B"),
            Some("C"),
            None,
            None,
            None,
            Some("D"),
            None,
            None,
            Some("F"),
            None,
        ];

        // ListArray
        let list_array = make_generic_array::<i32>();
        verify_unnest_list_array(&list_array, lengths.clone(), expected.clone())?;

        // LargeListArray
        let list_array = make_generic_array::<i64>();
        verify_unnest_list_array(&list_array, lengths.clone(), expected.clone())?;

        // ListViewArray with out-of-order value offsets
        let list_array = make_list_view();
        verify_unnest_list_array(&list_array, lengths, expected)?;

        // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list_array = make_fixed_list();
        verify_unnest_list_array(
            &list_array,
            vec![3, 1, 2, 0, 2, 3],
            vec![
                Some("A"),
                Some("B"),
                None,
                None,
                Some("C"),
                Some("D"),
                None,
                Some("F"),
                None,
                None,
                None,
            ],
        )?;

        Ok(())
    }

    fn verify_longest_length(
        list_arrays: &[ArrayRef],
        preserve_nulls: bool,
        expected: Vec<i64>,
    ) -> Result<()> {
        let options = UnnestOptions {
            preserve_nulls,
            recursions: vec![],
        };
        let longest_length = find_longest_length(list_arrays, &options)?;
        let expected_array = Int64Array::from(expected);
        assert_eq!(
            longest_length
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap(),
            &expected_array
        );
        Ok(())
    }

    #[test]
    fn test_longest_list_length() -> Result<()> {
        // Test with single ListArray
        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Test with single LargeListArray
        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = Arc::new(make_generic_array::<i64>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Test with single FixedSizeListArray
        //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list_array = Arc::new(make_fixed_list()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![2, 0, 2, 0, 2, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![2, 1, 2, 1, 2, 2])?;

        // Test with multiple list arrays
        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
        //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list1 = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        let list2 = Arc::new(make_fixed_list()) as ArrayRef;
        let list_arrays = vec![Arc::clone(&list1), Arc::clone(&list2)];
        verify_longest_length(&list_arrays, false, vec![3, 0, 2, 1, 2, 2])?;
        verify_longest_length(&list_arrays, true, vec![3, 1, 2, 1, 2, 2])?;

        Ok(())
    }

    #[test]
    fn test_create_take_indices() -> Result<()> {
        let length_array = Int64Array::from(vec![2, 3, 1]);
        let take_indices = create_take_indices(&length_array, 6);
        let expected = Int64Array::from(vec![0, 0, 1, 1, 1, 2]);
        assert_eq!(take_indices, expected);
        Ok(())
    }
}