1use std::cmp::{self, Ordering};
21use std::task::{ready, Poll};
22use std::{any::Any, sync::Arc};
23
24use super::metrics::{
25 self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
26 RecordOutput,
27};
28use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
29use crate::{
30 DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
31 SendableRecordBatchStream,
32};
33
34use arrow::array::{
35 new_null_array, Array, ArrayRef, AsArray, BooleanBufferBuilder, FixedSizeListArray,
36 Int64Array, LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray,
37};
38use arrow::compute::kernels::length::length;
39use arrow::compute::kernels::zip::zip;
40use arrow::compute::{cast, is_not_null, kernels, sum};
41use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
42use arrow::record_batch::RecordBatch;
43use arrow_ord::cmp::lt;
44use async_trait::async_trait;
45use datafusion_common::{
46 exec_datafusion_err, exec_err, internal_err, Constraints, HashMap, HashSet, Result,
47 UnnestOptions,
48};
49use datafusion_execution::TaskContext;
50use datafusion_physical_expr::equivalence::ProjectionMapping;
51use datafusion_physical_expr::expressions::Column;
52use datafusion_physical_expr::PhysicalExpr;
53use futures::{Stream, StreamExt};
54use log::trace;
55
/// Physical plan that unnests list columns (possibly at several depths) and
/// flattens struct columns of its input.
///
/// Unnesting a list column produces one output row per list element, repeating
/// the values of the columns that are not unnested; flattening a struct column
/// replaces it by its child columns.
#[derive(Debug, Clone)]
pub struct UnnestExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// The output schema, once the unnest is applied
    schema: SchemaRef,
    /// List columns to unnest, each identified by its index in the input schema
    /// and the depth to unnest it to
    list_column_indices: Vec<ListUnnest>,
    /// Indices (in the input schema) of the struct columns to flatten
    struct_column_indices: Vec<usize>,
    /// Unnest options
    options: UnnestOptions,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties (equivalences, output partitioning, etc.)
    cache: PlanProperties,
}
79
80impl UnnestExec {
81 pub fn new(
83 input: Arc<dyn ExecutionPlan>,
84 list_column_indices: Vec<ListUnnest>,
85 struct_column_indices: Vec<usize>,
86 schema: SchemaRef,
87 options: UnnestOptions,
88 ) -> Result<Self> {
89 let cache = Self::compute_properties(
90 &input,
91 &list_column_indices,
92 &struct_column_indices,
93 Arc::clone(&schema),
94 )?;
95
96 Ok(UnnestExec {
97 input,
98 schema,
99 list_column_indices,
100 struct_column_indices,
101 options,
102 metrics: Default::default(),
103 cache,
104 })
105 }
106
107 fn compute_properties(
109 input: &Arc<dyn ExecutionPlan>,
110 list_column_indices: &[ListUnnest],
111 struct_column_indices: &[usize],
112 schema: SchemaRef,
113 ) -> Result<PlanProperties> {
114 let input_schema = input.schema();
116 let mut unnested_indices = BooleanBufferBuilder::new(input_schema.fields().len());
117 unnested_indices.append_n(input_schema.fields().len(), false);
118 for list_unnest in list_column_indices {
119 unnested_indices.set_bit(list_unnest.index_in_input_schema, true);
120 }
121 for struct_unnest in struct_column_indices {
122 unnested_indices.set_bit(*struct_unnest, true)
123 }
124 let unnested_indices = unnested_indices.finish();
125 let non_unnested_indices: Vec<usize> = (0..input_schema.fields().len())
126 .filter(|idx| !unnested_indices.value(*idx))
127 .collect();
128
129 let input_schema = input.schema();
131 let projection_mapping: ProjectionMapping = non_unnested_indices
132 .iter()
133 .map(|&input_idx| {
134 let input_field = input_schema.field(input_idx);
136 let output_idx = schema
137 .fields()
138 .iter()
139 .position(|output_field| output_field.name() == input_field.name())
140 .ok_or_else(|| {
141 exec_datafusion_err!(
142 "Non-unnested column '{}' must exist in output schema",
143 input_field.name()
144 )
145 })?;
146
147 let input_col = Arc::new(Column::new(input_field.name(), input_idx))
148 as Arc<dyn PhysicalExpr>;
149 let target_col = Arc::new(Column::new(input_field.name(), output_idx))
150 as Arc<dyn PhysicalExpr>;
151 let targets = vec![(target_col, output_idx)].into();
153 Ok((input_col, targets))
154 })
155 .collect::<Result<ProjectionMapping>>()?;
156
157 let input_eq_properties = input.equivalence_properties();
161 let eq_properties = input_eq_properties
162 .project(&projection_mapping, Arc::clone(&schema))
163 .with_constraints(Constraints::default());
164
165 let output_partitioning = input
167 .output_partitioning()
168 .project(&projection_mapping, &eq_properties);
169
170 Ok(PlanProperties::new(
171 eq_properties,
172 output_partitioning,
173 input.pipeline_behavior(),
174 input.boundedness(),
175 ))
176 }
177
178 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
180 &self.input
181 }
182
183 pub fn list_column_indices(&self) -> &[ListUnnest] {
185 &self.list_column_indices
186 }
187
188 pub fn struct_column_indices(&self) -> &[usize] {
190 &self.struct_column_indices
191 }
192
193 pub fn options(&self) -> &UnnestOptions {
194 &self.options
195 }
196}
197
198impl DisplayAs for UnnestExec {
199 fn fmt_as(
200 &self,
201 t: DisplayFormatType,
202 f: &mut std::fmt::Formatter,
203 ) -> std::fmt::Result {
204 match t {
205 DisplayFormatType::Default | DisplayFormatType::Verbose => {
206 write!(f, "UnnestExec")
207 }
208 DisplayFormatType::TreeRender => {
209 write!(f, "")
210 }
211 }
212 }
213}
214
215impl ExecutionPlan for UnnestExec {
216 fn name(&self) -> &'static str {
217 "UnnestExec"
218 }
219
220 fn as_any(&self) -> &dyn Any {
221 self
222 }
223
224 fn properties(&self) -> &PlanProperties {
225 &self.cache
226 }
227
228 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
229 vec![&self.input]
230 }
231
232 fn with_new_children(
233 self: Arc<Self>,
234 children: Vec<Arc<dyn ExecutionPlan>>,
235 ) -> Result<Arc<dyn ExecutionPlan>> {
236 Ok(Arc::new(UnnestExec::new(
237 Arc::clone(&children[0]),
238 self.list_column_indices.clone(),
239 self.struct_column_indices.clone(),
240 Arc::clone(&self.schema),
241 self.options.clone(),
242 )?))
243 }
244
245 fn required_input_distribution(&self) -> Vec<Distribution> {
246 vec![Distribution::UnspecifiedDistribution]
247 }
248
249 fn execute(
250 &self,
251 partition: usize,
252 context: Arc<TaskContext>,
253 ) -> Result<SendableRecordBatchStream> {
254 let input = self.input.execute(partition, context)?;
255 let metrics = UnnestMetrics::new(partition, &self.metrics);
256
257 Ok(Box::pin(UnnestStream {
258 input,
259 schema: Arc::clone(&self.schema),
260 list_type_columns: self.list_column_indices.clone(),
261 struct_column_indices: self.struct_column_indices.iter().copied().collect(),
262 options: self.options.clone(),
263 metrics,
264 }))
265 }
266
267 fn metrics(&self) -> Option<MetricsSet> {
268 Some(self.metrics.clone_inner())
269 }
270}
271
/// Per-partition metrics recorded while unnesting.
#[derive(Clone, Debug)]
struct UnnestMetrics {
    /// Elapsed compute time and output row count
    baseline_metrics: BaselineMetrics,
    /// Number of batches consumed from the input
    input_batches: metrics::Count,
    /// Number of rows consumed from the input
    input_rows: metrics::Count,
    /// Number of batches produced
    output_batches: metrics::Count,
}
283
284impl UnnestMetrics {
285 fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
286 let input_batches =
287 MetricBuilder::new(metrics).counter("input_batches", partition);
288
289 let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
290
291 let output_batches =
292 MetricBuilder::new(metrics).counter("output_batches", partition);
293
294 Self {
295 baseline_metrics: BaselineMetrics::new(metrics, partition),
296 input_batches,
297 input_rows,
298 output_batches,
299 }
300 }
301}
302
/// A stream that reads batches from `input` and emits their unnested form.
struct UnnestStream {
    /// Input stream
    input: SendableRecordBatchStream,
    /// Output (unnested) schema
    schema: Arc<Schema>,
    /// List unnest operations to apply, as (input column index, depth) pairs;
    /// their order also determines the output order when one input column is
    /// unnested at several depths
    list_type_columns: Vec<ListUnnest>,
    /// Indices (in the input) of the struct columns to flatten
    struct_column_indices: HashSet<usize>,
    /// Unnest options
    options: UnnestOptions,
    /// Execution metrics
    metrics: UnnestMetrics,
}
319
320impl RecordBatchStream for UnnestStream {
321 fn schema(&self) -> SchemaRef {
322 Arc::clone(&self.schema)
323 }
324}
325
326#[async_trait]
327impl Stream for UnnestStream {
328 type Item = Result<RecordBatch>;
329
330 fn poll_next(
331 mut self: std::pin::Pin<&mut Self>,
332 cx: &mut std::task::Context<'_>,
333 ) -> Poll<Option<Self::Item>> {
334 self.poll_next_impl(cx)
335 }
336}
337
338impl UnnestStream {
339 fn poll_next_impl(
342 &mut self,
343 cx: &mut std::task::Context<'_>,
344 ) -> Poll<Option<Result<RecordBatch>>> {
345 loop {
346 return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
347 Some(Ok(batch)) => {
348 let elapsed_compute =
349 self.metrics.baseline_metrics.elapsed_compute().clone();
350 let timer = elapsed_compute.timer();
351 self.metrics.input_batches.add(1);
352 self.metrics.input_rows.add(batch.num_rows());
353 let result = build_batch(
354 &batch,
355 &self.schema,
356 &self.list_type_columns,
357 &self.struct_column_indices,
358 &self.options,
359 )?;
360 timer.done();
361 let Some(result_batch) = result else {
362 continue;
363 };
364 self.metrics.output_batches.add(1);
365 (&result_batch).record_output(&self.metrics.baseline_metrics);
366
367 debug_assert!(result_batch.num_rows() > 0);
370 Some(Ok(result_batch))
371 }
372 other => {
373 trace!(
374 "Processed {} probe-side input batches containing {} rows and \
375 produced {} output batches containing {} rows in {}",
376 self.metrics.input_batches,
377 self.metrics.input_rows,
378 self.metrics.output_batches,
379 self.metrics.baseline_metrics.output_rows(),
380 self.metrics.baseline_metrics.elapsed_compute(),
381 );
382 other
383 }
384 });
385 }
386 }
387}
388
389fn flatten_struct_cols(
400 input_batch: &[Arc<dyn Array>],
401 schema: &SchemaRef,
402 struct_column_indices: &HashSet<usize>,
403) -> Result<RecordBatch> {
404 let columns_expanded = input_batch
406 .iter()
407 .enumerate()
408 .map(|(idx, column_data)| match struct_column_indices.get(&idx) {
409 Some(_) => match column_data.data_type() {
410 DataType::Struct(_) => {
411 let struct_arr =
412 column_data.as_any().downcast_ref::<StructArray>().unwrap();
413 Ok(struct_arr.columns().to_vec())
414 }
415 data_type => internal_err!(
416 "expecting column {} from input plan to be a struct, got {:?}",
417 idx,
418 data_type
419 ),
420 },
421 None => Ok(vec![Arc::clone(column_data)]),
422 })
423 .collect::<Result<Vec<_>>>()?
424 .into_iter()
425 .flatten()
426 .collect();
427 Ok(RecordBatch::try_new(Arc::clone(schema), columns_expanded)?)
428}
429
/// A list column to unnest, together with how many levels of list nesting to
/// remove from it.
///
/// For example, unnesting a `List(List(Int32))` column with `depth: 1` yields a
/// `List(Int32)` column, while `depth: 2` yields an `Int32` column. The same input
/// column may appear several times with different depths, producing one output
/// column per entry.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct ListUnnest {
    /// Index of the list column in the input schema
    pub index_in_input_schema: usize,
    /// Number of list levels to unnest; levels are processed from the largest
    /// depth down to 1, so this is expected to be at least 1
    pub depth: usize,
}
435
/// Performs one level of list unnesting over `batch`.
///
/// A [`ListUnnest`] takes part at `level_to_unnest` when:
/// - `depth == level_to_unnest`: its array is read from `batch` (the input batch
///   at the first, deepest level; otherwise the columns produced by the previous
///   level), or
/// - `depth > level_to_unnest`: its array is the partially-unnested array stored
///   in `temp_unnested_arrs` by a previous (deeper) level.
///
/// All participating arrays are unnested together, each row being expanded to the
/// longest list length among them (see `find_longest_length`), and the results are
/// stored back into `temp_unnested_arrs` keyed by their [`ListUnnest`].
///
/// Returns `Ok(None)` when the total unnested length is zero. Otherwise returns the
/// columns of `batch` with every row repeated as many times as it was expanded;
/// columns that are unnested and not read again at a lower level are not repeated
/// but replaced by null placeholders (see `repeat_arrs_from_indices`).
fn list_unnest_at_level(
    batch: &[ArrayRef],
    list_type_unnests: &[ListUnnest],
    temp_unnested_arrs: &mut HashMap<ListUnnest, ArrayRef>,
    level_to_unnest: usize,
    options: &UnnestOptions,
) -> Result<Option<Vec<ArrayRef>>> {
    // Collect the arrays taking part in this level, paired with their spec.
    let (arrs_to_unnest, list_unnest_specs): (Vec<Arc<dyn Array>>, Vec<_>) =
        list_type_unnests
            .iter()
            .filter_map(|unnesting| {
                if level_to_unnest == unnesting.depth {
                    return Some((
                        Arc::clone(&batch[unnesting.index_in_input_schema]),
                        *unnesting,
                    ));
                }
                // Unnesting of this spec started at a deeper level and continues
                // until the level reaches 1. Deeper levels run first, so the
                // partially-unnested array is already in the map.
                if level_to_unnest < unnesting.depth {
                    return Some((
                        Arc::clone(temp_unnested_arrs.get(unnesting).unwrap()),
                        *unnesting,
                    ));
                }
                None
            })
            .unzip();

    // Number of output rows produced by each input row, and their total.
    let longest_length = find_longest_length(&arrs_to_unnest, options)?;
    let unnested_length = longest_length.as_primitive::<Int64Type>();
    let total_length = if unnested_length.is_empty() {
        0
    } else {
        sum(unnested_length).ok_or_else(|| {
            exec_datafusion_err!("Failed to calculate the total unnested length")
        })? as usize
    };
    if total_length == 0 {
        return Ok(None);
    }

    // Unnest all the participating list arrays.
    let unnested_temp_arrays =
        unnest_list_arrays(arrs_to_unnest.as_ref(), unnested_length, total_length)?;

    // Take indices that repeat each input row once per produced output row.
    let take_indices = create_take_indices(unnested_length, total_length);
    // Store (or overwrite) each spec's unnested array for the next level and for
    // assembling the final output.
    unnested_temp_arrays
        .into_iter()
        .zip(list_unnest_specs.iter())
        .for_each(|(flatten_arr, unnesting)| {
            temp_unnested_arrs.insert(*unnesting, flatten_arr);
        });

    // Decide per column whether it must be repeated or can be replaced by a
    // placeholder because its values are never read again.
    let repeat_mask: Vec<bool> = batch
        .iter()
        .enumerate()
        .map(|(i, _)| {
            // The column is read from the batch again at a lower level.
            let needed_in_future_levels = list_type_unnests.iter().any(|unnesting| {
                unnesting.index_in_input_schema == i && unnesting.depth < level_to_unnest
            });

            // The column is unnested at some level.
            let is_involved_in_unnesting = list_type_unnests
                .iter()
                .any(|unnesting| unnesting.index_in_input_schema == i);

            // Repeat columns still needed at lower levels and pass-through columns.
            needed_in_future_levels || !is_involved_in_unnesting
        })
        .collect();

    // The values of the repeated columns are duplicated to line up with the
    // unnested rows.
    let ret = repeat_arrs_from_indices(batch, &take_indices, &repeat_mask)?;

    Ok(Some(ret))
}
/// An unnested array together with the depth it was unnested to; used to order
/// the output arrays produced from one input column.
struct UnnestingResult {
    /// The fully unnested array
    arr: ArrayRef,
    /// Depth of the [`ListUnnest`] that produced `arr`
    depth: usize,
}
561
562fn build_batch(
619 batch: &RecordBatch,
620 schema: &SchemaRef,
621 list_type_columns: &[ListUnnest],
622 struct_column_indices: &HashSet<usize>,
623 options: &UnnestOptions,
624) -> Result<Option<RecordBatch>> {
625 let transformed = match list_type_columns.len() {
626 0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
627 _ => {
628 let mut temp_unnested_result = HashMap::new();
629 let max_recursion = list_type_columns
630 .iter()
631 .fold(0, |highest_depth, ListUnnest { depth, .. }| {
632 cmp::max(highest_depth, *depth)
633 });
634
635 let mut flatten_arrs = vec![];
637
638 for depth in (1..=max_recursion).rev() {
641 let input = match depth == max_recursion {
642 true => batch.columns(),
643 false => &flatten_arrs,
644 };
645 let Some(temp_result) = list_unnest_at_level(
646 input,
647 list_type_columns,
648 &mut temp_unnested_result,
649 depth,
650 options,
651 )?
652 else {
653 return Ok(None);
654 };
655 flatten_arrs = temp_result;
656 }
657 let unnested_array_map: HashMap<usize, Vec<UnnestingResult>> =
658 temp_unnested_result.into_iter().fold(
659 HashMap::new(),
660 |mut acc,
661 (
662 ListUnnest {
663 index_in_input_schema,
664 depth,
665 },
666 flattened_array,
667 )| {
668 acc.entry(index_in_input_schema).or_default().push(
669 UnnestingResult {
670 arr: flattened_array,
671 depth,
672 },
673 );
674 acc
675 },
676 );
677 let output_order: HashMap<ListUnnest, usize> = list_type_columns
678 .iter()
679 .enumerate()
680 .map(|(order, unnest_def)| (*unnest_def, order))
681 .collect();
682
683 let mut multi_unnested_per_original_index = unnested_array_map
685 .into_iter()
686 .map(
687 |(original_index, mut unnested_columns)| {
691 unnested_columns.sort_by(
692 |UnnestingResult { depth: depth1, .. },
693 UnnestingResult { depth: depth2, .. }|
694 -> Ordering {
695 output_order
696 .get(&ListUnnest {
697 depth: *depth1,
698 index_in_input_schema: original_index,
699 })
700 .unwrap()
701 .cmp(
702 output_order
703 .get(&ListUnnest {
704 depth: *depth2,
705 index_in_input_schema: original_index,
706 })
707 .unwrap(),
708 )
709 },
710 );
711 (
712 original_index,
713 unnested_columns
714 .into_iter()
715 .map(|result| result.arr)
716 .collect::<Vec<_>>(),
717 )
718 },
719 )
720 .collect::<HashMap<_, _>>();
721
722 let ret = flatten_arrs
723 .into_iter()
724 .enumerate()
725 .flat_map(|(col_idx, arr)| {
726 match multi_unnested_per_original_index.remove(&col_idx) {
730 Some(unnested_arrays) => unnested_arrays,
731 None => vec![arr],
732 }
733 })
734 .collect::<Vec<_>>();
735
736 flatten_struct_cols(&ret, schema, struct_column_indices)
737 }
738 }?;
739 Ok(Some(transformed))
740}
741
742fn find_longest_length(
764 list_arrays: &[ArrayRef],
765 options: &UnnestOptions,
766) -> Result<ArrayRef> {
767 let null_length = if options.preserve_nulls {
769 Scalar::new(Int64Array::from_value(1, 1))
770 } else {
771 Scalar::new(Int64Array::from_value(0, 1))
772 };
773 let list_lengths: Vec<ArrayRef> = list_arrays
774 .iter()
775 .map(|list_array| {
776 let mut length_array = length(list_array)?;
777 length_array = cast(&length_array, &DataType::Int64)?;
779 length_array =
780 zip(&is_not_null(&length_array)?, &length_array, &null_length)?;
781 Ok(length_array)
782 })
783 .collect::<Result<_>>()?;
784
785 let longest_length = list_lengths.iter().skip(1).try_fold(
786 Arc::clone(&list_lengths[0]),
787 |longest, current| {
788 let is_lt = lt(&longest, ¤t)?;
789 zip(&is_lt, ¤t, &longest)
790 },
791 )?;
792 Ok(longest_length)
793}
794
/// Common view over the list array types supported by unnest (`ListArray`,
/// `LargeListArray` and `FixedSizeListArray`).
trait ListArrayType: Array {
    /// Returns a reference to the child values array.
    fn values(&self) -> &ArrayRef;

    /// Returns the `[start, end)` range of `row`'s elements within
    /// [`ListArrayType::values`], widened to `i64`.
    fn value_offsets(&self, row: usize) -> (i64, i64);
}
803
804impl ListArrayType for ListArray {
805 fn values(&self) -> &ArrayRef {
806 self.values()
807 }
808
809 fn value_offsets(&self, row: usize) -> (i64, i64) {
810 let offsets = self.value_offsets();
811 (offsets[row].into(), offsets[row + 1].into())
812 }
813}
814
815impl ListArrayType for LargeListArray {
816 fn values(&self) -> &ArrayRef {
817 self.values()
818 }
819
820 fn value_offsets(&self, row: usize) -> (i64, i64) {
821 let offsets = self.value_offsets();
822 (offsets[row], offsets[row + 1])
823 }
824}
825
826impl ListArrayType for FixedSizeListArray {
827 fn values(&self) -> &ArrayRef {
828 self.values()
829 }
830
831 fn value_offsets(&self, row: usize) -> (i64, i64) {
832 let start = self.value_offset(row) as i64;
833 (start, start + self.value_length() as i64)
834 }
835}
836
837fn unnest_list_arrays(
839 list_arrays: &[ArrayRef],
840 length_array: &PrimitiveArray<Int64Type>,
841 capacity: usize,
842) -> Result<Vec<ArrayRef>> {
843 let typed_arrays = list_arrays
844 .iter()
845 .map(|list_array| match list_array.data_type() {
846 DataType::List(_) => Ok(list_array.as_list::<i32>() as &dyn ListArrayType),
847 DataType::LargeList(_) => {
848 Ok(list_array.as_list::<i64>() as &dyn ListArrayType)
849 }
850 DataType::FixedSizeList(_, _) => {
851 Ok(list_array.as_fixed_size_list() as &dyn ListArrayType)
852 }
853 other => exec_err!("Invalid unnest datatype {other }"),
854 })
855 .collect::<Result<Vec<_>>>()?;
856
857 typed_arrays
858 .iter()
859 .map(|list_array| unnest_list_array(*list_array, length_array, capacity))
860 .collect::<Result<_>>()
861}
862
863fn unnest_list_array(
884 list_array: &dyn ListArrayType,
885 length_array: &PrimitiveArray<Int64Type>,
886 capacity: usize,
887) -> Result<ArrayRef> {
888 let values = list_array.values();
889 let mut take_indices_builder = PrimitiveArray::<Int64Type>::builder(capacity);
890 for row in 0..list_array.len() {
891 let mut value_length = 0;
892 if !list_array.is_null(row) {
893 let (start, end) = list_array.value_offsets(row);
894 value_length = end - start;
895 for i in start..end {
896 take_indices_builder.append_value(i)
897 }
898 }
899 let target_length = length_array.value(row);
900 debug_assert!(
901 value_length <= target_length,
902 "value length is beyond the longest length"
903 );
904 for _ in value_length..target_length {
906 take_indices_builder.append_null();
907 }
908 }
909 Ok(kernels::take::take(
910 &values,
911 &take_indices_builder.finish(),
912 None,
913 )?)
914}
915
916fn create_take_indices(
931 length_array: &PrimitiveArray<Int64Type>,
932 capacity: usize,
933) -> PrimitiveArray<Int64Type> {
934 debug_assert!(
936 length_array.null_count() == 0,
937 "length array should not contain nulls"
938 );
939 let mut builder = PrimitiveArray::<Int64Type>::builder(capacity);
940 for (index, repeat) in length_array.iter().enumerate() {
941 let repeat = repeat.unwrap();
943 (0..repeat).for_each(|_| builder.append_value(index as i64));
944 }
945 builder.finish()
946}
947
948fn repeat_arrs_from_indices(
995 batch: &[ArrayRef],
996 indices: &PrimitiveArray<Int64Type>,
997 repeat_mask: &[bool],
998) -> Result<Vec<Arc<dyn Array>>> {
999 batch
1000 .iter()
1001 .zip(repeat_mask.iter())
1002 .map(|(arr, &repeat)| {
1003 if repeat {
1004 Ok(kernels::take::take(arr, indices, None)?)
1005 } else {
1006 Ok(new_null_array(arr.data_type(), arr.len()))
1007 }
1008 })
1009 .collect()
1010}
1011
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        GenericListArray, NullBufferBuilder, OffsetSizeTrait, StringArray,
    };
    use arrow::buffer::{NullBuffer, OffsetBuffer};
    use arrow::datatypes::{Field, Int32Type};
    use datafusion_common::test_util::batches_to_string;
    use insta::assert_snapshot;

    // Creates a GenericListArray with the following list values:
    //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
    fn make_generic_array<OffsetSize>() -> GenericListArray<OffsetSize>
    where
        OffsetSize: OffsetSizeTrait,
    {
        let mut values = vec![];
        let mut offsets: Vec<OffsetSize> = vec![OffsetSize::zero()];
        let mut valid = NullBufferBuilder::new(6);

        // [A, B, C]
        values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // []
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // NULL backed by a non-empty value range ("?"), whose contents must be
        // ignored when unnesting
        values.push(Some("?"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [D]
        values.push(Some("D"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // NULL with an empty value range
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [NULL, F]
        values.extend_from_slice(&[None, Some("F")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        GenericListArray::<OffsetSize>::new(
            field,
            OffsetBuffer::new(offsets.into()),
            Arc::new(StringArray::from(values)),
            valid.finish(),
        )
    }

    // Creates a FixedSizeListArray (size 2) with the following list values:
    //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
    fn make_fixed_list() -> FixedSizeListArray {
        let values = Arc::new(StringArray::from_iter([
            Some("A"),
            Some("B"),
            None,
            None,
            Some("C"),
            Some("D"),
            None,
            None,
            None,
            Some("F"),
            None,
            None,
        ]));
        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        let valid = NullBuffer::from(vec![true, false, true, false, true, true]);
        FixedSizeListArray::new(field, 2, values, Some(valid))
    }

    // Unnests `list_array` against the per-row target `lengths` and checks the
    // flattened string values.
    fn verify_unnest_list_array(
        list_array: &dyn ListArrayType,
        lengths: Vec<i64>,
        expected: Vec<Option<&str>>,
    ) -> Result<()> {
        let length_array = Int64Array::from(lengths);
        let unnested_array = unnest_list_array(list_array, &length_array, 3 * 6)?;
        let strs = unnested_array.as_string::<i32>().iter().collect::<Vec<_>>();
        assert_eq!(strs, expected);
        Ok(())
    }

    #[test]
    fn test_build_batch_list_arr_recursive() -> Result<()> {
        // col1                               | col2
        // [[1,2,3], null, [4,5]]             | ['a','b']
        // [[7,8,9,10], null, [11,12,13]]     | ['c','d']
        // null                               | ['e']
        let list_arr1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3)]),
            None,
            Some(vec![Some(4), Some(5)]),
            Some(vec![Some(7), Some(8), Some(9), Some(10)]),
            None,
            Some(vec![Some(11), Some(12), Some(13)]),
        ]);

        let list_arr1_ref = Arc::new(list_arr1) as ArrayRef;
        let offsets = OffsetBuffer::from_lengths([3, 3, 0]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_non_null();
        nulls.append_non_null();
        nulls.append_null();
        // list<list<int32>>
        let col1_field = Field::new_list_field(
            DataType::List(Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            ))),
            true,
        );
        let col1 = ListArray::new(
            Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            )),
            offsets,
            list_arr1_ref,
            nulls.finish(),
        );

        let list_arr2 = StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
            Some("d"),
            Some("e"),
        ]);

        let offsets = OffsetBuffer::from_lengths([2, 2, 1]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_n_non_nulls(3);
        let col2_field = Field::new(
            "col2",
            DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
            true,
        );
        let col2 = GenericListArray::<i32>::new(
            Arc::new(Field::new_list_field(DataType::Utf8, true)),
            OffsetBuffer::new(offsets.into()),
            Arc::new(list_arr2),
            nulls.finish(),
        );
        let schema = Arc::new(Schema::new(vec![col1_field, col2_field]));
        // Output: col1 unnested at depths 1 and 2, col2 unnested at depth 1
        let out_schema = Arc::new(Schema::new(vec![
            Field::new(
                "col1_unnest_placeholder_depth_1",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new("col1_unnest_placeholder_depth_2", DataType::Int32, true),
            Field::new("col2_unnest_placeholder_depth_1", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(col1) as ArrayRef, Arc::new(col2) as ArrayRef],
        )
        .unwrap();
        let list_type_columns = vec![
            ListUnnest {
                index_in_input_schema: 0,
                depth: 1,
            },
            ListUnnest {
                index_in_input_schema: 0,
                depth: 2,
            },
            ListUnnest {
                index_in_input_schema: 1,
                depth: 1,
            },
        ];
        let ret = build_batch(
            &batch,
            &out_schema,
            list_type_columns.as_ref(),
            &HashSet::default(),
            &UnnestOptions {
                preserve_nulls: true,
                recursions: vec![],
            },
        )?
        .unwrap();

        assert_snapshot!(batches_to_string(&[ret]),
        @r###"
+---------------------------------+---------------------------------+---------------------------------+
| col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 |
+---------------------------------+---------------------------------+---------------------------------+
| [1, 2, 3]                       | 1                               | a                               |
|                                 | 2                               | b                               |
| [4, 5]                          | 3                               |                                 |
| [1, 2, 3]                       |                                 | a                               |
|                                 |                                 | b                               |
| [4, 5]                          |                                 |                                 |
| [1, 2, 3]                       | 4                               | a                               |
|                                 | 5                               | b                               |
| [4, 5]                          |                                 |                                 |
| [7, 8, 9, 10]                   | 7                               | c                               |
|                                 | 8                               | d                               |
| [11, 12, 13]                    | 9                               |                                 |
|                                 | 10                              |                                 |
| [7, 8, 9, 10]                   |                                 | c                               |
|                                 |                                 | d                               |
| [11, 12, 13]                    |                                 |                                 |
| [7, 8, 9, 10]                   | 11                              | c                               |
|                                 | 12                              | d                               |
| [11, 12, 13]                    | 13                              |                                 |
|                                 |                                 | e                               |
+---------------------------------+---------------------------------+---------------------------------+
        "###);
        Ok(())
    }

    #[test]
    fn test_unnest_list_array() -> Result<()> {
        // Single ListArray: [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = make_generic_array::<i32>();
        verify_unnest_list_array(
            &list_array,
            vec![3, 2, 1, 2, 0, 3],
            vec![
                Some("A"),
                Some("B"),
                Some("C"),
                None,
                None,
                None,
                Some("D"),
                None,
                None,
                Some("F"),
                None,
            ],
        )?;

        // FixedSizeListArray: [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list_array = make_fixed_list();
        verify_unnest_list_array(
            &list_array,
            vec![3, 1, 2, 0, 2, 3],
            vec![
                Some("A"),
                Some("B"),
                None,
                None,
                Some("C"),
                Some("D"),
                None,
                Some("F"),
                None,
                None,
                None,
            ],
        )?;

        Ok(())
    }

    // Checks the row-wise longest lengths computed for `list_arrays`.
    fn verify_longest_length(
        list_arrays: &[ArrayRef],
        preserve_nulls: bool,
        expected: Vec<i64>,
    ) -> Result<()> {
        let options = UnnestOptions {
            preserve_nulls,
            recursions: vec![],
        };
        let longest_length = find_longest_length(list_arrays, &options)?;
        let expected_array = Int64Array::from(expected);
        assert_eq!(
            longest_length
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap(),
            &expected_array
        );
        Ok(())
    }

    #[test]
    fn test_longest_list_length() -> Result<()> {
        // Single ListArray: [A, B, C], [], NULL, [D], NULL, [NULL, F]
        let list_array = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Single LargeListArray with the same values
        let list_array = Arc::new(make_generic_array::<i64>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Single FixedSizeListArray: [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
        let list_array = Arc::new(make_fixed_list()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![2, 0, 2, 0, 2, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![2, 1, 2, 1, 2, 2])?;

        // Multiple list arrays: the row-wise maximum of both
        let list1 = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        let list2 = Arc::new(make_fixed_list()) as ArrayRef;
        let list_arrays = vec![Arc::clone(&list1), Arc::clone(&list2)];
        verify_longest_length(&list_arrays, false, vec![3, 0, 2, 1, 2, 2])?;
        verify_longest_length(&list_arrays, true, vec![3, 1, 2, 1, 2, 2])?;

        Ok(())
    }

    #[test]
    fn test_create_take_indices() -> Result<()> {
        let length_array = Int64Array::from(vec![2, 3, 1]);
        let take_indices = create_take_indices(&length_array, 6);
        let expected = Int64Array::from(vec![0, 0, 1, 1, 1, 2]);
        assert_eq!(take_indices, expected);
        Ok(())
    }
}