1use std::cmp::{self, Ordering};
21use std::task::{Poll, ready};
22use std::{any::Any, sync::Arc};
23
24use super::metrics::{
25 self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
26 RecordOutput,
27};
28use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
29use crate::{
30 DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
31 SendableRecordBatchStream, check_if_same_properties,
32};
33
34use arrow::array::{
35 Array, ArrayRef, AsArray, BooleanBufferBuilder, FixedSizeListArray, Int64Array,
36 LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray, new_null_array,
37};
38use arrow::compute::kernels::length::length;
39use arrow::compute::kernels::zip::zip;
40use arrow::compute::{cast, is_not_null, kernels, sum};
41use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
42use arrow::record_batch::RecordBatch;
43use arrow_ord::cmp::lt;
44use async_trait::async_trait;
45use datafusion_common::{
46 Constraints, HashMap, HashSet, Result, UnnestOptions, exec_datafusion_err, exec_err,
47 internal_err,
48};
49use datafusion_execution::TaskContext;
50use datafusion_physical_expr::PhysicalExpr;
51use datafusion_physical_expr::equivalence::ProjectionMapping;
52use datafusion_physical_expr::expressions::Column;
53use futures::{Stream, StreamExt};
54use log::trace;
55
/// Unnest the given columns: each element of the unnested list/struct columns
/// produces one output row, with the remaining columns repeated to match.
#[derive(Debug, Clone)]
pub struct UnnestExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// The schema once the unnest is applied
    schema: SchemaRef,
    /// Which list-typed input columns to unnest, and to what depth
    list_column_indices: Vec<ListUnnest>,
    /// Indices of the struct-typed columns (in the input schema) to flatten
    struct_column_indices: Vec<usize>,
    /// Options controlling the unnest (e.g. whether null/empty lists are preserved)
    options: UnnestOptions,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties (equivalences, partitioning, boundedness, ...)
    cache: Arc<PlanProperties>,
}
79
impl UnnestExec {
    /// Create a new [`UnnestExec`], pre-computing its plan properties.
    ///
    /// `schema` is the output (already unnested) schema; `list_column_indices`
    /// and `struct_column_indices` identify the input columns being unnested.
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        list_column_indices: Vec<ListUnnest>,
        struct_column_indices: Vec<usize>,
        schema: SchemaRef,
        options: UnnestOptions,
    ) -> Result<Self> {
        let cache = Self::compute_properties(
            &input,
            &list_column_indices,
            &struct_column_indices,
            &schema,
        )?;

        Ok(UnnestExec {
            input,
            schema,
            list_column_indices,
            struct_column_indices,
            options,
            metrics: Default::default(),
            cache: Arc::new(cache),
        })
    }

    /// Compute the cached [`PlanProperties`] by projecting the input's
    /// equivalence properties and partitioning through the columns that
    /// pass through the unnest unchanged.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        list_column_indices: &[ListUnnest],
        struct_column_indices: &[usize],
        schema: &SchemaRef,
    ) -> Result<PlanProperties> {
        let input_schema = input.schema();
        // Flag every input column that is unnested (list or struct).
        let mut unnested_indices = BooleanBufferBuilder::new(input_schema.fields().len());
        unnested_indices.append_n(input_schema.fields().len(), false);
        for list_unnest in list_column_indices {
            unnested_indices.set_bit(list_unnest.index_in_input_schema, true);
        }
        for struct_unnest in struct_column_indices {
            unnested_indices.set_bit(*struct_unnest, true)
        }
        let unnested_indices = unnested_indices.finish();
        // Pass-through columns: these keep their equivalence/ordering info.
        let non_unnested_indices: Vec<usize> = (0..input_schema.fields().len())
            .filter(|idx| !unnested_indices.value(*idx))
            .collect();

        let input_schema = input.schema();
        // Map each pass-through column from its input position to its output
        // position, matching fields by name; error if a pass-through column
        // is missing from the output schema.
        let projection_mapping: ProjectionMapping = non_unnested_indices
            .iter()
            .map(|&input_idx| {
                let input_field = input_schema.field(input_idx);
                let output_idx = schema
                    .fields()
                    .iter()
                    .position(|output_field| output_field.name() == input_field.name())
                    .ok_or_else(|| {
                        exec_datafusion_err!(
                            "Non-unnested column '{}' must exist in output schema",
                            input_field.name()
                        )
                    })?;

                let input_col = Arc::new(Column::new(input_field.name(), input_idx))
                    as Arc<dyn PhysicalExpr>;
                let target_col = Arc::new(Column::new(input_field.name(), output_idx))
                    as Arc<dyn PhysicalExpr>;
                let targets = vec![(target_col, output_idx)].into();
                Ok((input_col, targets))
            })
            .collect::<Result<ProjectionMapping>>()?;

        // Project the input's equivalence properties onto the output schema.
        // NOTE(review): constraints are reset to the default here — presumably
        // because unnest changes row multiplicity; confirm intent.
        let input_eq_properties = input.equivalence_properties();
        let eq_properties = input_eq_properties
            .project(&projection_mapping, Arc::clone(schema))
            .with_constraints(Constraints::default());

        let output_partitioning = input
            .output_partitioning()
            .project(&projection_mapping, &eq_properties);

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            input.pipeline_behavior(),
            input.boundedness(),
        ))
    }

    /// Input execution plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Which list-typed input columns are unnested, and to what depth
    pub fn list_column_indices(&self) -> &[ListUnnest] {
        &self.list_column_indices
    }

    /// Indices of the struct-typed columns being flattened
    pub fn struct_column_indices(&self) -> &[usize] {
        &self.struct_column_indices
    }

    /// Unnest options (e.g. null preservation)
    pub fn options(&self) -> &UnnestOptions {
        &self.options
    }

    /// Replace the child while reusing the already-computed properties;
    /// metrics are reset for the new instance.
    fn with_new_children_and_same_properties(
        &self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Self {
        Self {
            input: children.swap_remove(0),
            metrics: ExecutionPlanMetricsSet::new(),
            ..Self::clone(self)
        }
    }
}
208
209impl DisplayAs for UnnestExec {
210 fn fmt_as(
211 &self,
212 t: DisplayFormatType,
213 f: &mut std::fmt::Formatter,
214 ) -> std::fmt::Result {
215 match t {
216 DisplayFormatType::Default | DisplayFormatType::Verbose => {
217 write!(f, "UnnestExec")
218 }
219 DisplayFormatType::TreeRender => {
220 write!(f, "")
221 }
222 }
223 }
224}
225
impl ExecutionPlan for UnnestExec {
    fn name(&self) -> &'static str {
        "UnnestExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // NOTE(review): this macro appears to short-circuit (likely via
        // `with_new_children_and_same_properties`) when the new child's
        // properties match the current one — confirm against the macro
        // definition.
        check_if_same_properties!(self, children);
        Ok(Arc::new(UnnestExec::new(
            children.swap_remove(0),
            self.list_column_indices.clone(),
            self.struct_column_indices.clone(),
            Arc::clone(&self.schema),
            self.options.clone(),
        )?))
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        // Unnest works row-by-row; any input distribution is acceptable.
        vec![Distribution::UnspecifiedDistribution]
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let input = self.input.execute(partition, context)?;
        let metrics = UnnestMetrics::new(partition, &self.metrics);

        Ok(Box::pin(UnnestStream {
            input,
            schema: Arc::clone(&self.schema),
            list_type_columns: self.list_column_indices.clone(),
            struct_column_indices: self.struct_column_indices.iter().copied().collect(),
            options: self.options.clone(),
            metrics,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
283
/// Metrics collected while executing [`UnnestStream`].
#[derive(Clone, Debug)]
struct UnnestMetrics {
    /// Baseline metrics (elapsed compute time, output batches/rows)
    baseline_metrics: BaselineMetrics,
    /// Number of batches consumed from the input
    input_batches: metrics::Count,
    /// Number of rows consumed from the input
    input_rows: metrics::Count,
}
293
294impl UnnestMetrics {
295 fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
296 let input_batches =
297 MetricBuilder::new(metrics).counter("input_batches", partition);
298
299 let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
300
301 Self {
302 baseline_metrics: BaselineMetrics::new(metrics, partition),
303 input_batches,
304 input_rows,
305 }
306 }
307}
308
/// Stream that applies the unnest operation to every batch of its input.
struct UnnestStream {
    /// Input stream
    input: SendableRecordBatchStream,
    /// Unnested schema (the schema of the batches this stream yields)
    schema: Arc<Schema>,
    /// Which list-typed input columns to unnest, and to what depth
    list_type_columns: Vec<ListUnnest>,
    /// Indices of struct-typed input columns to flatten
    struct_column_indices: HashSet<usize>,
    /// Options controlling the unnest (e.g. null preservation)
    options: UnnestOptions,
    /// Execution metrics
    metrics: UnnestMetrics,
}
325
impl RecordBatchStream for UnnestStream {
    /// Schema of the batches this stream yields (the unnested schema).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
331
332#[async_trait]
333impl Stream for UnnestStream {
334 type Item = Result<RecordBatch>;
335
336 fn poll_next(
337 mut self: std::pin::Pin<&mut Self>,
338 cx: &mut std::task::Context<'_>,
339 ) -> Poll<Option<Self::Item>> {
340 self.poll_next_impl(cx)
341 }
342}
343
impl UnnestStream {
    /// Separate implementation function that unnests the batches coming from
    /// the input stream, updating the metrics as it goes.
    ///
    /// Loops so that input batches whose unnest result is empty are skipped
    /// instead of emitting an empty batch downstream.
    fn poll_next_impl(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    // Time the unnest work for this batch.
                    let elapsed_compute =
                        self.metrics.baseline_metrics.elapsed_compute().clone();
                    let timer = elapsed_compute.timer();
                    self.metrics.input_batches.add(1);
                    self.metrics.input_rows.add(batch.num_rows());
                    let result = build_batch(
                        &batch,
                        &self.schema,
                        &self.list_type_columns,
                        &self.struct_column_indices,
                        &self.options,
                    )?;
                    timer.done();
                    // `None` means this input batch unnested to zero rows;
                    // poll the input again instead of emitting it.
                    let Some(result_batch) = result else {
                        continue;
                    };
                    (&result_batch).record_output(&self.metrics.baseline_metrics);

                    // Empty batches should have been filtered out above.
                    debug_assert!(result_batch.num_rows() > 0);
                    Some(Ok(result_batch))
                }
                other => {
                    // End of input (None) or an error: log summary metrics
                    // and forward the item unchanged.
                    trace!(
                        "Processed {} probe-side input batches containing {} rows and \
                        produced {} output batches containing {} rows in {}",
                        self.metrics.input_batches,
                        self.metrics.input_rows,
                        self.metrics.baseline_metrics.output_batches(),
                        self.metrics.baseline_metrics.output_rows(),
                        self.metrics.baseline_metrics.elapsed_compute(),
                    );
                    other
                }
            });
        }
    }
}
393
394fn flatten_struct_cols(
405 input_batch: &[Arc<dyn Array>],
406 schema: &SchemaRef,
407 struct_column_indices: &HashSet<usize>,
408) -> Result<RecordBatch> {
409 let columns_expanded = input_batch
411 .iter()
412 .enumerate()
413 .map(|(idx, column_data)| match struct_column_indices.get(&idx) {
414 Some(_) => match column_data.data_type() {
415 DataType::Struct(_) => {
416 let struct_arr =
417 column_data.as_any().downcast_ref::<StructArray>().unwrap();
418 Ok(struct_arr.columns().to_vec())
419 }
420 data_type => internal_err!(
421 "expecting column {} from input plan to be a struct, got {:?}",
422 idx,
423 data_type
424 ),
425 },
426 None => Ok(vec![Arc::clone(column_data)]),
427 })
428 .collect::<Result<Vec<_>>>()?
429 .into_iter()
430 .flatten()
431 .collect();
432 Ok(RecordBatch::try_new(Arc::clone(schema), columns_expanded)?)
433}
434
/// One list-unnest operation: which input column to unnest and how many
/// levels of list nesting to unwrap.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct ListUnnest {
    /// Column index in the input schema
    pub index_in_input_schema: usize,
    /// Unnest depth (1 = unwrap a single level of list nesting)
    pub depth: usize,
}
440
/// Perform one round of unnesting, flattening every list column that is due
/// at `level_to_unnest`.
///
/// * Columns with `depth == level_to_unnest` are taken from `batch` and
///   unnested for the first time.
/// * Columns with `depth > level_to_unnest` were already flattened at a
///   deeper level; their intermediate results are read from
///   `temp_unnested_arrs` and flattened once more.
///
/// The flattened arrays for this level are stored back into
/// `temp_unnested_arrs`. Returns the new "carrier" batch with rows repeated
/// to line up with the flattened values, or `None` when the total unnested
/// length is zero (no output rows for this input batch).
fn list_unnest_at_level(
    batch: &[ArrayRef],
    list_type_unnests: &[ListUnnest],
    temp_unnested_arrs: &mut HashMap<ListUnnest, ArrayRef>,
    level_to_unnest: usize,
    options: &UnnestOptions,
) -> Result<Option<Vec<ArrayRef>>> {
    // Collect the arrays participating at this level, paired with their specs.
    let (arrs_to_unnest, list_unnest_specs): (Vec<Arc<dyn Array>>, Vec<_>) =
        list_type_unnests
            .iter()
            .filter_map(|unnesting| {
                if level_to_unnest == unnesting.depth {
                    return Some((
                        Arc::clone(&batch[unnesting.index_in_input_schema]),
                        *unnesting,
                    ));
                }
                // Already unnested at a deeper level; continue flattening the
                // intermediate result.
                if level_to_unnest < unnesting.depth {
                    return Some((
                        Arc::clone(temp_unnested_arrs.get(unnesting).unwrap()),
                        *unnesting,
                    ));
                }
                None
            })
            .unzip();

    // Per input row, the longest list length across all columns unnested here
    // (nulls count as 1 or 0 depending on `options.preserve_nulls`).
    let longest_length = find_longest_length(&arrs_to_unnest, options)?;
    let unnested_length = longest_length.as_primitive::<Int64Type>();
    let total_length = if unnested_length.is_empty() {
        0
    } else {
        sum(unnested_length).ok_or_else(|| {
            exec_datafusion_err!("Failed to calculate the total unnested length")
        })? as usize
    };
    if total_length == 0 {
        return Ok(None);
    }

    // Flatten the list arrays, padding shorter rows with nulls so every
    // column ends up `total_length` long.
    let unnested_temp_arrays =
        unnest_list_arrays(arrs_to_unnest.as_ref(), unnested_length, total_length)?;

    let take_indices = create_take_indices(unnested_length, total_length);
    // Stash this level's results for use by the next (shallower) level and
    // the final column assembly.
    unnested_temp_arrays
        .into_iter()
        .zip(list_unnest_specs.iter())
        .for_each(|(flatten_arr, unnesting)| {
            temp_unnested_arrs.insert(*unnesting, flatten_arr);
        });

    // Decide which carrier columns must be physically repeated: a column is
    // repeated when it is not involved in unnesting at all, or when a
    // shallower level (depth < level_to_unnest) still needs it. Columns whose
    // unnest work is finished become null placeholders (see
    // `repeat_arrs_from_indices`).
    let repeat_mask: Vec<bool> = batch
        .iter()
        .enumerate()
        .map(|(i, _)| {
            let needed_in_future_levels = list_type_unnests.iter().any(|unnesting| {
                unnesting.index_in_input_schema == i && unnesting.depth < level_to_unnest
            });

            let is_involved_in_unnesting = list_type_unnests
                .iter()
                .any(|unnesting| unnesting.index_in_input_schema == i);

            needed_in_future_levels || !is_involved_in_unnesting
        })
        .collect();

    // Repeat each input row `longest_length[row]` times so carrier columns
    // stay aligned with the flattened list values.
    let ret = repeat_arrs_from_indices(batch, &take_indices, &repeat_mask)?;

    Ok(Some(ret))
}
/// The flattened array produced for one (column, depth) combination, tagged
/// with the depth it was produced at.
struct UnnestingResult {
    /// Flattened array
    arr: ArrayRef,
    /// The unnest depth that produced `arr`
    depth: usize,
}
566
/// Apply all requested unnest operations to `batch`, producing one output
/// batch matching `schema`.
///
/// Returns `Ok(None)` when unnesting yields zero rows so the caller can
/// skip the batch entirely.
///
/// List unnesting proceeds level by level, deepest first: unnesting a column
/// at depths 1 and 2 first flattens two levels (at depth 2), then the depth-1
/// pass reuses/continues the intermediate results. Afterwards the
/// per-(column, depth) results are spliced into the output column order and
/// struct columns are expanded into their fields.
fn build_batch(
    batch: &RecordBatch,
    schema: &SchemaRef,
    list_type_columns: &[ListUnnest],
    struct_column_indices: &HashSet<usize>,
    options: &UnnestOptions,
) -> Result<Option<RecordBatch>> {
    let transformed = match list_type_columns.len() {
        // No list columns: only struct flattening is needed.
        0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
        _ => {
            // Intermediate flattened arrays, keyed by (column, depth).
            let mut temp_unnested_result = HashMap::new();
            let max_recursion = list_type_columns
                .iter()
                .fold(0, |highest_depth, ListUnnest { depth, .. }| {
                    cmp::max(highest_depth, *depth)
                });

            // Carrier batch threaded through the levels: the deepest level
            // reads the original columns, subsequent levels read the
            // previous level's output.
            let mut flatten_arrs = vec![];

            for depth in (1..=max_recursion).rev() {
                let input = match depth == max_recursion {
                    true => batch.columns(),
                    false => &flatten_arrs,
                };
                let Some(temp_result) = list_unnest_at_level(
                    input,
                    list_type_columns,
                    &mut temp_unnested_result,
                    depth,
                    options,
                )?
                else {
                    // A level with zero total length means an empty output.
                    return Ok(None);
                };
                flatten_arrs = temp_result;
            }
            // Group the flattened arrays by their original column index.
            let unnested_array_map: HashMap<usize, Vec<UnnestingResult>> =
                temp_unnested_result.into_iter().fold(
                    HashMap::new(),
                    |mut acc,
                     (
                        ListUnnest {
                            index_in_input_schema,
                            depth,
                        },
                        flattened_array,
                    )| {
                        acc.entry(index_in_input_schema).or_default().push(
                            UnnestingResult {
                                arr: flattened_array,
                                depth,
                            },
                        );
                        acc
                    },
                );
            // Desired output position of each (column, depth) pair, given by
            // the order of `list_type_columns`.
            let output_order: HashMap<ListUnnest, usize> = list_type_columns
                .iter()
                .enumerate()
                .map(|(order, unnest_def)| (*unnest_def, order))
                .collect();

            // Sort each column's per-depth results into output order, then
            // drop the depth tags.
            let mut multi_unnested_per_original_index = unnested_array_map
                .into_iter()
                .map(
                    |(original_index, mut unnested_columns)| {
                        unnested_columns.sort_by(
                            |UnnestingResult { depth: depth1, .. },
                             UnnestingResult { depth: depth2, .. }|
                             -> Ordering {
                                output_order
                                    .get(&ListUnnest {
                                        depth: *depth1,
                                        index_in_input_schema: original_index,
                                    })
                                    .unwrap()
                                    .cmp(
                                        output_order
                                            .get(&ListUnnest {
                                                depth: *depth2,
                                                index_in_input_schema: original_index,
                                            })
                                            .unwrap(),
                                    )
                            },
                        );
                        (
                            original_index,
                            unnested_columns
                                .into_iter()
                                .map(|result| result.arr)
                                .collect::<Vec<_>>(),
                        )
                    },
                )
                .collect::<HashMap<_, _>>();

            // Splice: replace each unnested column's placeholder with its
            // flattened array(s); pass every other column through unchanged.
            let ret = flatten_arrs
                .into_iter()
                .enumerate()
                .flat_map(|(col_idx, arr)| {
                    match multi_unnested_per_original_index.remove(&col_idx) {
                        Some(unnested_arrays) => unnested_arrays,
                        None => vec![arr],
                    }
                })
                .collect::<Vec<_>>();

            flatten_struct_cols(&ret, schema, struct_column_indices)
        }
    }?;
    Ok(Some(transformed))
}
746
747fn find_longest_length(
769 list_arrays: &[ArrayRef],
770 options: &UnnestOptions,
771) -> Result<ArrayRef> {
772 let null_length = if options.preserve_nulls {
774 Scalar::new(Int64Array::from_value(1, 1))
775 } else {
776 Scalar::new(Int64Array::from_value(0, 1))
777 };
778 let list_lengths: Vec<ArrayRef> = list_arrays
779 .iter()
780 .map(|list_array| {
781 let mut length_array = length(list_array)?;
782 length_array = cast(&length_array, &DataType::Int64)?;
784 length_array =
785 zip(&is_not_null(&length_array)?, &length_array, &null_length)?;
786 Ok(length_array)
787 })
788 .collect::<Result<_>>()?;
789
790 let longest_length = list_lengths.iter().skip(1).try_fold(
791 Arc::clone(&list_lengths[0]),
792 |longest, current| {
793 let is_lt = lt(&longest, ¤t)?;
794 zip(&is_lt, ¤t, &longest)
795 },
796 )?;
797 Ok(longest_length)
798}
799
/// Common abstraction over the supported list types ([`ListArray`],
/// [`LargeListArray`], [`FixedSizeListArray`]) used by the unnest kernels.
trait ListArrayType: Array {
    /// Returns a reference to the values of this list array.
    fn values(&self) -> &ArrayRef;

    /// Returns the (start, end) offsets of the values for the given row.
    fn value_offsets(&self, row: usize) -> (i64, i64);
}
808
809impl ListArrayType for ListArray {
810 fn values(&self) -> &ArrayRef {
811 self.values()
812 }
813
814 fn value_offsets(&self, row: usize) -> (i64, i64) {
815 let offsets = self.value_offsets();
816 (offsets[row].into(), offsets[row + 1].into())
817 }
818}
819
820impl ListArrayType for LargeListArray {
821 fn values(&self) -> &ArrayRef {
822 self.values()
823 }
824
825 fn value_offsets(&self, row: usize) -> (i64, i64) {
826 let offsets = self.value_offsets();
827 (offsets[row], offsets[row + 1])
828 }
829}
830
831impl ListArrayType for FixedSizeListArray {
832 fn values(&self) -> &ArrayRef {
833 self.values()
834 }
835
836 fn value_offsets(&self, row: usize) -> (i64, i64) {
837 let start = self.value_offset(row) as i64;
838 (start, start + self.value_length() as i64)
839 }
840}
841
842fn unnest_list_arrays(
844 list_arrays: &[ArrayRef],
845 length_array: &PrimitiveArray<Int64Type>,
846 capacity: usize,
847) -> Result<Vec<ArrayRef>> {
848 let typed_arrays = list_arrays
849 .iter()
850 .map(|list_array| match list_array.data_type() {
851 DataType::List(_) => Ok(list_array.as_list::<i32>() as &dyn ListArrayType),
852 DataType::LargeList(_) => {
853 Ok(list_array.as_list::<i64>() as &dyn ListArrayType)
854 }
855 DataType::FixedSizeList(_, _) => {
856 Ok(list_array.as_fixed_size_list() as &dyn ListArrayType)
857 }
858 other => exec_err!("Invalid unnest datatype {other }"),
859 })
860 .collect::<Result<Vec<_>>>()?;
861
862 typed_arrays
863 .iter()
864 .map(|list_array| unnest_list_array(*list_array, length_array, capacity))
865 .collect::<Result<_>>()
866}
867
/// Unnest a single list array into a flat array: row `i` of the input
/// contributes exactly `length_array[i]` output slots, padded with nulls
/// when the row is null or shorter than the target length. `capacity` is
/// the expected total output length (used to pre-size the index builder).
fn unnest_list_array(
    list_array: &dyn ListArrayType,
    length_array: &PrimitiveArray<Int64Type>,
    capacity: usize,
) -> Result<ArrayRef> {
    let values = list_array.values();
    // Build take indices into `values`; a null index yields a null output.
    let mut take_indices_builder = PrimitiveArray::<Int64Type>::builder(capacity);
    for row in 0..list_array.len() {
        let mut value_length = 0;
        if !list_array.is_null(row) {
            let (start, end) = list_array.value_offsets(row);
            value_length = end - start;
            for i in start..end {
                take_indices_builder.append_value(i)
            }
        }
        let target_length = length_array.value(row);
        debug_assert!(
            value_length <= target_length,
            "value length is beyond the longest length"
        );
        // Pad with nulls up to the target length so all unnested columns
        // stay aligned row-by-row.
        for _ in value_length..target_length {
            take_indices_builder.append_null();
        }
    }
    Ok(kernels::take::take(
        &values,
        &take_indices_builder.finish(),
        None,
    )?)
}
920
921fn create_take_indices(
936 length_array: &PrimitiveArray<Int64Type>,
937 capacity: usize,
938) -> PrimitiveArray<Int64Type> {
939 debug_assert!(
941 length_array.null_count() == 0,
942 "length array should not contain nulls"
943 );
944 let mut builder = PrimitiveArray::<Int64Type>::builder(capacity);
945 for (index, repeat) in length_array.iter().enumerate() {
946 let repeat = repeat.unwrap();
948 (0..repeat).for_each(|_| builder.append_value(index as i64));
949 }
950 builder.finish()
951}
952
953fn repeat_arrs_from_indices(
1000 batch: &[ArrayRef],
1001 indices: &PrimitiveArray<Int64Type>,
1002 repeat_mask: &[bool],
1003) -> Result<Vec<Arc<dyn Array>>> {
1004 batch
1005 .iter()
1006 .zip(repeat_mask.iter())
1007 .map(|(arr, &repeat)| {
1008 if repeat {
1009 Ok(kernels::take::take(arr, indices, None)?)
1010 } else {
1011 Ok(new_null_array(arr.data_type(), arr.len()))
1012 }
1013 })
1014 .collect()
1015}
1016
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        GenericListArray, NullBufferBuilder, OffsetSizeTrait, StringArray,
    };
    use arrow::buffer::{NullBuffer, OffsetBuffer};
    use arrow::datatypes::{Field, Int32Type};
    use datafusion_common::test_util::batches_to_string;
    use insta::assert_snapshot;

    // Create a GenericListArray with the following list values:
    //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
    fn make_generic_array<OffsetSize>() -> GenericListArray<OffsetSize>
    where
        OffsetSize: OffsetSizeTrait,
    {
        let mut values = vec![];
        let mut offsets: Vec<OffsetSize> = vec![OffsetSize::zero()];
        let mut valid = NullBufferBuilder::new(6);

        // [A, B, C]
        values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // []
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // NULL: a dummy "?" value is pushed but masked out by the validity
        // buffer below.
        values.push(Some("?"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [D]
        values.push(Some("D"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        // NULL (no backing values this time)
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();

        // [NULL, F]
        values.extend_from_slice(&[None, Some("F")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();

        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        GenericListArray::<OffsetSize>::new(
            field,
            OffsetBuffer::new(offsets.into()),
            Arc::new(StringArray::from(values)),
            valid.finish(),
        )
    }

    // Create a FixedSizeListArray (size 2) with the rows:
    //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
    fn make_fixed_list() -> FixedSizeListArray {
        let values = Arc::new(StringArray::from_iter([
            Some("A"),
            Some("B"),
            None,
            None,
            Some("C"),
            Some("D"),
            None,
            None,
            None,
            Some("F"),
            None,
            None,
        ]));
        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        let valid = NullBuffer::from(vec![true, false, true, false, true, true]);
        FixedSizeListArray::new(field, 2, values, Some(valid))
    }

    // Unnest `list_array` using `lengths` as the per-row target lengths and
    // compare the flattened strings against `expected`.
    fn verify_unnest_list_array(
        list_array: &dyn ListArrayType,
        lengths: Vec<i64>,
        expected: Vec<Option<&str>>,
    ) -> Result<()> {
        let length_array = Int64Array::from(lengths);
        let unnested_array = unnest_list_array(list_array, &length_array, 3 * 6)?;
        let strs = unnested_array.as_string::<i32>().iter().collect::<Vec<_>>();
        assert_eq!(strs, expected);
        Ok(())
    }

    #[test]
    fn test_build_batch_list_arr_recursive() -> Result<()> {
        // col1: List<List<Int32>> (3 rows), col2: List<Utf8> (3 rows).
        // col1 is unnested at depths 1 and 2, col2 at depth 1.
        let list_arr1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3)]),
            None,
            Some(vec![Some(4), Some(5)]),
            Some(vec![Some(7), Some(8), Some(9), Some(10)]),
            None,
            Some(vec![Some(11), Some(12), Some(13)]),
        ]);

        let list_arr1_ref = Arc::new(list_arr1) as ArrayRef;
        let offsets = OffsetBuffer::from_lengths([3, 3, 0]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_non_null();
        nulls.append_non_null();
        nulls.append_null();
        let col1_field = Field::new_list_field(
            DataType::List(Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            ))),
            true,
        );
        let col1 = ListArray::new(
            Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            )),
            offsets,
            list_arr1_ref,
            nulls.finish(),
        );

        let list_arr2 = StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
            Some("d"),
            Some("e"),
        ]);

        let offsets = OffsetBuffer::from_lengths([2, 2, 1]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_n_non_nulls(3);
        let col2_field = Field::new(
            "col2",
            DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
            true,
        );
        let col2 = GenericListArray::<i32>::new(
            Arc::new(Field::new_list_field(DataType::Utf8, true)),
            OffsetBuffer::new(offsets.into()),
            Arc::new(list_arr2),
            nulls.finish(),
        );
        let schema = Arc::new(Schema::new(vec![col1_field, col2_field]));
        // Output schema: one column per (column, depth) unnest combination.
        let out_schema = Arc::new(Schema::new(vec![
            Field::new(
                "col1_unnest_placeholder_depth_1",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new("col1_unnest_placeholder_depth_2", DataType::Int32, true),
            Field::new("col2_unnest_placeholder_depth_1", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(col1) as ArrayRef, Arc::new(col2) as ArrayRef],
        )
        .unwrap();
        let list_type_columns = vec![
            ListUnnest {
                index_in_input_schema: 0,
                depth: 1,
            },
            ListUnnest {
                index_in_input_schema: 0,
                depth: 2,
            },
            ListUnnest {
                index_in_input_schema: 1,
                depth: 1,
            },
        ];
        let ret = build_batch(
            &batch,
            &out_schema,
            list_type_columns.as_ref(),
            &HashSet::default(),
            &UnnestOptions {
                preserve_nulls: true,
                recursions: vec![],
            },
        )?
        .unwrap();

        assert_snapshot!(batches_to_string(&[ret]),
        @r"
        +---------------------------------+---------------------------------+---------------------------------+
        | col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 |
        +---------------------------------+---------------------------------+---------------------------------+
        | [1, 2, 3]                       | 1                               | a                               |
        |                                 | 2                               | b                               |
        | [4, 5]                          | 3                               |                                 |
        | [1, 2, 3]                       |                                 | a                               |
        |                                 |                                 | b                               |
        | [4, 5]                          |                                 |                                 |
        | [1, 2, 3]                       | 4                               | a                               |
        |                                 | 5                               | b                               |
        | [4, 5]                          |                                 |                                 |
        | [7, 8, 9, 10]                   | 7                               | c                               |
        |                                 | 8                               | d                               |
        | [11, 12, 13]                    | 9                               |                                 |
        |                                 | 10                              |                                 |
        | [7, 8, 9, 10]                   |                                 | c                               |
        |                                 |                                 | d                               |
        | [11, 12, 13]                    |                                 |                                 |
        | [7, 8, 9, 10]                   | 11                              | c                               |
        |                                 | 12                              | d                               |
        | [11, 12, 13]                    | 13                              |                                 |
        |                                 |                                 | e                               |
        +---------------------------------+---------------------------------+---------------------------------+
        ");
        Ok(())
    }

    #[test]
    fn test_unnest_list_array() -> Result<()> {
        // Generic (variable-offset) list array: null rows and short rows are
        // padded with nulls up to the requested per-row length.
        let list_array = make_generic_array::<i32>();
        verify_unnest_list_array(
            &list_array,
            vec![3, 2, 1, 2, 0, 3],
            vec![
                Some("A"),
                Some("B"),
                Some("C"),
                None,
                None,
                None,
                Some("D"),
                None,
                None,
                Some("F"),
                None,
            ],
        )?;

        // Fixed size list array.
        let list_array = make_fixed_list();
        verify_unnest_list_array(
            &list_array,
            vec![3, 1, 2, 0, 2, 3],
            vec![
                Some("A"),
                Some("B"),
                None,
                None,
                Some("C"),
                Some("D"),
                None,
                Some("F"),
                None,
                None,
                None,
            ],
        )?;

        Ok(())
    }

    // Check `find_longest_length` against `list_arrays` for the given
    // `preserve_nulls` setting.
    fn verify_longest_length(
        list_arrays: &[ArrayRef],
        preserve_nulls: bool,
        expected: Vec<i64>,
    ) -> Result<()> {
        let options = UnnestOptions {
            preserve_nulls,
            recursions: vec![],
        };
        let longest_length = find_longest_length(list_arrays, &options)?;
        let expected_array = Int64Array::from(expected);
        assert_eq!(
            longest_length
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap(),
            &expected_array
        );
        Ok(())
    }

    #[test]
    fn test_longest_list_length() -> Result<()> {
        // Single generic list array: [A, B, C], [], NULL, [D], NULL, [NULL, F].
        // Null rows count as 1 only when nulls are preserved.
        let list_array = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Large (i64-offset) list behaves identically.
        let list_array = Arc::new(make_generic_array::<i64>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

        // Fixed-size list array: [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL].
        let list_array = Arc::new(make_fixed_list()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![2, 0, 2, 0, 2, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![2, 1, 2, 1, 2, 2])?;

        // Multiple arrays: result is the element-wise maximum.
        let list1 = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        let list2 = Arc::new(make_fixed_list()) as ArrayRef;
        let list_arrays = vec![Arc::clone(&list1), Arc::clone(&list2)];
        verify_longest_length(&list_arrays, false, vec![3, 0, 2, 1, 2, 2])?;
        verify_longest_length(&list_arrays, true, vec![3, 1, 2, 1, 2, 2])?;

        Ok(())
    }

    #[test]
    fn test_create_take_indices() -> Result<()> {
        // Row i is repeated length[i] times: [2, 3, 1] -> [0, 0, 1, 1, 1, 2].
        let length_array = Int64Array::from(vec![2, 3, 1]);
        let take_indices = create_take_indices(&length_array, 6);
        let expected = Int64Array::from(vec![0, 0, 1, 1, 1, 2]);
        assert_eq!(take_indices, expected);
        Ok(())
    }
}
1349}