use std::cmp::{self, Ordering};
use std::task::{ready, Poll};
use std::{any::Any, sync::Arc};

use super::metrics::{
    self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
    RecordOutput,
};
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
use crate::{
    DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
    SendableRecordBatchStream,
};

use arrow::array::{
    new_null_array, Array, ArrayRef, AsArray, FixedSizeListArray, Int64Array,
    LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray,
};
use arrow::compute::kernels::length::length;
use arrow::compute::kernels::zip::zip;
use arrow::compute::{cast, is_not_null, kernels, sum};
use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_ord::cmp::lt;
use async_trait::async_trait;
use datafusion_common::{
    exec_datafusion_err, exec_err, internal_err, HashMap, HashSet, Result, UnnestOptions,
};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
use futures::{Stream, StreamExt};
use log::trace;

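/// Unnest the given columns, either with type struct or list.
///
/// Each row of a list column is expanded into one output row per list element
/// (optionally preserving nulls), and each struct column is flattened into one
/// output column per struct field; values of the remaining columns are
/// replicated so that rows stay aligned.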
#[derive(Debug, Clone)]
pub struct UnnestExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// The schema once the unnest is applied
    schema: SchemaRef,
    /// The list-typed columns to unnest, with the depth to unnest each one to
    list_column_indices: Vec<ListUnnest>,
    /// Indices of the struct-typed columns in the input schema
    struct_column_indices: Vec<usize>,
    /// Options, e.g. whether nulls are preserved
    options: UnnestOptions,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cache holding plan properties like equivalences, output partitioning etc.
    cache: PlanProperties,
}

impl UnnestExec {
    /// Create a new [`UnnestExec`].
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        list_column_indices: Vec<ListUnnest>,
        struct_column_indices: Vec<usize>,
        schema: SchemaRef,
        options: UnnestOptions,
    ) -> Self {
        let cache = Self::compute_properties(&input, Arc::clone(&schema));

        UnnestExec {
            input,
            schema,
            list_column_indices,
            struct_column_indices,
            options,
            metrics: Default::default(),
            cache,
        }
    }

    /// Compute the cached [`PlanProperties`] (equivalences, partitioning,
    /// emission type and boundedness) from the input plan.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
    ) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            input.output_partitioning().to_owned(),
            input.pipeline_behavior(),
            input.boundedness(),
        )
    }

    /// Input execution plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// The list-typed columns to unnest
    pub fn list_column_indices(&self) -> &[ListUnnest] {
        &self.list_column_indices
    }

    /// Indices of the struct-typed columns in the input schema
    pub fn struct_column_indices(&self) -> &[usize] {
        &self.struct_column_indices
    }

    /// The options used when unnesting
    pub fn options(&self) -> &UnnestOptions {
        &self.options
    }
}

impl DisplayAs for UnnestExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "UnnestExec")
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for UnnestExec {
    fn name(&self) -> &'static str {
        "UnnestExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(UnnestExec::new(
            Arc::clone(&children[0]),
            self.list_column_indices.clone(),
            self.struct_column_indices.clone(),
            Arc::clone(&self.schema),
            self.options.clone(),
        )))
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::UnspecifiedDistribution]
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let input = self.input.execute(partition, context)?;
        let metrics = UnnestMetrics::new(partition, &self.metrics);

        Ok(Box::pin(UnnestStream {
            input,
            schema: Arc::clone(&self.schema),
            list_type_columns: self.list_column_indices.clone(),
            struct_column_indices: self.struct_column_indices.iter().copied().collect(),
            options: self.options.clone(),
            metrics,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}

#[derive(Clone, Debug)]
struct UnnestMetrics {
    /// Baseline metrics (output rows, elapsed compute time, ...)
    baseline_metrics: BaselineMetrics,
    /// Number of input batches consumed
    input_batches: metrics::Count,
    /// Number of input rows consumed
    input_rows: metrics::Count,
    /// Number of output batches produced
    output_batches: metrics::Count,
}

impl UnnestMetrics {
    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
        let input_batches =
            MetricBuilder::new(metrics).counter("input_batches", partition);

        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);

        let output_batches =
            MetricBuilder::new(metrics).counter("output_batches", partition);

        Self {
            baseline_metrics: BaselineMetrics::new(metrics, partition),
            input_batches,
            input_rows,
            output_batches,
        }
    }
}

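/// A stream that reads batches from its input and issues [`RecordBatch`]es
/// with the configured list and struct columns unnested.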
struct UnnestStream {
    /// Unnest stream input
    input: SendableRecordBatchStream,
    /// Unnested schema
    schema: Arc<Schema>,
    /// The list-typed columns to unnest, with the depth to unnest each one to
    list_type_columns: Vec<ListUnnest>,
    /// Indices of the struct-typed columns that need to be flattened
    struct_column_indices: HashSet<usize>,
    /// Options, e.g. whether nulls are preserved
    options: UnnestOptions,
    /// Metrics of the stream
    metrics: UnnestMetrics,
}

impl RecordBatchStream for UnnestStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

#[async_trait]
impl Stream for UnnestStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.poll_next_impl(cx)
    }
}

impl UnnestStream {
    /// Separate implementation function that unpins the [`UnnestStream`] so
    /// that partial borrows work correctly.
    fn poll_next_impl(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    let elapsed_compute =
                        self.metrics.baseline_metrics.elapsed_compute().clone();
                    let timer = elapsed_compute.timer();
                    self.metrics.input_batches.add(1);
                    self.metrics.input_rows.add(batch.num_rows());
                    let result = build_batch(
                        &batch,
                        &self.schema,
                        &self.list_type_columns,
                        &self.struct_column_indices,
                        &self.options,
                    )?;
                    timer.done();
                    // Empty results are skipped: poll the input again instead of
                    // emitting a zero-row batch
                    let Some(result_batch) = result else {
                        continue;
                    };
                    self.metrics.output_batches.add(1);
                    (&result_batch).record_output(&self.metrics.baseline_metrics);

                    // Empty record batches should not be emitted; `build_batch`
                    // returns `None` for them instead
                    debug_assert!(result_batch.num_rows() > 0);
                    Some(Ok(result_batch))
                }
                other => {
                    trace!(
                        "Processed {} probe-side input batches containing {} rows and \
                        produced {} output batches containing {} rows in {}",
                        self.metrics.input_batches,
                        self.metrics.input_rows,
                        self.metrics.output_batches,
                        self.metrics.baseline_metrics.output_rows(),
                        self.metrics.baseline_metrics.elapsed_compute(),
                    );
                    other
                }
            });
        }
    }
}

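/// Given a set of struct column indices, flatten each such column into its
/// child columns (one output column per struct field) while keeping every
/// other column unchanged, then assemble the result into a batch with `schema`.
///
/// For example, a batch `[col_a: struct(f1, f2), col_b]` with
/// `struct_column_indices = {0}` becomes `[f1, f2, col_b]`.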
fn flatten_struct_cols(
    input_batch: &[Arc<dyn Array>],
    schema: &SchemaRef,
    struct_column_indices: &HashSet<usize>,
) -> Result<RecordBatch> {
    // Horizontal expansion: each struct column is replaced by its child columns
    let columns_expanded = input_batch
        .iter()
        .enumerate()
        .map(|(idx, column_data)| match struct_column_indices.get(&idx) {
            Some(_) => match column_data.data_type() {
                DataType::Struct(_) => {
                    let struct_arr =
                        column_data.as_any().downcast_ref::<StructArray>().unwrap();
                    Ok(struct_arr.columns().to_vec())
                }
                data_type => internal_err!(
                    "expecting column {} from input plan to be a struct, got {:?}",
                    idx,
                    data_type
                ),
            },
            None => Ok(vec![Arc::clone(column_data)]),
        })
        .collect::<Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect();
    Ok(RecordBatch::try_new(Arc::clone(schema), columns_expanded)?)
}

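/// A list column to unnest: the index of the column in the input schema and
/// the depth to which it is unnested (e.g. `unnest(unnest(col))` has depth 2).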
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct ListUnnest {
    pub index_in_input_schema: usize,
    pub depth: usize,
}

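/// Unnest the list arrays that require unnesting at `level_to_unnest` by one
/// level, and repeat the values of every other column accordingly so that all
/// columns stay row-aligned.
///
/// Columns whose declared depth equals `level_to_unnest` are taken from
/// `batch`; columns whose depth is greater were already unnested at a deeper
/// level and are taken from `temp_unnested_arrs`. The freshly unnested arrays
/// are written back into `temp_unnested_arrs`, keyed by their [`ListUnnest`].
/// Returns `None` if this level would produce zero rows.
///
/// An illustrative sketch for a single column declared with depth 2
/// (`unnest(unnest(col))`) at `level_to_unnest = 2`:
///
/// ```text
/// col:             [[1, 2], [3]], [[4]]
/// unnested (temp): [1, 2], [3], [4]
/// other columns:   repeated 2x for row 0 and 1x for row 1
/// ```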
fn list_unnest_at_level(
    batch: &[ArrayRef],
    list_type_unnests: &[ListUnnest],
    temp_unnested_arrs: &mut HashMap<ListUnnest, ArrayRef>,
    level_to_unnest: usize,
    options: &UnnestOptions,
) -> Result<Option<Vec<ArrayRef>>> {
    // Collect the arrays to unnest at this level, together with their specs
    let (arrs_to_unnest, list_unnest_specs): (Vec<Arc<dyn Array>>, Vec<_>) =
        list_type_unnests
            .iter()
            .filter_map(|unnesting| {
                if level_to_unnest == unnesting.depth {
                    return Some((
                        Arc::clone(&batch[unnesting.index_in_input_schema]),
                        *unnesting,
                    ));
                }
                // Unnesting of this column started at a deeper level; keep
                // unnesting the temporary result until depth 1 is reached
                if level_to_unnest < unnesting.depth {
                    return Some((
                        Arc::clone(temp_unnested_arrs.get(unnesting).unwrap()),
                        *unnesting,
                    ));
                }
                None
            })
            .unzip();

    // Per input row, the number of output rows this level produces
    let longest_length = find_longest_length(&arrs_to_unnest, options)?;
    let unnested_length = longest_length.as_primitive::<Int64Type>();
    let total_length = if unnested_length.is_empty() {
        0
    } else {
        sum(unnested_length).ok_or_else(|| {
            exec_datafusion_err!("Failed to calculate the total unnested length")
        })? as usize
    };
    if total_length == 0 {
        return Ok(None);
    }

    // Unnest all the list arrays by one level
    let unnested_temp_arrays =
        unnest_list_arrays(arrs_to_unnest.as_ref(), unnested_length, total_length)?;

    // Create the take indices used to repeat the values of the other columns
    let take_indices = create_take_indices(unnested_length, total_length);
    unnested_temp_arrays
        .into_iter()
        .zip(list_unnest_specs.iter())
        .for_each(|(flatten_arr, unnesting)| {
            temp_unnested_arrs.insert(*unnesting, flatten_arr);
        });

    let repeat_mask: Vec<bool> = batch
        .iter()
        .enumerate()
        .map(|(i, _)| {
            // Check if the column is needed at a shallower (future) level
            let needed_in_future_levels = list_type_unnests.iter().any(|unnesting| {
                unnesting.index_in_input_schema == i && unnesting.depth < level_to_unnest
            });

            // Check if the column is involved in unnesting at any level
            let is_involved_in_unnesting = list_type_unnests
                .iter()
                .any(|unnesting| unnesting.index_in_input_schema == i);

            // Repeat columns needed in future levels or not unnested at all
            needed_in_future_levels || !is_involved_in_unnesting
        })
        .collect();

    // The number of columns in the batch is untouched, but the values of the
    // repeated columns are duplicated as a side effect of unnesting
    let ret = repeat_arrs_from_indices(batch, &take_indices, &repeat_mask)?;

    Ok(Some(ret))
}
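/// The result of unnesting one list column to a particular depth.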
struct UnnestingResult {
    arr: ArrayRef,
    depth: usize,
}

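/// Build one unnested [`RecordBatch`] from an input batch, or return `None`
/// if the unnested result would be empty.
///
/// List columns are unnested level by level, from the deepest requested depth
/// down to depth 1, with the other columns replicated at each pass to keep
/// rows aligned; struct columns are flattened at the end via
/// [`flatten_struct_cols`].
///
/// For a single list column at depth 1 with `preserve_nulls = true`:
///
/// ```text
/// col: [1, 2], [3], NULL
/// =>   1
///      2
///      3
///      NULL
/// ```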
fn build_batch(
    batch: &RecordBatch,
    schema: &SchemaRef,
    list_type_columns: &[ListUnnest],
    struct_column_indices: &HashSet<usize>,
    options: &UnnestOptions,
) -> Result<Option<RecordBatch>> {
    let transformed = match list_type_columns.len() {
        0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
        _ => {
            let mut temp_unnested_result = HashMap::new();
            let max_recursion = list_type_columns
                .iter()
                .fold(0, |highest_depth, ListUnnest { depth, .. }| {
                    cmp::max(highest_depth, *depth)
                });

            // `flatten_arrs` holds the partially unnested batch after each level
            let mut flatten_arrs = vec![];

            // Unnest from the deepest level down to level 1
            for depth in (1..=max_recursion).rev() {
                let input = match depth == max_recursion {
                    true => batch.columns(),
                    false => &flatten_arrs,
                };
                let Some(temp_result) = list_unnest_at_level(
                    input,
                    list_type_columns,
                    &mut temp_unnested_result,
                    depth,
                    options,
                )?
                else {
                    return Ok(None);
                };
                flatten_arrs = temp_result;
            }
            // Group the unnested arrays by the index of their original column
            let unnested_array_map: HashMap<usize, Vec<UnnestingResult>> =
                temp_unnested_result.into_iter().fold(
                    HashMap::new(),
                    |mut acc,
                     (
                        ListUnnest {
                            index_in_input_schema,
                            depth,
                        },
                        flattened_array,
                    )| {
                        acc.entry(index_in_input_schema).or_default().push(
                            UnnestingResult {
                                arr: flattened_array,
                                depth,
                            },
                        );
                        acc
                    },
                );
            let output_order: HashMap<ListUnnest, usize> = list_type_columns
                .iter()
                .enumerate()
                .map(|(order, unnest_def)| (*unnest_def, order))
                .collect();

            // One original column may be unnested to multiple depths, producing
            // multiple output columns; order them as defined in the query
            let mut multi_unnested_per_original_index = unnested_array_map
                .into_iter()
                .map(
                    |(original_index, mut unnested_columns)| {
                        unnested_columns.sort_by(
                            |UnnestingResult { depth: depth1, .. },
                             UnnestingResult { depth: depth2, .. }|
                             -> Ordering {
                                output_order
                                    .get(&ListUnnest {
                                        depth: *depth1,
                                        index_in_input_schema: original_index,
                                    })
                                    .unwrap()
                                    .cmp(
                                        output_order
                                            .get(&ListUnnest {
                                                depth: *depth2,
                                                index_in_input_schema: original_index,
                                            })
                                            .unwrap(),
                                    )
                            },
                        );
                        (
                            original_index,
                            unnested_columns
                                .into_iter()
                                .map(|result| result.arr)
                                .collect::<Vec<_>>(),
                        )
                    },
                )
                .collect::<HashMap<_, _>>();

            let ret = flatten_arrs
                .into_iter()
                .enumerate()
                .flat_map(|(col_idx, arr)| {
                    // Replace each unnested column by its (possibly multiple)
                    // unnested versions; columns not unnested stay as they are
                    match multi_unnested_per_original_index.remove(&col_idx) {
                        Some(unnested_arrays) => unnested_arrays,
                        None => vec![arr],
                    }
                })
                .collect::<Vec<_>>();

            flatten_struct_cols(&ret, schema, struct_column_indices)
        }
    }?;
    Ok(Some(transformed))
}

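/// For each row, compute the length of the longest list among the given list
/// arrays. A `NULL` list counts as length 1 if `preserve_nulls` is true, and
/// as length 0 otherwise.
///
/// For example, with `preserve_nulls = true`:
///
/// ```text
/// l1: [1, 2, 3], NULL, [],   [3]
/// l2: [4, 5],    [],   NULL, [6, 7]
/// => longest length: 3, 1, 1, 2
/// ```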
fn find_longest_length(
    list_arrays: &[ArrayRef],
    options: &UnnestOptions,
) -> Result<ArrayRef> {
    // The length of a NULL list
    let null_length = if options.preserve_nulls {
        Scalar::new(Int64Array::from_value(1, 1))
    } else {
        Scalar::new(Int64Array::from_value(0, 1))
    };
    let list_lengths: Vec<ArrayRef> = list_arrays
        .iter()
        .map(|list_array| {
            let mut length_array = length(list_array)?;
            // Make sure length arrays have the same type (Int64)
            length_array = cast(&length_array, &DataType::Int64)?;
            length_array =
                zip(&is_not_null(&length_array)?, &length_array, &null_length)?;
            Ok(length_array)
        })
        .collect::<Result<_>>()?;

    let longest_length = list_lengths.iter().skip(1).try_fold(
        Arc::clone(&list_lengths[0]),
        |longest, current| {
            let is_lt = lt(&longest, &current)?;
            zip(&is_lt, &current, &longest)
        },
    )?;
    Ok(longest_length)
}

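/// A common interface over [`ListArray`], [`LargeListArray`] and
/// [`FixedSizeListArray`] used while unnesting.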
trait ListArrayType: Array {
    /// Returns a reference to the values of this list.
    fn values(&self) -> &ArrayRef;

    /// Returns the start and end offset of the values for the given row.
    fn value_offsets(&self, row: usize) -> (i64, i64);
}

impl ListArrayType for ListArray {
    fn values(&self) -> &ArrayRef {
        self.values()
    }

    fn value_offsets(&self, row: usize) -> (i64, i64) {
        let offsets = self.value_offsets();
        (offsets[row].into(), offsets[row + 1].into())
    }
}

impl ListArrayType for LargeListArray {
    fn values(&self) -> &ArrayRef {
        self.values()
    }

    fn value_offsets(&self, row: usize) -> (i64, i64) {
        let offsets = self.value_offsets();
        (offsets[row], offsets[row + 1])
    }
}

impl ListArrayType for FixedSizeListArray {
    fn values(&self) -> &ArrayRef {
        self.values()
    }

    fn value_offsets(&self, row: usize) -> (i64, i64) {
        let start = self.value_offset(row) as i64;
        (start, start + self.value_length() as i64)
    }
}

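/// Unnest each of the given list arrays (`List`, `LargeList` or
/// `FixedSizeList`) by one level, using the per-row target lengths in
/// `length_array`; `capacity` is the total number of output rows.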
fn unnest_list_arrays(
    list_arrays: &[ArrayRef],
    length_array: &PrimitiveArray<Int64Type>,
    capacity: usize,
) -> Result<Vec<ArrayRef>> {
    let typed_arrays = list_arrays
        .iter()
        .map(|list_array| match list_array.data_type() {
            DataType::List(_) => Ok(list_array.as_list::<i32>() as &dyn ListArrayType),
            DataType::LargeList(_) => {
                Ok(list_array.as_list::<i64>() as &dyn ListArrayType)
            }
            DataType::FixedSizeList(_, _) => {
                Ok(list_array.as_fixed_size_list() as &dyn ListArrayType)
            }
            other => exec_err!("Invalid unnest datatype {other}"),
        })
        .collect::<Result<Vec<_>>>()?;

    typed_arrays
        .iter()
        .map(|list_array| unnest_list_array(*list_array, length_array, capacity))
        .collect::<Result<_>>()
}

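/// Unnest a list array by one level, producing `length_array[i]` rows for row
/// `i` and padding with nulls when the list is shorter than the target length.
///
/// For example:
///
/// ```text
/// list_array:   [1, 2], NULL, [3], NULL, [4, 5]
/// length_array: 2, 3, 2, 1, 2
/// => unnested:  1, 2, NULL, NULL, NULL, 3, NULL, NULL, 4, 5
/// ```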
fn unnest_list_array(
    list_array: &dyn ListArrayType,
    length_array: &PrimitiveArray<Int64Type>,
    capacity: usize,
) -> Result<ArrayRef> {
    let values = list_array.values();
    let mut take_indices_builder = PrimitiveArray::<Int64Type>::builder(capacity);
    for row in 0..list_array.len() {
        let mut value_length = 0;
        if !list_array.is_null(row) {
            let (start, end) = list_array.value_offsets(row);
            value_length = end - start;
            for i in start..end {
                take_indices_builder.append_value(i);
            }
        }
        let target_length = length_array.value(row);
        debug_assert!(
            value_length <= target_length,
            "value length is beyond the longest length"
        );
        // Pad with nulls up to the target length
        for _ in value_length..target_length {
            take_indices_builder.append_null();
        }
    }
    Ok(kernels::take::take(
        &values,
        &take_indices_builder.finish(),
        None,
    )?)
}

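/// Create take indices that repeat each row index `length_array[i]` times, so
/// that non-unnested columns can be replicated alongside the unnested values.
///
/// For example:
///
/// ```text
/// length_array: 2, 3, 1
/// => indices:   0, 0, 1, 1, 1, 2
/// ```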
fn create_take_indices(
    length_array: &PrimitiveArray<Int64Type>,
    capacity: usize,
) -> PrimitiveArray<Int64Type> {
    // The length array produced by `find_longest_length()` never contains nulls
    debug_assert!(
        length_array.null_count() == 0,
        "length array should not contain nulls"
    );
    let mut builder = PrimitiveArray::<Int64Type>::builder(capacity);
    for (index, repeat) in length_array.iter().enumerate() {
        // The length array should not contain nulls, so unwrap is safe
        let repeat = repeat.unwrap();
        (0..repeat).for_each(|_| builder.append_value(index as i64));
    }
    builder.finish()
}

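/// Repeat each column of `batch` according to `indices`. Columns whose
/// `repeat_mask` entry is `false` are returned as all-null placeholder arrays
/// instead, since their unnested data is produced separately in
/// [`list_unnest_at_level`] and substituted later in [`build_batch`].
///
/// For example:
///
/// ```text
/// indices: 0, 0, 1
/// column:  [1, 2, 3]  (repeat_mask = true)
/// => out:  [1, 1, 2]
/// ```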
fn repeat_arrs_from_indices(
    batch: &[ArrayRef],
    indices: &PrimitiveArray<Int64Type>,
    repeat_mask: &[bool],
) -> Result<Vec<Arc<dyn Array>>> {
    batch
        .iter()
        .zip(repeat_mask.iter())
        .map(|(arr, &repeat)| {
            if repeat {
                Ok(kernels::take::take(arr, indices, None)?)
            } else {
                Ok(new_null_array(arr.data_type(), arr.len()))
            }
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        GenericListArray, NullBufferBuilder, OffsetSizeTrait, StringArray,
    };
    use arrow::buffer::{NullBuffer, OffsetBuffer};
    use arrow::datatypes::{Field, Int32Type};
    use datafusion_common::test_util::batches_to_string;
    use insta::assert_snapshot;

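    // Create a GenericListArray with six rows:
    //  [A, B, C], [], NULL, [D], NULL, [NULL, F]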
    fn make_generic_array<OffsetSize>() -> GenericListArray<OffsetSize>
    where
        OffsetSize: OffsetSizeTrait,
    {
        let mut values = vec![];
        let mut offsets: Vec<OffsetSize> = vec![OffsetSize::zero()];
        let mut valid = NullBufferBuilder::new(6);

        // [A, B, C]
        values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // []
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // NULL (with a non-empty value range)
        values.push(Some("?"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [D]
        values.push(Some("D"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // NULL (with an empty value range)
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [NULL, F]
        values.extend_from_slice(&[None, Some("F")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        GenericListArray::<OffsetSize>::new(
            field,
            OffsetBuffer::new(offsets.into()),
            Arc::new(StringArray::from(values)),
            valid.finish(),
        )
    }

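    // Create a FixedSizeListArray (list size 2) with six rows:
    //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]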
    fn make_fixed_list() -> FixedSizeListArray {
        let values = Arc::new(StringArray::from_iter([
            Some("A"),
            Some("B"),
            None,
            None,
            Some("C"),
            Some("D"),
            None,
            None,
            None,
            Some("F"),
            None,
            None,
        ]));
        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        let valid = NullBuffer::from(vec![true, false, true, false, true, true]);
        FixedSizeListArray::new(field, 2, values, Some(valid))
    }

    fn verify_unnest_list_array(
        list_array: &dyn ListArrayType,
        lengths: Vec<i64>,
        expected: Vec<Option<&str>>,
    ) -> Result<()> {
        let length_array = Int64Array::from(lengths);
        let unnested_array = unnest_list_array(list_array, &length_array, 3 * 6)?;
        let strs = unnested_array.as_string::<i32>().iter().collect::<Vec<_>>();
        assert_eq!(strs, expected);
        Ok(())
    }

    #[test]
    fn test_build_batch_list_arr_recursive() -> Result<()> {
        // Column 1 (list of list of int32), 3 rows:
        //  [[1, 2, 3], null, [4, 5]], [[7, 8, 9, 10], null, [11, 12, 13]], null
        let list_arr1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3)]),
            None,
            Some(vec![Some(4), Some(5)]),
            Some(vec![Some(7), Some(8), Some(9), Some(10)]),
            None,
            Some(vec![Some(11), Some(12), Some(13)]),
        ]);

        let list_arr1_ref = Arc::new(list_arr1) as ArrayRef;
        let offsets = OffsetBuffer::from_lengths([3, 3, 0]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_non_null();
        nulls.append_non_null();
        nulls.append_null();
        let col1_field = Field::new_list_field(
            DataType::List(Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            ))),
            true,
        );
        let col1 = ListArray::new(
            Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            )),
            offsets,
            list_arr1_ref,
            nulls.finish(),
        );

        // Column 2 (list of utf8), 3 rows: [a, b], [c, d], [e]
        let list_arr2 = StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
            Some("d"),
            Some("e"),
        ]);

        let offsets = OffsetBuffer::from_lengths([2, 2, 1]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_n_non_nulls(3);
        let col2_field = Field::new(
            "col2",
            DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
            true,
        );
        let col2 = GenericListArray::<i32>::new(
            Arc::new(Field::new_list_field(DataType::Utf8, true)),
            OffsetBuffer::new(offsets.into()),
            Arc::new(list_arr2),
            nulls.finish(),
        );
        let schema = Arc::new(Schema::new(vec![col1_field, col2_field]));
        // Unnest col1 to depth 1 and depth 2, and col2 to depth 1
        let out_schema = Arc::new(Schema::new(vec![
            Field::new(
                "col1_unnest_placeholder_depth_1",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new("col1_unnest_placeholder_depth_2", DataType::Int32, true),
            Field::new("col2_unnest_placeholder_depth_1", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(col1) as ArrayRef, Arc::new(col2) as ArrayRef],
        )
        .unwrap();
        let list_type_columns = vec![
            ListUnnest {
                index_in_input_schema: 0,
                depth: 1,
            },
            ListUnnest {
                index_in_input_schema: 0,
                depth: 2,
            },
            ListUnnest {
                index_in_input_schema: 1,
                depth: 1,
            },
        ];
        let ret = build_batch(
            &batch,
            &out_schema,
            list_type_columns.as_ref(),
            &HashSet::default(),
            &UnnestOptions {
                preserve_nulls: true,
                recursions: vec![],
            },
        )?
        .unwrap();

        assert_snapshot!(batches_to_string(&[ret]),
            @r###"
+---------------------------------+---------------------------------+---------------------------------+
| col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 |
+---------------------------------+---------------------------------+---------------------------------+
| [1, 2, 3]                       | 1                               | a                               |
|                                 | 2                               | b                               |
| [4, 5]                          | 3                               |                                 |
| [1, 2, 3]                       |                                 | a                               |
|                                 |                                 | b                               |
| [4, 5]                          |                                 |                                 |
| [1, 2, 3]                       | 4                               | a                               |
|                                 | 5                               | b                               |
| [4, 5]                          |                                 |                                 |
| [7, 8, 9, 10]                   | 7                               | c                               |
|                                 | 8                               | d                               |
| [11, 12, 13]                    | 9                               |                                 |
|                                 | 10                              |                                 |
| [7, 8, 9, 10]                   |                                 | c                               |
|                                 |                                 | d                               |
| [11, 12, 13]                    |                                 |                                 |
| [7, 8, 9, 10]                   | 11                              | c                               |
|                                 | 12                              | d                               |
| [11, 12, 13]                    | 13                              |                                 |
|                                 |                                 | e                               |
+---------------------------------+---------------------------------+---------------------------------+
    "###);
        Ok(())
    }

    #[test]
    fn test_unnest_list_array() -> Result<()> {
        // [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = make_generic_array::<i32>();
        verify_unnest_list_array(
            &list_array,
            vec![3, 2, 1, 2, 0, 3],
            vec![
                Some("A"),
                Some("B"),
                Some("C"),
                None,
                None,
                None,
                Some("D"),
                None,
                None,
                Some("F"),
                None,
            ],
        )?;

        // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list_array = make_fixed_list();
        verify_unnest_list_array(
            &list_array,
            vec![3, 1, 2, 0, 2, 3],
            vec![
                Some("A"),
                Some("B"),
                None,
                None,
                Some("C"),
                Some("D"),
                None,
                Some("F"),
                None,
                None,
                None,
            ],
        )?;

        Ok(())
    }

    fn verify_longest_length(
        list_arrays: &[ArrayRef],
        preserve_nulls: bool,
        expected: Vec<i64>,
    ) -> Result<()> {
        let options = UnnestOptions {
            preserve_nulls,
            recursions: vec![],
        };
        let longest_length = find_longest_length(list_arrays, &options)?;
        let expected_array = Int64Array::from(expected);
        assert_eq!(
            longest_length
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap(),
            &expected_array
        );
        Ok(())
    }

    #[test]
    fn test_longest_list_length() -> Result<()> {
        // Test with a single ListArray:
        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Test with a single LargeListArray:
        //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = Arc::new(make_generic_array::<i64>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Test with a single FixedSizeListArray:
        //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list_array = Arc::new(make_fixed_list()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![2, 0, 2, 0, 2, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![2, 1, 2, 1, 2, 2])?;

        // Test with multiple list arrays
        let list1 = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        let list2 = Arc::new(make_fixed_list()) as ArrayRef;
        let list_arrays = vec![Arc::clone(&list1), Arc::clone(&list2)];
        verify_longest_length(&list_arrays, false, vec![3, 0, 2, 1, 2, 2])?;
        verify_longest_length(&list_arrays, true, vec![3, 1, 2, 1, 2, 2])?;

        Ok(())
    }

    #[test]
    fn test_create_take_indices() -> Result<()> {
        let length_array = Int64Array::from(vec![2, 3, 1]);
        let take_indices = create_take_indices(&length_array, 6);
        let expected = Int64Array::from(vec![0, 0, 1, 1, 1, 2]);
        assert_eq!(take_indices, expected);
        Ok(())
    }
}