1use std::cmp::{self, Ordering};
21use std::sync::Arc;
22use std::task::{Poll, ready};
23
24use super::metrics::{
25 self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory,
26 MetricsSet, RecordOutput,
27};
28use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
29use crate::stream::EmptyRecordBatchStream;
30use crate::{
31 DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
32 SendableRecordBatchStream, check_if_same_properties,
33};
34
35use arrow::array::{
36 Array, ArrayRef, AsArray, BooleanBufferBuilder, FixedSizeListArray, Int64Array,
37 LargeListArray, LargeListViewArray, ListArray, ListViewArray, PrimitiveArray, Scalar,
38 StructArray, new_null_array,
39};
40use arrow::compute::kernels::length::length;
41use arrow::compute::kernels::zip::zip;
42use arrow::compute::{cast, is_not_null, kernels, sum};
43use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
44use arrow::record_batch::RecordBatch;
45use arrow_ord::cmp::lt;
46use async_trait::async_trait;
47use datafusion_common::{
48 Constraints, HashMap, HashSet, Result, UnnestOptions, exec_datafusion_err, exec_err,
49 internal_err,
50};
51use datafusion_execution::TaskContext;
52use datafusion_physical_expr::PhysicalExpr;
53use datafusion_physical_expr::equivalence::ProjectionMapping;
54use datafusion_physical_expr::expressions::Column;
55use futures::{Stream, StreamExt};
56use log::trace;
57
/// Physical plan node that unnests list and struct columns of its input.
///
/// Each entry of `list_column_indices` names an input column and the depth it
/// is unnested to; each entry of `struct_column_indices` names a struct column
/// whose child arrays replace it in the output. The per-batch work is done by
/// the stream returned from `execute`.
#[derive(Debug, Clone)]
pub struct UnnestExec {
    /// Input execution plan.
    input: Arc<dyn ExecutionPlan>,
    /// Schema of the batches produced by this node.
    schema: SchemaRef,
    /// List columns to unnest, identified by input-schema index and depth.
    list_column_indices: Vec<ListUnnest>,
    /// Input-schema indices of the struct columns to flatten.
    struct_column_indices: Vec<usize>,
    /// Options controlling the unnest, e.g. how NULL lists are treated.
    options: UnnestOptions,
    /// Execution metrics.
    metrics: ExecutionPlanMetricsSet,
    /// Plan properties, computed once when the node is constructed.
    cache: Arc<PlanProperties>,
}
81
82impl UnnestExec {
83 pub fn new(
85 input: Arc<dyn ExecutionPlan>,
86 list_column_indices: Vec<ListUnnest>,
87 struct_column_indices: Vec<usize>,
88 schema: SchemaRef,
89 options: UnnestOptions,
90 ) -> Result<Self> {
91 let cache = Self::compute_properties(
92 &input,
93 &list_column_indices,
94 &struct_column_indices,
95 &schema,
96 )?;
97
98 Ok(UnnestExec {
99 input,
100 schema,
101 list_column_indices,
102 struct_column_indices,
103 options,
104 metrics: Default::default(),
105 cache: Arc::new(cache),
106 })
107 }
108
109 fn compute_properties(
111 input: &Arc<dyn ExecutionPlan>,
112 list_column_indices: &[ListUnnest],
113 struct_column_indices: &[usize],
114 schema: &SchemaRef,
115 ) -> Result<PlanProperties> {
116 let input_schema = input.schema();
118 let mut unnested_indices = BooleanBufferBuilder::new(input_schema.fields().len());
119 unnested_indices.append_n(input_schema.fields().len(), false);
120 for list_unnest in list_column_indices {
121 unnested_indices.set_bit(list_unnest.index_in_input_schema, true);
122 }
123 for struct_unnest in struct_column_indices {
124 unnested_indices.set_bit(*struct_unnest, true)
125 }
126 let unnested_indices = unnested_indices.finish();
127 let non_unnested_indices: Vec<usize> = (0..input_schema.fields().len())
128 .filter(|idx| !unnested_indices.value(*idx))
129 .collect();
130
131 let input_schema = input.schema();
133 let projection_mapping: ProjectionMapping = non_unnested_indices
134 .iter()
135 .map(|&input_idx| {
136 let input_field = input_schema.field(input_idx);
138 let output_idx = schema
139 .fields()
140 .iter()
141 .position(|output_field| output_field.name() == input_field.name())
142 .ok_or_else(|| {
143 exec_datafusion_err!(
144 "Non-unnested column '{}' must exist in output schema",
145 input_field.name()
146 )
147 })?;
148
149 let input_col = Arc::new(Column::new(input_field.name(), input_idx))
150 as Arc<dyn PhysicalExpr>;
151 let target_col = Arc::new(Column::new(input_field.name(), output_idx))
152 as Arc<dyn PhysicalExpr>;
153 let targets = vec![(target_col, output_idx)].into();
155 Ok((input_col, targets))
156 })
157 .collect::<Result<ProjectionMapping>>()?;
158
159 let input_eq_properties = input.equivalence_properties();
163 let eq_properties = input_eq_properties
164 .project(&projection_mapping, Arc::clone(schema))
165 .with_constraints(Constraints::default());
166
167 let output_partitioning = input
169 .output_partitioning()
170 .project(&projection_mapping, &eq_properties);
171
172 Ok(PlanProperties::new(
173 eq_properties,
174 output_partitioning,
175 input.pipeline_behavior(),
176 input.boundedness(),
177 ))
178 }
179
180 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
182 &self.input
183 }
184
185 pub fn list_column_indices(&self) -> &[ListUnnest] {
187 &self.list_column_indices
188 }
189
190 pub fn struct_column_indices(&self) -> &[usize] {
192 &self.struct_column_indices
193 }
194
195 pub fn options(&self) -> &UnnestOptions {
196 &self.options
197 }
198
199 fn with_new_children_and_same_properties(
200 &self,
201 mut children: Vec<Arc<dyn ExecutionPlan>>,
202 ) -> Self {
203 Self {
204 input: children.swap_remove(0),
205 metrics: ExecutionPlanMetricsSet::new(),
206 ..Self::clone(self)
207 }
208 }
209}
210
211impl DisplayAs for UnnestExec {
212 fn fmt_as(
213 &self,
214 t: DisplayFormatType,
215 f: &mut std::fmt::Formatter,
216 ) -> std::fmt::Result {
217 match t {
218 DisplayFormatType::Default | DisplayFormatType::Verbose => {
219 write!(f, "UnnestExec")
220 }
221 DisplayFormatType::TreeRender => {
222 write!(f, "")
223 }
224 }
225 }
226}
227
228impl ExecutionPlan for UnnestExec {
229 fn name(&self) -> &'static str {
230 "UnnestExec"
231 }
232
233 fn properties(&self) -> &Arc<PlanProperties> {
234 &self.cache
235 }
236
237 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
238 vec![&self.input]
239 }
240
241 fn with_new_children(
242 self: Arc<Self>,
243 mut children: Vec<Arc<dyn ExecutionPlan>>,
244 ) -> Result<Arc<dyn ExecutionPlan>> {
245 check_if_same_properties!(self, children);
246 Ok(Arc::new(UnnestExec::new(
247 children.swap_remove(0),
248 self.list_column_indices.clone(),
249 self.struct_column_indices.clone(),
250 Arc::clone(&self.schema),
251 self.options.clone(),
252 )?))
253 }
254
255 fn required_input_distribution(&self) -> Vec<Distribution> {
256 vec![Distribution::UnspecifiedDistribution]
257 }
258
259 fn execute(
260 &self,
261 partition: usize,
262 context: Arc<TaskContext>,
263 ) -> Result<SendableRecordBatchStream> {
264 let input = self.input.execute(partition, context)?;
265 let metrics = UnnestMetrics::new(partition, &self.metrics);
266
267 Ok(Box::pin(UnnestStream {
268 input,
269 schema: Arc::clone(&self.schema),
270 list_type_columns: self.list_column_indices.clone(),
271 struct_column_indices: self.struct_column_indices.iter().copied().collect(),
272 options: self.options.clone(),
273 metrics,
274 }))
275 }
276
277 fn metrics(&self) -> Option<MetricsSet> {
278 Some(self.metrics.clone_inner())
279 }
280}
281
/// Metrics recorded by one partition of the unnest stream.
#[derive(Clone, Debug)]
struct UnnestMetrics {
    /// Baseline metrics (output rows/batches, elapsed compute).
    baseline_metrics: BaselineMetrics,
    /// Number of batches consumed from the input.
    input_batches: metrics::Count,
    /// Number of rows consumed from the input.
    input_rows: metrics::Count,
}
291
292impl UnnestMetrics {
293 fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
294 let input_batches = MetricBuilder::new(metrics)
295 .with_category(MetricCategory::Rows)
296 .counter("input_batches", partition);
297
298 let input_rows = MetricBuilder::new(metrics)
299 .with_category(MetricCategory::Rows)
300 .counter("input_rows", partition);
301
302 Self {
303 baseline_metrics: BaselineMetrics::new(metrics, partition),
304 input_batches,
305 input_rows,
306 }
307 }
308}
309
/// Stream that unnests every batch pulled from `input`.
struct UnnestStream {
    /// Input stream.
    input: SendableRecordBatchStream,
    /// Schema of the produced batches.
    schema: Arc<Schema>,
    /// List columns to unnest, each with the depth to unnest it to.
    list_type_columns: Vec<ListUnnest>,
    /// Indices of the struct columns whose children replace them.
    struct_column_indices: HashSet<usize>,
    /// Options passed through to the per-batch unnest.
    options: UnnestOptions,
    /// Metrics for this partition.
    metrics: UnnestMetrics,
}
326
impl RecordBatchStream for UnnestStream {
    /// Returns the output schema (a cheap `Arc` clone).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
332
#[async_trait]
impl Stream for UnnestStream {
    type Item = Result<RecordBatch>;

    /// Delegates to `poll_next_impl`, which works on a plain `&mut Self`.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.poll_next_impl(cx)
    }
}
344
impl UnnestStream {
    /// Polls the input and unnests the next batch.
    ///
    /// Input batches whose unnested result is empty are skipped, so the
    /// method keeps polling until it can return a non-empty batch, an error,
    /// or the end of the input.
    fn poll_next_impl(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    let elapsed_compute =
                        self.metrics.baseline_metrics.elapsed_compute().clone();
                    let timer = elapsed_compute.timer();
                    self.metrics.input_batches.add(1);
                    self.metrics.input_rows.add(batch.num_rows());
                    // A failure here leaves the function through `?`.
                    let result = build_batch(
                        &batch,
                        &self.schema,
                        &self.list_type_columns,
                        &self.struct_column_indices,
                        &self.options,
                    )?;
                    timer.done();
                    // `None` means this input batch produced no rows: poll
                    // the input again instead of emitting anything.
                    let Some(result_batch) = result else {
                        continue;
                    };
                    (&result_batch).record_output(&self.metrics.baseline_metrics);

                    // Empty batches are expected to have been turned into
                    // `None` by `build_batch`.
                    debug_assert!(result_batch.num_rows() > 0);
                    Some(Ok(result_batch))
                }
                // Either an input error or the end of the input.
                other => {
                    // NOTE(review): "probe-side" reads like wording carried
                    // over from a join operator -- confirm it is intended.
                    trace!(
                        "Processed {} probe-side input batches containing {} rows and \
                        produced {} output batches containing {} rows in {}",
                        self.metrics.input_batches,
                        self.metrics.input_rows,
                        self.metrics.baseline_metrics.output_batches(),
                        self.metrics.baseline_metrics.output_rows(),
                        self.metrics.baseline_metrics.elapsed_compute(),
                    );

                    if other.is_none() {
                        // The input is exhausted: swap it for an empty
                        // stream, which drops the original input stream.
                        let input_schema = self.input.schema();
                        self.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
                    }

                    other
                }
            });
        }
    }
}
403
404fn flatten_struct_cols(
415 input_batch: &[Arc<dyn Array>],
416 schema: &SchemaRef,
417 struct_column_indices: &HashSet<usize>,
418) -> Result<RecordBatch> {
419 let columns_expanded = input_batch
421 .iter()
422 .enumerate()
423 .map(|(idx, column_data)| match struct_column_indices.get(&idx) {
424 Some(_) => match column_data.data_type() {
425 DataType::Struct(_) => {
426 let struct_arr =
427 column_data.as_any().downcast_ref::<StructArray>().unwrap();
428 Ok(struct_arr.columns().to_vec())
429 }
430 data_type => internal_err!(
431 "expecting column {idx} from input plan to be a struct, got {data_type}"
432 ),
433 },
434 None => Ok(vec![Arc::clone(column_data)]),
435 })
436 .collect::<Result<Vec<_>>>()?
437 .into_iter()
438 .flatten()
439 .collect();
440 Ok(RecordBatch::try_new(Arc::clone(schema), columns_expanded)?)
441}
442
/// Identifies one list unnesting: which input column to unnest and to what
/// depth.
///
/// The same column may appear several times with different depths.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct ListUnnest {
    /// Index of the list column in the input schema.
    pub index_in_input_schema: usize,
    /// Number of list levels to unnest.
    pub depth: usize,
}
448
449fn list_unnest_at_level(
488 batch: &[ArrayRef],
489 list_type_unnests: &[ListUnnest],
490 temp_unnested_arrs: &mut HashMap<ListUnnest, ArrayRef>,
491 level_to_unnest: usize,
492 options: &UnnestOptions,
493) -> Result<Option<Vec<ArrayRef>>> {
494 let (arrs_to_unnest, list_unnest_specs): (Vec<Arc<dyn Array>>, Vec<_>) =
496 list_type_unnests
497 .iter()
498 .filter_map(|unnesting| {
499 if level_to_unnest == unnesting.depth {
500 return Some((
501 Arc::clone(&batch[unnesting.index_in_input_schema]),
502 *unnesting,
503 ));
504 }
505 if level_to_unnest < unnesting.depth {
508 return Some((
509 Arc::clone(temp_unnested_arrs.get(unnesting).unwrap()),
510 *unnesting,
511 ));
512 }
513 None
514 })
515 .unzip();
516
517 let longest_length = find_longest_length(&arrs_to_unnest, options)?;
520 let unnested_length = longest_length.as_primitive::<Int64Type>();
521 let total_length = if unnested_length.is_empty() {
522 0
523 } else {
524 sum(unnested_length).ok_or_else(|| {
525 exec_datafusion_err!("Failed to calculate the total unnested length")
526 })? as usize
527 };
528 if total_length == 0 {
529 return Ok(None);
530 }
531
532 let unnested_temp_arrays =
534 unnest_list_arrays(arrs_to_unnest.as_ref(), unnested_length, total_length)?;
535
536 let take_indices = create_take_indices(unnested_length, total_length);
538 unnested_temp_arrays
539 .into_iter()
540 .zip(list_unnest_specs.iter())
541 .for_each(|(flatten_arr, unnesting)| {
542 temp_unnested_arrs.insert(*unnesting, flatten_arr);
543 });
544
545 let repeat_mask: Vec<bool> = batch
546 .iter()
547 .enumerate()
548 .map(|(i, _)| {
549 let needed_in_future_levels = list_type_unnests.iter().any(|unnesting| {
551 unnesting.index_in_input_schema == i && unnesting.depth < level_to_unnest
552 });
553
554 let is_involved_in_unnesting = list_type_unnests
556 .iter()
557 .any(|unnesting| unnesting.index_in_input_schema == i);
558
559 needed_in_future_levels || !is_involved_in_unnesting
561 })
562 .collect();
563
564 let ret = repeat_arrs_from_indices(batch, &take_indices, &repeat_mask)?;
567
568 Ok(Some(ret))
569}
/// An unnested array together with the depth it was unnested to.
struct UnnestingResult {
    /// The unnested array.
    arr: ArrayRef,
    /// Depth of the unnesting that produced `arr`.
    depth: usize,
}
574
575fn build_batch(
632 batch: &RecordBatch,
633 schema: &SchemaRef,
634 list_type_columns: &[ListUnnest],
635 struct_column_indices: &HashSet<usize>,
636 options: &UnnestOptions,
637) -> Result<Option<RecordBatch>> {
638 let transformed = match list_type_columns.len() {
639 0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
640 _ => {
641 let mut temp_unnested_result = HashMap::new();
642 let max_recursion = list_type_columns
643 .iter()
644 .fold(0, |highest_depth, ListUnnest { depth, .. }| {
645 cmp::max(highest_depth, *depth)
646 });
647
648 let mut flatten_arrs = vec![];
650
651 for depth in (1..=max_recursion).rev() {
654 let input = match depth == max_recursion {
655 true => batch.columns(),
656 false => &flatten_arrs,
657 };
658 let Some(temp_result) = list_unnest_at_level(
659 input,
660 list_type_columns,
661 &mut temp_unnested_result,
662 depth,
663 options,
664 )?
665 else {
666 return Ok(None);
667 };
668 flatten_arrs = temp_result;
669 }
670 let unnested_array_map: HashMap<usize, Vec<UnnestingResult>> =
671 temp_unnested_result.into_iter().fold(
672 HashMap::new(),
673 |mut acc,
674 (
675 ListUnnest {
676 index_in_input_schema,
677 depth,
678 },
679 flattened_array,
680 )| {
681 acc.entry(index_in_input_schema).or_default().push(
682 UnnestingResult {
683 arr: flattened_array,
684 depth,
685 },
686 );
687 acc
688 },
689 );
690 let output_order: HashMap<ListUnnest, usize> = list_type_columns
691 .iter()
692 .enumerate()
693 .map(|(order, unnest_def)| (*unnest_def, order))
694 .collect();
695
696 let mut multi_unnested_per_original_index = unnested_array_map
698 .into_iter()
699 .map(
700 |(original_index, mut unnested_columns)| {
704 unnested_columns.sort_by(
705 |UnnestingResult { depth: depth1, .. },
706 UnnestingResult { depth: depth2, .. }|
707 -> Ordering {
708 output_order
709 .get(&ListUnnest {
710 depth: *depth1,
711 index_in_input_schema: original_index,
712 })
713 .unwrap()
714 .cmp(
715 output_order
716 .get(&ListUnnest {
717 depth: *depth2,
718 index_in_input_schema: original_index,
719 })
720 .unwrap(),
721 )
722 },
723 );
724 (
725 original_index,
726 unnested_columns
727 .into_iter()
728 .map(|result| result.arr)
729 .collect::<Vec<_>>(),
730 )
731 },
732 )
733 .collect::<HashMap<_, _>>();
734
735 let ret = flatten_arrs
736 .into_iter()
737 .enumerate()
738 .flat_map(|(col_idx, arr)| {
739 match multi_unnested_per_original_index.remove(&col_idx) {
743 Some(unnested_arrays) => unnested_arrays,
744 None => vec![arr],
745 }
746 })
747 .collect::<Vec<_>>();
748
749 flatten_struct_cols(&ret, schema, struct_column_indices)
750 }
751 }?;
752 Ok(Some(transformed))
753}
754
755fn find_longest_length(
777 list_arrays: &[ArrayRef],
778 options: &UnnestOptions,
779) -> Result<ArrayRef> {
780 let null_length = if options.preserve_nulls {
782 Scalar::new(Int64Array::from_value(1, 1))
783 } else {
784 Scalar::new(Int64Array::from_value(0, 1))
785 };
786 let list_lengths: Vec<ArrayRef> = list_arrays
787 .iter()
788 .map(|list_array| {
789 let mut length_array = length(list_array)?;
790 length_array = cast(&length_array, &DataType::Int64)?;
792 length_array =
793 zip(&is_not_null(&length_array)?, &length_array, &null_length)?;
794 Ok(length_array)
795 })
796 .collect::<Result<_>>()?;
797
798 let longest_length = list_lengths.iter().skip(1).try_fold(
799 Arc::clone(&list_lengths[0]),
800 |longest, current| {
801 let is_lt = lt(&longest, ¤t)?;
802 zip(&is_lt, ¤t, &longest)
803 },
804 )?;
805 Ok(longest_length)
806}
807
/// Uniform view over the list array kinds that can be unnested.
trait ListArrayType: Array {
    /// Returns the child array holding the elements of all rows.
    fn values(&self) -> &ArrayRef;

    /// Returns the `(start, end)` positions of `row`'s elements in
    /// `values()`; `end` is exclusive.
    fn value_offsets(&self, row: usize) -> (i64, i64);
}
816
817impl ListArrayType for ListArray {
818 fn values(&self) -> &ArrayRef {
819 self.values()
820 }
821
822 fn value_offsets(&self, row: usize) -> (i64, i64) {
823 let offsets = self.value_offsets();
824 (offsets[row].into(), offsets[row + 1].into())
825 }
826}
827
828impl ListArrayType for LargeListArray {
829 fn values(&self) -> &ArrayRef {
830 self.values()
831 }
832
833 fn value_offsets(&self, row: usize) -> (i64, i64) {
834 let offsets = self.value_offsets();
835 (offsets[row], offsets[row + 1])
836 }
837}
838
839impl ListArrayType for FixedSizeListArray {
840 fn values(&self) -> &ArrayRef {
841 self.values()
842 }
843
844 fn value_offsets(&self, row: usize) -> (i64, i64) {
845 let start = self.value_offset(row) as i64;
846 (start, start + self.value_length() as i64)
847 }
848}
849
850impl ListArrayType for ListViewArray {
851 fn values(&self) -> &ArrayRef {
852 self.values()
853 }
854
855 fn value_offsets(&self, row: usize) -> (i64, i64) {
856 let offset = self.value_offsets()[row] as i64;
857 let size = self.value_sizes()[row] as i64;
858 (offset, offset + size)
859 }
860}
861
862impl ListArrayType for LargeListViewArray {
863 fn values(&self) -> &ArrayRef {
864 self.values()
865 }
866
867 fn value_offsets(&self, row: usize) -> (i64, i64) {
868 let offset = self.value_offsets()[row];
869 let size = self.value_sizes()[row];
870 (offset, offset + size)
871 }
872}
873
874fn unnest_list_arrays(
876 list_arrays: &[ArrayRef],
877 length_array: &PrimitiveArray<Int64Type>,
878 capacity: usize,
879) -> Result<Vec<ArrayRef>> {
880 let typed_arrays = list_arrays
881 .iter()
882 .map(|list_array| match list_array.data_type() {
883 DataType::List(_) => Ok(list_array.as_list::<i32>() as &dyn ListArrayType),
884 DataType::LargeList(_) => {
885 Ok(list_array.as_list::<i64>() as &dyn ListArrayType)
886 }
887 DataType::FixedSizeList(_, _) => {
888 Ok(list_array.as_fixed_size_list() as &dyn ListArrayType)
889 }
890 DataType::ListView(_) => {
891 Ok(list_array.as_list_view::<i32>() as &dyn ListArrayType)
892 }
893 DataType::LargeListView(_) => {
894 Ok(list_array.as_list_view::<i64>() as &dyn ListArrayType)
895 }
896 other => exec_err!("Invalid unnest datatype {other }"),
897 })
898 .collect::<Result<Vec<_>>>()?;
899
900 typed_arrays
901 .iter()
902 .map(|list_array| unnest_list_array(*list_array, length_array, capacity))
903 .collect::<Result<_>>()
904}
905
906fn unnest_list_array(
927 list_array: &dyn ListArrayType,
928 length_array: &PrimitiveArray<Int64Type>,
929 capacity: usize,
930) -> Result<ArrayRef> {
931 let values = list_array.values();
932 let mut take_indices_builder = PrimitiveArray::<Int64Type>::builder(capacity);
933 for row in 0..list_array.len() {
934 let mut value_length = 0;
935 if !list_array.is_null(row) {
936 let (start, end) = list_array.value_offsets(row);
937 value_length = end - start;
938 for i in start..end {
939 take_indices_builder.append_value(i)
940 }
941 }
942 let target_length = length_array.value(row);
943 debug_assert!(
944 value_length <= target_length,
945 "value length is beyond the longest length"
946 );
947 for _ in value_length..target_length {
949 take_indices_builder.append_null();
950 }
951 }
952 Ok(kernels::take::take(
953 &values,
954 &take_indices_builder.finish(),
955 None,
956 )?)
957}
958
959fn create_take_indices(
974 length_array: &PrimitiveArray<Int64Type>,
975 capacity: usize,
976) -> PrimitiveArray<Int64Type> {
977 debug_assert!(
979 length_array.null_count() == 0,
980 "length array should not contain nulls"
981 );
982 let mut builder = PrimitiveArray::<Int64Type>::builder(capacity);
983 for (index, repeat) in length_array.iter().enumerate() {
984 let repeat = repeat.unwrap();
986 (0..repeat).for_each(|_| builder.append_value(index as i64));
987 }
988 builder.finish()
989}
990
991fn repeat_arrs_from_indices(
1038 batch: &[ArrayRef],
1039 indices: &PrimitiveArray<Int64Type>,
1040 repeat_mask: &[bool],
1041) -> Result<Vec<Arc<dyn Array>>> {
1042 batch
1043 .iter()
1044 .zip(repeat_mask.iter())
1045 .map(|(arr, &repeat)| {
1046 if repeat {
1047 Ok(kernels::take::take(arr, indices, None)?)
1048 } else {
1049 Ok(new_null_array(arr.data_type(), arr.len()))
1050 }
1051 })
1052 .collect()
1053}
1054
1055#[cfg(test)]
1056mod tests {
1057 use super::*;
1058 use arrow::array::{
1059 GenericListArray, NullBufferBuilder, OffsetSizeTrait, StringArray,
1060 };
1061 use arrow::buffer::{NullBuffer, OffsetBuffer};
1062 use arrow::datatypes::{Field, Int32Type};
1063 use datafusion_common::test_util::batches_to_string;
1064 use insta::assert_snapshot;
1065
/// Builds a six-row list array of strings:
/// `[A, B, C], [], NULL, [D], NULL, [NULL, F]`.
///
/// The first NULL row still covers one child value (`"?"`), the second one
/// covers none.
fn make_generic_array<OffsetSize>() -> GenericListArray<OffsetSize>
where
    OffsetSize: OffsetSizeTrait,
{
    let mut values = vec![];
    let mut offsets: Vec<OffsetSize> = vec![OffsetSize::zero()];
    let mut valid = NullBufferBuilder::new(6);

    // [A, B, C]
    values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]);
    offsets.push(OffsetSize::from_usize(values.len()).unwrap());
    valid.append_non_null();

    // []
    offsets.push(OffsetSize::from_usize(values.len()).unwrap());
    valid.append_non_null();

    // NULL with a non-zero value length
    values.push(Some("?"));
    offsets.push(OffsetSize::from_usize(values.len()).unwrap());
    valid.append_null();

    // [D]
    values.push(Some("D"));
    offsets.push(OffsetSize::from_usize(values.len()).unwrap());
    valid.append_non_null();

    // Another NULL, this time with zero value length
    offsets.push(OffsetSize::from_usize(values.len()).unwrap());
    valid.append_null();

    // [NULL, F]
    values.extend_from_slice(&[None, Some("F")]);
    offsets.push(OffsetSize::from_usize(values.len()).unwrap());
    valid.append_non_null();

    let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
    GenericListArray::<OffsetSize>::new(
        field,
        OffsetBuffer::new(offsets.into()),
        Arc::new(StringArray::from(values)),
        valid.finish(),
    )
}
1113
/// Builds a six-row fixed-size (2) list array of strings:
/// `[A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]`.
fn make_fixed_list() -> FixedSizeListArray {
    let values = Arc::new(StringArray::from_iter([
        // [A, B]
        Some("A"),
        Some("B"),
        // NULL row
        None,
        None,
        // [C, D]
        Some("C"),
        Some("D"),
        // NULL row
        None,
        None,
        // [NULL, F]
        None,
        Some("F"),
        // [NULL, NULL]
        None,
        None,
    ]));
    let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
    let valid = NullBuffer::from(vec![true, false, true, false, true, true]);
    FixedSizeListArray::new(field, 2, values, Some(valid))
}
1135
1136 fn verify_unnest_list_array(
1137 list_array: &dyn ListArrayType,
1138 lengths: Vec<i64>,
1139 expected: Vec<Option<&str>>,
1140 ) -> Result<()> {
1141 let length_array = Int64Array::from(lengths);
1142 let unnested_array = unnest_list_array(list_array, &length_array, 3 * 6)?;
1143 let strs = unnested_array.as_string::<i32>().iter().collect::<Vec<_>>();
1144 assert_eq!(strs, expected);
1145 Ok(())
1146 }
1147
/// Unnests a `list<list<int32>>` column at depths 1 and 2 together with a
/// `list<utf8>` column at depth 1 and checks the resulting batch.
#[test]
fn test_build_batch_list_arr_recursive() -> Result<()> {
    // Inner lists, grouped three by three into the outer list below:
    // row 0: [1, 2, 3], NULL, [4, 5]
    // row 1: [7, 8, 9, 10], NULL, [11, 12, 13]
    // row 2: NULL
    let list_arr1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(1), Some(2), Some(3)]),
        None,
        Some(vec![Some(4), Some(5)]),
        Some(vec![Some(7), Some(8), Some(9), Some(10)]),
        None,
        Some(vec![Some(11), Some(12), Some(13)]),
    ]);

    let list_arr1_ref = Arc::new(list_arr1) as ArrayRef;
    let offsets = OffsetBuffer::from_lengths([3, 3, 0]);
    let mut nulls = NullBufferBuilder::new(3);
    nulls.append_non_null();
    nulls.append_non_null();
    nulls.append_null();
    // list<list<int32>>
    let col1_field = Field::new_list_field(
        DataType::List(Arc::new(Field::new_list_field(
            list_arr1_ref.data_type().to_owned(),
            true,
        ))),
        true,
    );
    let col1 = ListArray::new(
        Arc::new(Field::new_list_field(
            list_arr1_ref.data_type().to_owned(),
            true,
        )),
        offsets,
        list_arr1_ref,
        nulls.finish(),
    );

    let list_arr2 = StringArray::from(vec![
        Some("a"),
        Some("b"),
        Some("c"),
        Some("d"),
        Some("e"),
    ]);

    // list<utf8>: [a, b], [c, d], [e]
    let offsets = OffsetBuffer::from_lengths([2, 2, 1]);
    let mut nulls = NullBufferBuilder::new(3);
    nulls.append_n_non_nulls(3);
    let col2_field = Field::new(
        "col2",
        DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
        true,
    );
    let col2 = GenericListArray::<i32>::new(
        Arc::new(Field::new_list_field(DataType::Utf8, true)),
        OffsetBuffer::new(offsets.into()),
        Arc::new(list_arr2),
        nulls.finish(),
    );
    let schema = Arc::new(Schema::new(vec![col1_field, col2_field]));
    // Output: col1 unnested once, col1 unnested twice, col2 unnested once.
    let out_schema = Arc::new(Schema::new(vec![
        Field::new(
            "col1_unnest_placeholder_depth_1",
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
            true,
        ),
        Field::new("col1_unnest_placeholder_depth_2", DataType::Int32, true),
        Field::new("col2_unnest_placeholder_depth_1", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(col1) as ArrayRef, Arc::new(col2) as ArrayRef],
    )
    .unwrap();
    let list_type_columns = vec![
        ListUnnest {
            index_in_input_schema: 0,
            depth: 1,
        },
        ListUnnest {
            index_in_input_schema: 0,
            depth: 2,
        },
        ListUnnest {
            index_in_input_schema: 1,
            depth: 1,
        },
    ];
    let ret = build_batch(
        &batch,
        &out_schema,
        list_type_columns.as_ref(),
        &HashSet::default(),
        &UnnestOptions {
            preserve_nulls: true,
            recursions: vec![],
        },
    )?
    .unwrap();

    assert_snapshot!(batches_to_string(&[ret]),
    @r"
    +---------------------------------+---------------------------------+---------------------------------+
    | col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 |
    +---------------------------------+---------------------------------+---------------------------------+
    | [1, 2, 3]                       | 1                               | a                               |
    |                                 | 2                               | b                               |
    | [4, 5]                          | 3                               |                                 |
    | [1, 2, 3]                       |                                 | a                               |
    |                                 |                                 | b                               |
    | [4, 5]                          |                                 |                                 |
    | [1, 2, 3]                       | 4                               | a                               |
    |                                 | 5                               | b                               |
    | [4, 5]                          |                                 |                                 |
    | [7, 8, 9, 10]                   | 7                               | c                               |
    |                                 | 8                               | d                               |
    | [11, 12, 13]                    | 9                               |                                 |
    |                                 | 10                              |                                 |
    | [7, 8, 9, 10]                   |                                 | c                               |
    |                                 |                                 | d                               |
    | [11, 12, 13]                    |                                 |                                 |
    | [7, 8, 9, 10]                   | 11                              | c                               |
    |                                 | 12                              | d                               |
    | [11, 12, 13]                    | 13                              |                                 |
    |                                 |                                 | e                               |
    +---------------------------------+---------------------------------+---------------------------------+
    ");
    Ok(())
}
1280
/// Checks `unnest_list_array` on a variable-size and a fixed-size list array,
/// including the NULL padding up to the requested row lengths.
#[test]
fn test_unnest_list_array() -> Result<()> {
    // [A, B, C], [], NULL, [D], NULL, [NULL, F]
    let list_array = make_generic_array::<i32>();
    verify_unnest_list_array(
        &list_array,
        vec![3, 2, 1, 2, 0, 3],
        vec![
            Some("A"),
            Some("B"),
            Some("C"),
            None,
            None,
            None,
            Some("D"),
            None,
            None,
            Some("F"),
            None,
        ],
    )?;

    // [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
    let list_array = make_fixed_list();
    verify_unnest_list_array(
        &list_array,
        vec![3, 1, 2, 0, 2, 3],
        vec![
            Some("A"),
            Some("B"),
            None,
            None,
            Some("C"),
            Some("D"),
            None,
            Some("F"),
            None,
            None,
            None,
        ],
    )?;

    Ok(())
}
1325
1326 fn verify_longest_length(
1327 list_arrays: &[ArrayRef],
1328 preserve_nulls: bool,
1329 expected: Vec<i64>,
1330 ) -> Result<()> {
1331 let options = UnnestOptions {
1332 preserve_nulls,
1333 recursions: vec![],
1334 };
1335 let longest_length = find_longest_length(list_arrays, &options)?;
1336 let expected_array = Int64Array::from(expected);
1337 assert_eq!(
1338 longest_length
1339 .as_any()
1340 .downcast_ref::<Int64Array>()
1341 .unwrap(),
1342 &expected_array
1343 );
1344 Ok(())
1345 }
1346
/// Checks `find_longest_length` for single arrays of each list kind and for a
/// combination of two arrays, with and without NULL preservation.
#[test]
fn test_longest_list_length() -> Result<()> {
    // Single ListArray
    //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
    let list_array = Arc::new(make_generic_array::<i32>()) as ArrayRef;
    verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
    verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

    // Single LargeListArray
    //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
    let list_array = Arc::new(make_generic_array::<i64>()) as ArrayRef;
    verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
    verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;

    // Single FixedSizeListArray
    //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
    let list_array = Arc::new(make_fixed_list()) as ArrayRef;
    verify_longest_length(&[Arc::clone(&list_array)], false, vec![2, 0, 2, 0, 2, 2])?;
    verify_longest_length(&[Arc::clone(&list_array)], true, vec![2, 1, 2, 1, 2, 2])?;

    // Two arrays: the result is the row-wise maximum of
    //  [A, B, C], [], NULL, [D], NULL, [NULL, F]
    //  [A, B], NULL, [C, D], NULL, [NULL, F], [NULL, NULL]
    let list1 = Arc::new(make_generic_array::<i32>()) as ArrayRef;
    let list2 = Arc::new(make_fixed_list()) as ArrayRef;
    let list_arrays = vec![Arc::clone(&list1), Arc::clone(&list2)];
    verify_longest_length(&list_arrays, false, vec![3, 0, 2, 1, 2, 2])?;
    verify_longest_length(&list_arrays, true, vec![3, 1, 2, 1, 2, 2])?;

    Ok(())
}
1378
/// Row `i` must be repeated `length_array[i]` times: lengths `[2, 3, 1]` give
/// indices `[0, 0, 1, 1, 1, 2]`.
#[test]
fn test_create_take_indices() -> Result<()> {
    let length_array = Int64Array::from(vec![2, 3, 1]);
    let take_indices = create_take_indices(&length_array, 6);
    let expected = Int64Array::from(vec![0, 0, 1, 1, 1, 2]);
    assert_eq!(take_indices, expected);
    Ok(())
}
1387}