1use std::cmp::{self, Ordering};
21use std::task::{Poll, ready};
22use std::{any::Any, sync::Arc};
23
24use super::metrics::{
25 self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
26 RecordOutput,
27};
28use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
29use crate::{
30 DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
31 SendableRecordBatchStream,
32};
33
34use arrow::array::{
35 Array, ArrayRef, AsArray, BooleanBufferBuilder, FixedSizeListArray, Int64Array,
36 LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray, new_null_array,
37};
38use arrow::compute::kernels::length::length;
39use arrow::compute::kernels::zip::zip;
40use arrow::compute::{cast, is_not_null, kernels, sum};
41use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
42use arrow::record_batch::RecordBatch;
43use arrow_ord::cmp::lt;
44use async_trait::async_trait;
45use datafusion_common::{
46 Constraints, HashMap, HashSet, Result, UnnestOptions, exec_datafusion_err, exec_err,
47 internal_err,
48};
49use datafusion_execution::TaskContext;
50use datafusion_physical_expr::PhysicalExpr;
51use datafusion_physical_expr::equivalence::ProjectionMapping;
52use datafusion_physical_expr::expressions::Column;
53use futures::{Stream, StreamExt};
54use log::trace;
55
/// Unnest the given columns, producing one output row per element of the
/// unnested list columns (non-unnested columns are duplicated accordingly)
/// and flattening struct columns into their fields.
#[derive(Debug, Clone)]
pub struct UnnestExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// The schema once the unnest is applied
    schema: SchemaRef,
    /// Which list-typed columns to unnest, and to what depth
    list_column_indices: Vec<ListUnnest>,
    /// Indices of struct-typed columns (in the input schema) to flatten
    struct_column_indices: Vec<usize>,
    /// Options, e.g. whether null list entries still produce a row
    options: UnnestOptions,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties (equivalences, partitioning, boundedness)
    cache: PlanProperties,
}
79
80impl UnnestExec {
81 pub fn new(
83 input: Arc<dyn ExecutionPlan>,
84 list_column_indices: Vec<ListUnnest>,
85 struct_column_indices: Vec<usize>,
86 schema: SchemaRef,
87 options: UnnestOptions,
88 ) -> Result<Self> {
89 let cache = Self::compute_properties(
90 &input,
91 &list_column_indices,
92 &struct_column_indices,
93 &schema,
94 )?;
95
96 Ok(UnnestExec {
97 input,
98 schema,
99 list_column_indices,
100 struct_column_indices,
101 options,
102 metrics: Default::default(),
103 cache,
104 })
105 }
106
107 fn compute_properties(
109 input: &Arc<dyn ExecutionPlan>,
110 list_column_indices: &[ListUnnest],
111 struct_column_indices: &[usize],
112 schema: &SchemaRef,
113 ) -> Result<PlanProperties> {
114 let input_schema = input.schema();
116 let mut unnested_indices = BooleanBufferBuilder::new(input_schema.fields().len());
117 unnested_indices.append_n(input_schema.fields().len(), false);
118 for list_unnest in list_column_indices {
119 unnested_indices.set_bit(list_unnest.index_in_input_schema, true);
120 }
121 for struct_unnest in struct_column_indices {
122 unnested_indices.set_bit(*struct_unnest, true)
123 }
124 let unnested_indices = unnested_indices.finish();
125 let non_unnested_indices: Vec<usize> = (0..input_schema.fields().len())
126 .filter(|idx| !unnested_indices.value(*idx))
127 .collect();
128
129 let input_schema = input.schema();
131 let projection_mapping: ProjectionMapping = non_unnested_indices
132 .iter()
133 .map(|&input_idx| {
134 let input_field = input_schema.field(input_idx);
136 let output_idx = schema
137 .fields()
138 .iter()
139 .position(|output_field| output_field.name() == input_field.name())
140 .ok_or_else(|| {
141 exec_datafusion_err!(
142 "Non-unnested column '{}' must exist in output schema",
143 input_field.name()
144 )
145 })?;
146
147 let input_col = Arc::new(Column::new(input_field.name(), input_idx))
148 as Arc<dyn PhysicalExpr>;
149 let target_col = Arc::new(Column::new(input_field.name(), output_idx))
150 as Arc<dyn PhysicalExpr>;
151 let targets = vec![(target_col, output_idx)].into();
153 Ok((input_col, targets))
154 })
155 .collect::<Result<ProjectionMapping>>()?;
156
157 let input_eq_properties = input.equivalence_properties();
161 let eq_properties = input_eq_properties
162 .project(&projection_mapping, Arc::clone(schema))
163 .with_constraints(Constraints::default());
164
165 let output_partitioning = input
167 .output_partitioning()
168 .project(&projection_mapping, &eq_properties);
169
170 Ok(PlanProperties::new(
171 eq_properties,
172 output_partitioning,
173 input.pipeline_behavior(),
174 input.boundedness(),
175 ))
176 }
177
178 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
180 &self.input
181 }
182
183 pub fn list_column_indices(&self) -> &[ListUnnest] {
185 &self.list_column_indices
186 }
187
188 pub fn struct_column_indices(&self) -> &[usize] {
190 &self.struct_column_indices
191 }
192
193 pub fn options(&self) -> &UnnestOptions {
194 &self.options
195 }
196}
197
198impl DisplayAs for UnnestExec {
199 fn fmt_as(
200 &self,
201 t: DisplayFormatType,
202 f: &mut std::fmt::Formatter,
203 ) -> std::fmt::Result {
204 match t {
205 DisplayFormatType::Default | DisplayFormatType::Verbose => {
206 write!(f, "UnnestExec")
207 }
208 DisplayFormatType::TreeRender => {
209 write!(f, "")
210 }
211 }
212 }
213}
214
impl ExecutionPlan for UnnestExec {
    fn name(&self) -> &'static str {
        "UnnestExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Rebuild the node with the same unnest configuration over the new child.
        Ok(Arc::new(UnnestExec::new(
            Arc::clone(&children[0]),
            self.list_column_indices.clone(),
            self.struct_column_indices.clone(),
            Arc::clone(&self.schema),
            self.options.clone(),
        )?))
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        // Unnesting operates row-by-row, so any input distribution is fine.
        vec![Distribution::UnspecifiedDistribution]
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // Wrap the child's stream for this partition in an UnnestStream that
        // unnests each batch as it is pulled.
        let input = self.input.execute(partition, context)?;
        let metrics = UnnestMetrics::new(partition, &self.metrics);

        Ok(Box::pin(UnnestStream {
            input,
            schema: Arc::clone(&self.schema),
            list_type_columns: self.list_column_indices.clone(),
            struct_column_indices: self.struct_column_indices.iter().copied().collect(),
            options: self.options.clone(),
            metrics,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
271
/// Metrics recorded by an [`UnnestStream`] for one partition.
#[derive(Clone, Debug)]
struct UnnestMetrics {
    /// Baseline metrics (output rows/batches, elapsed compute time, ...)
    baseline_metrics: BaselineMetrics,
    /// Number of batches consumed from the input
    input_batches: metrics::Count,
    /// Number of rows consumed from the input
    input_rows: metrics::Count,
}
281
282impl UnnestMetrics {
283 fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
284 let input_batches =
285 MetricBuilder::new(metrics).counter("input_batches", partition);
286
287 let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
288
289 Self {
290 baseline_metrics: BaselineMetrics::new(metrics, partition),
291 input_batches,
292 input_rows,
293 }
294 }
295}
296
/// A stream that unnests each input batch as it is pulled from `input`.
struct UnnestStream {
    /// Input stream
    input: SendableRecordBatchStream,
    /// Output schema (after unnesting)
    schema: Arc<Schema>,
    /// Which list-typed columns to unnest, and to what depth
    list_type_columns: Vec<ListUnnest>,
    /// Indices of struct-typed columns to flatten into their fields
    struct_column_indices: HashSet<usize>,
    /// Options, e.g. whether null list entries still produce a row
    options: UnnestOptions,
    /// Metrics for this partition
    metrics: UnnestMetrics,
}
313
impl RecordBatchStream for UnnestStream {
    /// Schema of the batches this stream produces (the post-unnest schema).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
319
// NOTE(review): `#[async_trait]` appears to have no effect here — `Stream`
// declares no async fns — confirm and consider removing it (together with
// its import) in a follow-up.
#[async_trait]
impl Stream for UnnestStream {
    type Item = Result<RecordBatch>;

    /// Delegates to [`UnnestStream::poll_next_impl`].
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.poll_next_impl(cx)
    }
}
331
impl UnnestStream {
    /// Poll the input, unnest each batch, and return the first non-empty
    /// result. Batches that unnest to zero rows are skipped (the loop polls
    /// the input again rather than emitting an empty batch).
    fn poll_next_impl(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    // Time only the unnest computation, not the input wait.
                    let elapsed_compute =
                        self.metrics.baseline_metrics.elapsed_compute().clone();
                    let timer = elapsed_compute.timer();
                    self.metrics.input_batches.add(1);
                    self.metrics.input_rows.add(batch.num_rows());
                    let result = build_batch(
                        &batch,
                        &self.schema,
                        &self.list_type_columns,
                        &self.struct_column_indices,
                        &self.options,
                    )?;
                    timer.done();
                    // `None` means the batch unnested to zero rows: poll the
                    // input again instead of emitting anything.
                    let Some(result_batch) = result else {
                        continue;
                    };
                    (&result_batch).record_output(&self.metrics.baseline_metrics);

                    // Empty results were filtered out above.
                    debug_assert!(result_batch.num_rows() > 0);
                    Some(Ok(result_batch))
                }
                other => {
                    // End of stream (None) or error: log summary metrics.
                    trace!(
                        "Processed {} probe-side input batches containing {} rows and \
                        produced {} output batches containing {} rows in {}",
                        self.metrics.input_batches,
                        self.metrics.input_rows,
                        self.metrics.baseline_metrics.output_batches(),
                        self.metrics.baseline_metrics.output_rows(),
                        self.metrics.baseline_metrics.elapsed_compute(),
                    );
                    other
                }
            });
        }
    }
}
381
382fn flatten_struct_cols(
393 input_batch: &[Arc<dyn Array>],
394 schema: &SchemaRef,
395 struct_column_indices: &HashSet<usize>,
396) -> Result<RecordBatch> {
397 let columns_expanded = input_batch
399 .iter()
400 .enumerate()
401 .map(|(idx, column_data)| match struct_column_indices.get(&idx) {
402 Some(_) => match column_data.data_type() {
403 DataType::Struct(_) => {
404 let struct_arr =
405 column_data.as_any().downcast_ref::<StructArray>().unwrap();
406 Ok(struct_arr.columns().to_vec())
407 }
408 data_type => internal_err!(
409 "expecting column {} from input plan to be a struct, got {:?}",
410 idx,
411 data_type
412 ),
413 },
414 None => Ok(vec![Arc::clone(column_data)]),
415 })
416 .collect::<Result<Vec<_>>>()?
417 .into_iter()
418 .flatten()
419 .collect();
420 Ok(RecordBatch::try_new(Arc::clone(schema), columns_expanded)?)
421}
422
/// One unnest operation on a list-typed column: which input column, and at
/// which nesting depth (1 = unnest once; 2 = unnest the result again, ...).
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct ListUnnest {
    /// Index of the list column in the input schema
    pub index_in_input_schema: usize,
    /// Depth at which this unnest applies
    pub depth: usize,
}
428
/// Perform one level of unnesting over the columns that need it.
///
/// `level_to_unnest` counts down from the maximum declared depth. A column
/// whose declared depth equals the current level is unnested from `batch`;
/// a column whose declared depth is *greater* has already been unnested at a
/// deeper level, so its intermediate result is read from (and the new result
/// written back into) `temp_unnested_arrs`.
///
/// Returns `Ok(None)` when the total unnested length is 0 (no output rows).
fn list_unnest_at_level(
    batch: &[ArrayRef],
    list_type_unnests: &[ListUnnest],
    temp_unnested_arrs: &mut HashMap<ListUnnest, ArrayRef>,
    level_to_unnest: usize,
    options: &UnnestOptions,
) -> Result<Option<Vec<ArrayRef>>> {
    // Collect the arrays to unnest at this level, paired with their specs.
    let (arrs_to_unnest, list_unnest_specs): (Vec<Arc<dyn Array>>, Vec<_>) =
        list_type_unnests
            .iter()
            .filter_map(|unnesting| {
                if level_to_unnest == unnesting.depth {
                    return Some((
                        Arc::clone(&batch[unnesting.index_in_input_schema]),
                        *unnesting,
                    ));
                }
                // Already unnested at a deeper level: continue from the
                // intermediate result.
                if level_to_unnest < unnesting.depth {
                    return Some((
                        Arc::clone(temp_unnested_arrs.get(unnesting).unwrap()),
                        *unnesting,
                    ));
                }
                None
            })
            .unzip();

    // Per-row target length: the longest list among the unnested columns.
    let longest_length = find_longest_length(&arrs_to_unnest, options)?;
    let unnested_length = longest_length.as_primitive::<Int64Type>();
    let total_length = if unnested_length.is_empty() {
        0
    } else {
        sum(unnested_length).ok_or_else(|| {
            exec_datafusion_err!("Failed to calculate the total unnested length")
        })? as usize
    };
    if total_length == 0 {
        return Ok(None);
    }

    // Flatten each list array, padding shorter rows with nulls up to the
    // per-row target length.
    let unnested_temp_arrays =
        unnest_list_arrays(arrs_to_unnest.as_ref(), unnested_length, total_length)?;

    // Stash this level's results for consumption at shallower levels.
    let take_indices = create_take_indices(unnested_length, total_length);
    unnested_temp_arrays
        .into_iter()
        .zip(list_unnest_specs.iter())
        .for_each(|(flatten_arr, unnesting)| {
            temp_unnested_arrs.insert(*unnesting, flatten_arr);
        });

    // A column is repeated (duplicated per unnested row) if it is still
    // needed at a shallower level, or not involved in unnesting at all;
    // otherwise its contents are nulled out by repeat_arrs_from_indices.
    let repeat_mask: Vec<bool> = batch
        .iter()
        .enumerate()
        .map(|(i, _)| {
            let needed_in_future_levels = list_type_unnests.iter().any(|unnesting| {
                unnesting.index_in_input_schema == i && unnesting.depth < level_to_unnest
            });

            let is_involved_in_unnesting = list_type_unnests
                .iter()
                .any(|unnesting| unnesting.index_in_input_schema == i);

            needed_in_future_levels || !is_involved_in_unnesting
        })
        .collect();

    let ret = repeat_arrs_from_indices(batch, &take_indices, &repeat_mask)?;

    Ok(Some(ret))
}
/// The result of unnesting one column at one level: the flattened array and
/// the depth at which it was produced.
struct UnnestingResult {
    /// The flattened array
    arr: ArrayRef,
    /// Depth that produced `arr`
    depth: usize,
}
554
/// Build one output batch by applying all unnest operations to `batch`.
///
/// List unnesting runs from the deepest declared level down to level 1,
/// feeding each level's output into the next; struct columns are then
/// flattened into their fields. Returns `Ok(None)` when the unnest would
/// produce zero rows.
fn build_batch(
    batch: &RecordBatch,
    schema: &SchemaRef,
    list_type_columns: &[ListUnnest],
    struct_column_indices: &HashSet<usize>,
    options: &UnnestOptions,
) -> Result<Option<RecordBatch>> {
    let transformed = match list_type_columns.len() {
        // No list columns: only struct flattening applies.
        0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
        _ => {
            let mut temp_unnested_result = HashMap::new();
            // Deepest declared unnest depth across all columns.
            let max_recursion = list_type_columns
                .iter()
                .fold(0, |highest_depth, ListUnnest { depth, .. }| {
                    cmp::max(highest_depth, *depth)
                });

            let mut flatten_arrs = vec![];

            // Unnest level by level, deepest first; each level consumes the
            // previous level's output (the original batch at the start).
            for depth in (1..=max_recursion).rev() {
                let input = match depth == max_recursion {
                    true => batch.columns(),
                    false => &flatten_arrs,
                };
                let Some(temp_result) = list_unnest_at_level(
                    input,
                    list_type_columns,
                    &mut temp_unnested_result,
                    depth,
                    options,
                )?
                else {
                    return Ok(None);
                };
                flatten_arrs = temp_result;
            }
            // Group the per-(column, depth) results by original column index.
            let unnested_array_map: HashMap<usize, Vec<UnnestingResult>> =
                temp_unnested_result.into_iter().fold(
                    HashMap::new(),
                    |mut acc,
                     (
                        ListUnnest {
                            index_in_input_schema,
                            depth,
                        },
                        flattened_array,
                    )| {
                        acc.entry(index_in_input_schema).or_default().push(
                            UnnestingResult {
                                arr: flattened_array,
                                depth,
                            },
                        );
                        acc
                    },
                );
            // Output position of each (column, depth) pair, as declared.
            let output_order: HashMap<ListUnnest, usize> = list_type_columns
                .iter()
                .enumerate()
                .map(|(order, unnest_def)| (*unnest_def, order))
                .collect();

            // For columns unnested at multiple depths, order the resulting
            // arrays by their declared output position.
            let mut multi_unnested_per_original_index = unnested_array_map
                .into_iter()
                .map(
                    |(original_index, mut unnested_columns)| {
                        unnested_columns.sort_by(
                            |UnnestingResult { depth: depth1, .. },
                             UnnestingResult { depth: depth2, .. }|
                             -> Ordering {
                                output_order
                                    .get(&ListUnnest {
                                        depth: *depth1,
                                        index_in_input_schema: original_index,
                                    })
                                    .unwrap()
                                    .cmp(
                                        output_order
                                            .get(&ListUnnest {
                                                depth: *depth2,
                                                index_in_input_schema: original_index,
                                            })
                                            .unwrap(),
                                    )
                            },
                        );
                        (
                            original_index,
                            unnested_columns
                                .into_iter()
                                .map(|result| result.arr)
                                .collect::<Vec<_>>(),
                        )
                    },
                )
                .collect::<HashMap<_, _>>();

            // Splice the unnested arrays into the final column order: each
            // unnested column is replaced by its (possibly several) results.
            let ret = flatten_arrs
                .into_iter()
                .enumerate()
                .flat_map(|(col_idx, arr)| {
                    match multi_unnested_per_original_index.remove(&col_idx) {
                        Some(unnested_arrays) => unnested_arrays,
                        None => vec![arr],
                    }
                })
                .collect::<Vec<_>>();

            flatten_struct_cols(&ret, schema, struct_column_indices)
        }
    }?;
    Ok(Some(transformed))
}
734
735fn find_longest_length(
757 list_arrays: &[ArrayRef],
758 options: &UnnestOptions,
759) -> Result<ArrayRef> {
760 let null_length = if options.preserve_nulls {
762 Scalar::new(Int64Array::from_value(1, 1))
763 } else {
764 Scalar::new(Int64Array::from_value(0, 1))
765 };
766 let list_lengths: Vec<ArrayRef> = list_arrays
767 .iter()
768 .map(|list_array| {
769 let mut length_array = length(list_array)?;
770 length_array = cast(&length_array, &DataType::Int64)?;
772 length_array =
773 zip(&is_not_null(&length_array)?, &length_array, &null_length)?;
774 Ok(length_array)
775 })
776 .collect::<Result<_>>()?;
777
778 let longest_length = list_lengths.iter().skip(1).try_fold(
779 Arc::clone(&list_lengths[0]),
780 |longest, current| {
781 let is_lt = lt(&longest, ¤t)?;
782 zip(&is_lt, ¤t, &longest)
783 },
784 )?;
785 Ok(longest_length)
786}
787
/// Common access to the values and per-row offsets of the three supported
/// list array types (List, LargeList, FixedSizeList).
trait ListArrayType: Array {
    /// Returns a reference to the values of this list.
    fn values(&self) -> &ArrayRef;

    /// Returns the start and end offset of the values for the given row.
    fn value_offsets(&self, row: usize) -> (i64, i64);
}
796
impl ListArrayType for ListArray {
    fn values(&self) -> &ArrayRef {
        self.values()
    }

    fn value_offsets(&self, row: usize) -> (i64, i64) {
        // i32 offsets widened to i64 via `into`.
        let offsets = self.value_offsets();
        (offsets[row].into(), offsets[row + 1].into())
    }
}
807
impl ListArrayType for LargeListArray {
    fn values(&self) -> &ArrayRef {
        self.values()
    }

    fn value_offsets(&self, row: usize) -> (i64, i64) {
        // Offsets are already i64.
        let offsets = self.value_offsets();
        (offsets[row], offsets[row + 1])
    }
}
818
impl ListArrayType for FixedSizeListArray {
    fn values(&self) -> &ArrayRef {
        self.values()
    }

    fn value_offsets(&self, row: usize) -> (i64, i64) {
        // Every row spans exactly `value_length` elements.
        let start = self.value_offset(row) as i64;
        (start, start + self.value_length() as i64)
    }
}
829
830fn unnest_list_arrays(
832 list_arrays: &[ArrayRef],
833 length_array: &PrimitiveArray<Int64Type>,
834 capacity: usize,
835) -> Result<Vec<ArrayRef>> {
836 let typed_arrays = list_arrays
837 .iter()
838 .map(|list_array| match list_array.data_type() {
839 DataType::List(_) => Ok(list_array.as_list::<i32>() as &dyn ListArrayType),
840 DataType::LargeList(_) => {
841 Ok(list_array.as_list::<i64>() as &dyn ListArrayType)
842 }
843 DataType::FixedSizeList(_, _) => {
844 Ok(list_array.as_fixed_size_list() as &dyn ListArrayType)
845 }
846 other => exec_err!("Invalid unnest datatype {other }"),
847 })
848 .collect::<Result<Vec<_>>>()?;
849
850 typed_arrays
851 .iter()
852 .map(|list_array| unnest_list_array(*list_array, length_array, capacity))
853 .collect::<Result<_>>()
854}
855
/// Flatten one list array into its elements, emitting `length_array[row]`
/// slots per row: the row's own elements first, then nulls to pad up to the
/// target length (the longest length across all columns being unnested).
fn unnest_list_array(
    list_array: &dyn ListArrayType,
    length_array: &PrimitiveArray<Int64Type>,
    capacity: usize,
) -> Result<ArrayRef> {
    let values = list_array.values();
    let mut take_indices_builder = PrimitiveArray::<Int64Type>::builder(capacity);
    for row in 0..list_array.len() {
        let mut value_length = 0;
        if !list_array.is_null(row) {
            let (start, end) = list_array.value_offsets(row);
            value_length = end - start;
            // Take indices for the row's own elements...
            for i in start..end {
                take_indices_builder.append_value(i)
            }
        }
        let target_length = length_array.value(row);
        debug_assert!(
            value_length <= target_length,
            "value length is beyond the longest length"
        );
        // ...then null take indices to pad up to the row's target length
        // (a null list row with preserved nulls pads from 0).
        for _ in value_length..target_length {
            take_indices_builder.append_null();
        }
    }
    // `take` maps a null index to a null output element.
    Ok(kernels::take::take(
        &values,
        &take_indices_builder.finish(),
        None,
    )?)
}
908
909fn create_take_indices(
924 length_array: &PrimitiveArray<Int64Type>,
925 capacity: usize,
926) -> PrimitiveArray<Int64Type> {
927 debug_assert!(
929 length_array.null_count() == 0,
930 "length array should not contain nulls"
931 );
932 let mut builder = PrimitiveArray::<Int64Type>::builder(capacity);
933 for (index, repeat) in length_array.iter().enumerate() {
934 let repeat = repeat.unwrap();
936 (0..repeat).for_each(|_| builder.append_value(index as i64));
937 }
938 builder.finish()
939}
940
941fn repeat_arrs_from_indices(
988 batch: &[ArrayRef],
989 indices: &PrimitiveArray<Int64Type>,
990 repeat_mask: &[bool],
991) -> Result<Vec<Arc<dyn Array>>> {
992 batch
993 .iter()
994 .zip(repeat_mask.iter())
995 .map(|(arr, &repeat)| {
996 if repeat {
997 Ok(kernels::take::take(arr, indices, None)?)
998 } else {
999 Ok(new_null_array(arr.data_type(), arr.len()))
1000 }
1001 })
1002 .collect()
1003}
1004
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        GenericListArray, NullBufferBuilder, OffsetSizeTrait, StringArray,
    };
    use arrow::buffer::{NullBuffer, OffsetBuffer};
    use arrow::datatypes::{Field, Int32Type};
    use datafusion_common::test_util::batches_to_string;
    use insta::assert_snapshot;

    /// Build a 6-row list array:
    /// [A, B, C], [], NULL, [D], NULL, [NULL, F]
    fn make_generic_array<OffsetSize>() -> GenericListArray<OffsetSize>
    where
        OffsetSize: OffsetSizeTrait,
    {
        let mut values = vec![];
        let mut offsets: Vec<OffsetSize> = vec![OffsetSize::zero()];
        let mut valid = NullBufferBuilder::new(6);

        // [A, B, C]
        values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // []
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // NULL — the "?" value is masked out by the null bit
        values.push(Some("?"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [D]
        values.push(Some("D"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // NULL
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [NULL, F]
        values.extend_from_slice(&[None, Some("F")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        GenericListArray::<OffsetSize>::new(
            field,
            OffsetBuffer::new(offsets.into()),
            Arc::new(StringArray::from(values)),
            valid.finish(),
        )
    }

    /// Build a 6-row FixedSizeList(2) array:
    /// [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
    fn make_fixed_list() -> FixedSizeListArray {
        let values = Arc::new(StringArray::from_iter([
            Some("A"),
            Some("B"),
            None,
            None,
            Some("C"),
            Some("D"),
            None,
            None,
            None,
            Some("F"),
            None,
            None,
        ]));
        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        let valid = NullBuffer::from(vec![true, false, true, false, true, true]);
        FixedSizeListArray::new(field, 2, values, Some(valid))
    }

    /// Unnest `list_array` with the given per-row lengths and check the
    /// flattened string values against `expected`.
    fn verify_unnest_list_array(
        list_array: &dyn ListArrayType,
        lengths: Vec<i64>,
        expected: Vec<Option<&str>>,
    ) -> Result<()> {
        let length_array = Int64Array::from(lengths);
        let unnested_array = unnest_list_array(list_array, &length_array, 3 * 6)?;
        let strs = unnested_array.as_string::<i32>().iter().collect::<Vec<_>>();
        assert_eq!(strs, expected);
        Ok(())
    }

    #[test]
    fn test_build_batch_list_arr_recursive() -> Result<()> {
        // col1: List<List<Int32>>, 3 rows (last row is a null list);
        // col2: List<Utf8>, 3 rows.
        let list_arr1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3)]),
            None,
            Some(vec![Some(4), Some(5)]),
            Some(vec![Some(7), Some(8), Some(9), Some(10)]),
            None,
            Some(vec![Some(11), Some(12), Some(13)]),
        ]);

        let list_arr1_ref = Arc::new(list_arr1) as ArrayRef;
        let offsets = OffsetBuffer::from_lengths([3, 3, 0]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_non_null();
        nulls.append_non_null();
        nulls.append_null();
        let col1_field = Field::new_list_field(
            DataType::List(Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            ))),
            true,
        );
        let col1 = ListArray::new(
            Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            )),
            offsets,
            list_arr1_ref,
            nulls.finish(),
        );

        let list_arr2 = StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
            Some("d"),
            Some("e"),
        ]);

        let offsets = OffsetBuffer::from_lengths([2, 2, 1]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_n_non_nulls(3);
        let col2_field = Field::new(
            "col2",
            DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
            true,
        );
        let col2 = GenericListArray::<i32>::new(
            Arc::new(Field::new_list_field(DataType::Utf8, true)),
            OffsetBuffer::new(offsets.into()),
            Arc::new(list_arr2),
            nulls.finish(),
        );
        let schema = Arc::new(Schema::new(vec![col1_field, col2_field]));
        // Output: col1 unnested at depths 1 and 2, col2 at depth 1.
        let out_schema = Arc::new(Schema::new(vec![
            Field::new(
                "col1_unnest_placeholder_depth_1",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new("col1_unnest_placeholder_depth_2", DataType::Int32, true),
            Field::new("col2_unnest_placeholder_depth_1", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(col1) as ArrayRef, Arc::new(col2) as ArrayRef],
        )
        .unwrap();
        let list_type_columns = vec![
            ListUnnest {
                index_in_input_schema: 0,
                depth: 1,
            },
            ListUnnest {
                index_in_input_schema: 0,
                depth: 2,
            },
            ListUnnest {
                index_in_input_schema: 1,
                depth: 1,
            },
        ];
        let ret = build_batch(
            &batch,
            &out_schema,
            list_type_columns.as_ref(),
            &HashSet::default(),
            &UnnestOptions {
                preserve_nulls: true,
                recursions: vec![],
            },
        )?
        .unwrap();

        assert_snapshot!(batches_to_string(&[ret]),
            @r"
        +---------------------------------+---------------------------------+---------------------------------+
        | col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 |
        +---------------------------------+---------------------------------+---------------------------------+
        | [1, 2, 3]                       | 1                               | a                               |
        |                                 | 2                               | b                               |
        | [4, 5]                          | 3                               |                                 |
        | [1, 2, 3]                       |                                 | a                               |
        |                                 |                                 | b                               |
        | [4, 5]                          |                                 |                                 |
        | [1, 2, 3]                       | 4                               | a                               |
        |                                 | 5                               | b                               |
        | [4, 5]                          |                                 |                                 |
        | [7, 8, 9, 10]                   | 7                               | c                               |
        |                                 | 8                               | d                               |
        | [11, 12, 13]                    | 9                               |                                 |
        |                                 | 10                              |                                 |
        | [7, 8, 9, 10]                   |                                 | c                               |
        |                                 |                                 | d                               |
        | [11, 12, 13]                    |                                 |                                 |
        | [7, 8, 9, 10]                   | 11                              | c                               |
        |                                 | 12                              | d                               |
        | [11, 12, 13]                    | 13                              |                                 |
        |                                 |                                 | e                               |
        +---------------------------------+---------------------------------+---------------------------------+
        ");
        Ok(())
    }

    #[test]
    fn test_unnest_list_array() -> Result<()> {
        // [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = make_generic_array::<i32>();
        verify_unnest_list_array(
            &list_array,
            vec![3, 2, 1, 2, 0, 3],
            vec![
                Some("A"),
                Some("B"),
                Some("C"),
                None,
                None,
                None,
                Some("D"),
                None,
                None,
                Some("F"),
                None,
            ],
        )?;

        // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list_array = make_fixed_list();
        verify_unnest_list_array(
            &list_array,
            vec![3, 1, 2, 0, 2, 3],
            vec![
                Some("A"),
                Some("B"),
                None,
                None,
                Some("C"),
                Some("D"),
                None,
                Some("F"),
                None,
                None,
                None,
            ],
        )?;

        Ok(())
    }

    /// Check `find_longest_length` over `list_arrays` against `expected`.
    fn verify_longest_length(
        list_arrays: &[ArrayRef],
        preserve_nulls: bool,
        expected: Vec<i64>,
    ) -> Result<()> {
        let options = UnnestOptions {
            preserve_nulls,
            recursions: vec![],
        };
        let longest_length = find_longest_length(list_arrays, &options)?;
        let expected_array = Int64Array::from(expected);
        assert_eq!(
            longest_length
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap(),
            &expected_array
        );
        Ok(())
    }

    #[test]
    fn test_longest_list_length() -> Result<()> {
        // Test with single ListArray: [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Same values as a LargeListArray.
        let list_array = Arc::new(make_generic_array::<i64>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // FixedSizeListArray: [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list_array = Arc::new(make_fixed_list()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![2, 0, 2, 0, 2, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![2, 1, 2, 1, 2, 2])?;

        // Multiple arrays: the result is the element-wise maximum.
        let list1 = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        let list2 = Arc::new(make_fixed_list()) as ArrayRef;
        let list_arrays = vec![Arc::clone(&list1), Arc::clone(&list2)];
        verify_longest_length(&list_arrays, false, vec![3, 0, 2, 1, 2, 2])?;
        verify_longest_length(&list_arrays, true, vec![3, 1, 2, 1, 2, 2])?;

        Ok(())
    }

    #[test]
    fn test_create_take_indices() -> Result<()> {
        let length_array = Int64Array::from(vec![2, 3, 1]);
        let take_indices = create_take_indices(&length_array, 6);
        let expected = Int64Array::from(vec![0, 0, 1, 1, 1, 2]);
        assert_eq!(take_indices, expected);
        Ok(())
    }
}