use std::cmp::{self, Ordering};
use std::task::{Poll, ready};
use std::{any::Any, sync::Arc};
use super::metrics::{
self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
RecordOutput,
};
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
use crate::{
DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
SendableRecordBatchStream, check_if_same_properties,
};
use arrow::array::{
Array, ArrayRef, AsArray, BooleanBufferBuilder, FixedSizeListArray, Int64Array,
LargeListArray, ListArray, PrimitiveArray, Scalar, StructArray, new_null_array,
};
use arrow::compute::kernels::length::length;
use arrow::compute::kernels::zip::zip;
use arrow::compute::{cast, is_not_null, kernels, sum};
use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_ord::cmp::lt;
use async_trait::async_trait;
use datafusion_common::{
Constraints, HashMap, HashSet, Result, UnnestOptions, exec_datafusion_err, exec_err,
internal_err,
};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::Column;
use futures::{Stream, StreamExt};
use log::trace;
/// Physical operator that unnests list-typed columns into multiple rows and
/// flattens struct-typed columns into their child columns.
#[derive(Debug, Clone)]
pub struct UnnestExec {
    /// Child execution plan supplying input batches.
    input: Arc<dyn ExecutionPlan>,
    /// Output schema after unnesting.
    schema: SchemaRef,
    /// List-typed columns to unnest, each paired with a target depth.
    list_column_indices: Vec<ListUnnest>,
    /// Indices of struct-typed input columns to flatten into their fields.
    struct_column_indices: Vec<usize>,
    /// Options controlling unnesting (e.g. whether null lists are preserved).
    options: UnnestOptions,
    /// Execution metrics, populated per partition at execute time.
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties (equivalences, partitioning, boundedness).
    cache: Arc<PlanProperties>,
}
impl UnnestExec {
    /// Creates a new [`UnnestExec`], computing plan properties from `input`.
    ///
    /// `list_column_indices` / `struct_column_indices` identify which input
    /// columns are unnested; `schema` is the already-computed output schema.
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        list_column_indices: Vec<ListUnnest>,
        struct_column_indices: Vec<usize>,
        schema: SchemaRef,
        options: UnnestOptions,
    ) -> Result<Self> {
        let cache = Self::compute_properties(
            &input,
            &list_column_indices,
            &struct_column_indices,
            &schema,
        )?;
        Ok(UnnestExec {
            input,
            schema,
            list_column_indices,
            struct_column_indices,
            options,
            metrics: Default::default(),
            cache: Arc::new(cache),
        })
    }
    /// Computes [`PlanProperties`]: equivalence properties and output
    /// partitioning are projected through only the columns that are NOT
    /// unnested (unnested columns cannot carry ordering guarantees through).
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        list_column_indices: &[ListUnnest],
        struct_column_indices: &[usize],
        schema: &SchemaRef,
    ) -> Result<PlanProperties> {
        let input_schema = input.schema();
        // Mark every input column that participates in any unnesting.
        let mut unnested_indices = BooleanBufferBuilder::new(input_schema.fields().len());
        unnested_indices.append_n(input_schema.fields().len(), false);
        for list_unnest in list_column_indices {
            unnested_indices.set_bit(list_unnest.index_in_input_schema, true);
        }
        for struct_unnest in struct_column_indices {
            unnested_indices.set_bit(*struct_unnest, true)
        }
        let unnested_indices = unnested_indices.finish();
        let non_unnested_indices: Vec<usize> = (0..input_schema.fields().len())
            .filter(|idx| !unnested_indices.value(*idx))
            .collect();
        // NOTE: the original recomputed `input.schema()` into a second,
        // shadowing binding here; the one computed above is reused instead.
        // Map each passthrough (non-unnested) input column to its position in
        // the output schema, matched by field name.
        let projection_mapping: ProjectionMapping = non_unnested_indices
            .iter()
            .map(|&input_idx| {
                let input_field = input_schema.field(input_idx);
                let output_idx = schema
                    .fields()
                    .iter()
                    .position(|output_field| output_field.name() == input_field.name())
                    .ok_or_else(|| {
                        exec_datafusion_err!(
                            "Non-unnested column '{}' must exist in output schema",
                            input_field.name()
                        )
                    })?;
                let input_col = Arc::new(Column::new(input_field.name(), input_idx))
                    as Arc<dyn PhysicalExpr>;
                let target_col = Arc::new(Column::new(input_field.name(), output_idx))
                    as Arc<dyn PhysicalExpr>;
                let targets = vec![(target_col, output_idx)].into();
                Ok((input_col, targets))
            })
            .collect::<Result<ProjectionMapping>>()?;
        let input_eq_properties = input.equivalence_properties();
        // Constraints are dropped: row duplication from unnesting invalidates
        // uniqueness/primary-key style constraints.
        let eq_properties = input_eq_properties
            .project(&projection_mapping, Arc::clone(schema))
            .with_constraints(Constraints::default());
        let output_partitioning = input
            .output_partitioning()
            .project(&projection_mapping, &eq_properties);
        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            input.pipeline_behavior(),
            input.boundedness(),
        ))
    }
    /// The child plan providing input batches.
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }
    /// The list-typed columns (with depths) being unnested.
    pub fn list_column_indices(&self) -> &[ListUnnest] {
        &self.list_column_indices
    }
    /// The struct-typed column indices being flattened.
    pub fn struct_column_indices(&self) -> &[usize] {
        &self.struct_column_indices
    }
    /// The unnest options in effect.
    pub fn options(&self) -> &UnnestOptions {
        &self.options
    }
    /// Rebuilds this node with a new child while reusing the cached plan
    /// properties; metrics are reset for the new instance.
    fn with_new_children_and_same_properties(
        &self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Self {
        Self {
            input: children.swap_remove(0),
            metrics: ExecutionPlanMetricsSet::new(),
            ..Self::clone(self)
        }
    }
}
impl DisplayAs for UnnestExec {
    /// Renders this node for `EXPLAIN` output: the bare operator name for
    /// default/verbose formats, and nothing extra for tree rendering.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        if let DisplayFormatType::TreeRender = t {
            // Tree rendering shows no additional detail for this node.
            write!(f, "")
        } else {
            // Default and Verbose share the same one-word representation.
            write!(f, "UnnestExec")
        }
    }
}
impl ExecutionPlan for UnnestExec {
    fn name(&self) -> &'static str {
        "UnnestExec"
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }
    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Fast path: presumably short-circuits (early-returns a cheap clone)
        // when the replacement child has identical plan properties — confirm
        // against the macro definition.
        check_if_same_properties!(self, children);
        // Otherwise rebuild the node, recomputing cached properties.
        Ok(Arc::new(UnnestExec::new(
            children.swap_remove(0),
            self.list_column_indices.clone(),
            self.struct_column_indices.clone(),
            Arc::clone(&self.schema),
            self.options.clone(),
        )?))
    }
    fn required_input_distribution(&self) -> Vec<Distribution> {
        // Unnesting is row-local, so any input distribution is acceptable.
        vec![Distribution::UnspecifiedDistribution]
    }
    /// Executes one partition: wraps the child's stream in an [`UnnestStream`]
    /// that unnests every batch it yields.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let input = self.input.execute(partition, context)?;
        let metrics = UnnestMetrics::new(partition, &self.metrics);
        Ok(Box::pin(UnnestStream {
            input,
            schema: Arc::clone(&self.schema),
            list_type_columns: self.list_column_indices.clone(),
            struct_column_indices: self.struct_column_indices.iter().copied().collect(),
            options: self.options.clone(),
            metrics,
        }))
    }
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
/// Metrics for [`UnnestExec`], one instance per partition.
#[derive(Clone, Debug)]
struct UnnestMetrics {
    /// Standard baseline metrics (output rows/batches, elapsed compute time).
    baseline_metrics: BaselineMetrics,
    /// Number of input batches consumed.
    input_batches: metrics::Count,
    /// Number of input rows consumed.
    input_rows: metrics::Count,
}
impl UnnestMetrics {
    /// Registers this operator's counters for `partition` in `metrics`.
    /// Field order in the literal preserves the original registration order:
    /// input_batches, input_rows, then the baseline set.
    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
        Self {
            input_batches: MetricBuilder::new(metrics)
                .counter("input_batches", partition),
            input_rows: MetricBuilder::new(metrics).counter("input_rows", partition),
            baseline_metrics: BaselineMetrics::new(metrics, partition),
        }
    }
}
/// Stream adapter that unnests every batch produced by `input`.
struct UnnestStream {
    /// Upstream record batch stream.
    input: SendableRecordBatchStream,
    /// Output schema after unnesting.
    schema: Arc<Schema>,
    /// List-typed columns to unnest, with their depths.
    list_type_columns: Vec<ListUnnest>,
    /// Input column indices holding structs to flatten.
    struct_column_indices: HashSet<usize>,
    /// Unnest behaviour options (null preservation, recursion specs).
    options: UnnestOptions,
    /// Per-partition execution metrics.
    metrics: UnnestMetrics,
}
impl RecordBatchStream for UnnestStream {
    /// Schema of the batches this stream produces (the unnested schema).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
// NOTE: the original carried `#[async_trait]` here, but `Stream` declares no
// async methods, so the attribute expanded to nothing; it is removed to avoid
// implying async-trait semantics that do not exist.
impl Stream for UnnestStream {
    type Item = Result<RecordBatch>;
    /// Delegates to [`UnnestStream::poll_next_impl`], which does the work.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.poll_next_impl(cx)
    }
}
impl UnnestStream {
    /// Inner poll loop: pulls input batches, unnests each one, and skips
    /// batches whose unnested output is empty (via `continue`) so that
    /// downstream consumers never observe an empty batch.
    fn poll_next_impl(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    // Time only the unnesting work, not the upstream poll.
                    let elapsed_compute =
                        self.metrics.baseline_metrics.elapsed_compute().clone();
                    let timer = elapsed_compute.timer();
                    self.metrics.input_batches.add(1);
                    self.metrics.input_rows.add(batch.num_rows());
                    let result = build_batch(
                        &batch,
                        &self.schema,
                        &self.list_type_columns,
                        &self.struct_column_indices,
                        &self.options,
                    )?;
                    timer.done();
                    // `None` means the batch unnested to zero rows: poll the
                    // input again instead of emitting an empty batch.
                    let Some(result_batch) = result else {
                        continue;
                    };
                    (&result_batch).record_output(&self.metrics.baseline_metrics);
                    // Invariant: `build_batch` returned `Some`, so the batch
                    // is non-empty.
                    debug_assert!(result_batch.num_rows() > 0);
                    Some(Ok(result_batch))
                }
                other => {
                    // End-of-stream or error: log summary metrics and forward
                    // the item unchanged.
                    trace!(
                        "Processed {} probe-side input batches containing {} rows and \
                        produced {} output batches containing {} rows in {}",
                        self.metrics.input_batches,
                        self.metrics.input_rows,
                        self.metrics.baseline_metrics.output_batches(),
                        self.metrics.baseline_metrics.output_rows(),
                        self.metrics.baseline_metrics.elapsed_compute(),
                    );
                    other
                }
            });
        }
    }
}
fn flatten_struct_cols(
input_batch: &[Arc<dyn Array>],
schema: &SchemaRef,
struct_column_indices: &HashSet<usize>,
) -> Result<RecordBatch> {
let columns_expanded = input_batch
.iter()
.enumerate()
.map(|(idx, column_data)| match struct_column_indices.get(&idx) {
Some(_) => match column_data.data_type() {
DataType::Struct(_) => {
let struct_arr =
column_data.as_any().downcast_ref::<StructArray>().unwrap();
Ok(struct_arr.columns().to_vec())
}
data_type => internal_err!(
"expecting column {} from input plan to be a struct, got {:?}",
idx,
data_type
),
},
None => Ok(vec![Arc::clone(column_data)]),
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();
Ok(RecordBatch::try_new(Arc::clone(schema), columns_expanded)?)
}
/// Identifies one unnest operation: which input column, and to what depth.
/// The same column may appear several times with different depths (e.g.
/// `unnest(col)` and `unnest(unnest(col))` in the same query).
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct ListUnnest {
    /// Index of the column in the input schema.
    pub index_in_input_schema: usize,
    /// Number of nesting levels to remove (1 = a single `unnest`).
    pub depth: usize,
}
/// Performs one level of unnesting over `batch`.
///
/// Columns whose requested depth equals `level_to_unnest` are taken from the
/// batch; columns that still need deeper unnesting continue from the partial
/// results stored in `temp_unnested_arrs` by a previous (deeper) pass.
/// Returns `Ok(None)` when the unnested output would contain zero rows.
fn list_unnest_at_level(
    batch: &[ArrayRef],
    list_type_unnests: &[ListUnnest],
    temp_unnested_arrs: &mut HashMap<ListUnnest, ArrayRef>,
    level_to_unnest: usize,
    options: &UnnestOptions,
) -> Result<Option<Vec<ArrayRef>>> {
    // Select the arrays to flatten at this level, keeping each paired with
    // its unnest spec so results can be stored back under the right key.
    let (arrs_to_unnest, list_unnest_specs): (Vec<Arc<dyn Array>>, Vec<_>) =
        list_type_unnests
            .iter()
            .filter_map(|unnesting| {
                if level_to_unnest == unnesting.depth {
                    return Some((
                        Arc::clone(&batch[unnesting.index_in_input_schema]),
                        *unnesting,
                    ));
                }
                // Deeper unnests were flattened in an earlier pass; continue
                // from their temporary result.
                if level_to_unnest < unnesting.depth {
                    return Some((
                        Arc::clone(temp_unnested_arrs.get(unnesting).unwrap()),
                        *unnesting,
                    ));
                }
                None
            })
            .unzip();
    // Per-row output length = longest list length across unnested columns.
    let longest_length = find_longest_length(&arrs_to_unnest, options)?;
    let unnested_length = longest_length.as_primitive::<Int64Type>();
    let total_length = if unnested_length.is_empty() {
        0
    } else {
        sum(unnested_length).ok_or_else(|| {
            exec_datafusion_err!("Failed to calculate the total unnested length")
        })? as usize
    };
    if total_length == 0 {
        return Ok(None);
    }
    let unnested_temp_arrays =
        unnest_list_arrays(arrs_to_unnest.as_ref(), unnested_length, total_length)?;
    let take_indices = create_take_indices(unnested_length, total_length);
    // Stash this level's flattened arrays for the next (shallower) level.
    unnested_temp_arrays
        .into_iter()
        .zip(list_unnest_specs.iter())
        .for_each(|(flatten_arr, unnesting)| {
            temp_unnested_arrs.insert(*unnesting, flatten_arr);
        });
    // Decide per input column whether it must be row-repeated to stay aligned
    // with the expanded output: columns needed at shallower levels and
    // columns not involved in unnesting at all are repeated; columns fully
    // consumed by unnesting become null placeholders instead.
    let repeat_mask: Vec<bool> = batch
        .iter()
        .enumerate()
        .map(|(i, _)| {
            let needed_in_future_levels = list_type_unnests.iter().any(|unnesting| {
                unnesting.index_in_input_schema == i && unnesting.depth < level_to_unnest
            });
            let is_involved_in_unnesting = list_type_unnests
                .iter()
                .any(|unnesting| unnesting.index_in_input_schema == i);
            needed_in_future_levels || !is_involved_in_unnesting
        })
        .collect();
    let ret = repeat_arrs_from_indices(batch, &take_indices, &repeat_mask)?;
    Ok(Some(ret))
}
/// A flattened array together with the unnest depth that produced it.
struct UnnestingResult {
    /// The unnested (flattened) array.
    arr: ArrayRef,
    /// The depth at which this result was produced.
    depth: usize,
}
/// Unnests `batch` into at most one output [`RecordBatch`].
///
/// With no list columns, only struct flattening is performed. Otherwise list
/// columns are unnested level by level, from the deepest requested depth down
/// to 1, and the per-depth results are stitched back into the declared output
/// column order. Returns `Ok(None)` when the unnested output has zero rows.
fn build_batch(
    batch: &RecordBatch,
    schema: &SchemaRef,
    list_type_columns: &[ListUnnest],
    struct_column_indices: &HashSet<usize>,
    options: &UnnestOptions,
) -> Result<Option<RecordBatch>> {
    let transformed = match list_type_columns.len() {
        0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
        _ => {
            let mut temp_unnested_result = HashMap::new();
            // Deepest depth requested across all unnest specs.
            let max_recursion = list_type_columns
                .iter()
                .fold(0, |highest_depth, ListUnnest { depth, .. }| {
                    cmp::max(highest_depth, *depth)
                });
            // Unnest from the innermost level outwards; each pass feeds the
            // next through `flatten_arrs` and `temp_unnested_result`.
            let mut flatten_arrs = vec![];
            for depth in (1..=max_recursion).rev() {
                let input = match depth == max_recursion {
                    true => batch.columns(),
                    false => &flatten_arrs,
                };
                let Some(temp_result) = list_unnest_at_level(
                    input,
                    list_type_columns,
                    &mut temp_unnested_result,
                    depth,
                    options,
                )?
                else {
                    // Zero output rows: short-circuit the whole batch.
                    return Ok(None);
                };
                flatten_arrs = temp_result;
            }
            // Group per-(column, depth) results by original column index.
            let unnested_array_map: HashMap<usize, Vec<UnnestingResult>> =
                temp_unnested_result.into_iter().fold(
                    HashMap::new(),
                    |mut acc,
                     (
                        ListUnnest {
                            index_in_input_schema,
                            depth,
                        },
                        flattened_array,
                    )| {
                        acc.entry(index_in_input_schema).or_default().push(
                            UnnestingResult {
                                arr: flattened_array,
                                depth,
                            },
                        );
                        acc
                    },
                );
            // Position of every unnest spec in the declared output order.
            let output_order: HashMap<ListUnnest, usize> = list_type_columns
                .iter()
                .enumerate()
                .map(|(order, unnest_def)| (*unnest_def, order))
                .collect();
            // Sort each column's results so that multiple depths of the same
            // column appear in their declared output order.
            let mut multi_unnested_per_original_index = unnested_array_map
                .into_iter()
                .map(
                    |(original_index, mut unnested_columns)| {
                        unnested_columns.sort_by(
                            |UnnestingResult { depth: depth1, .. },
                             UnnestingResult { depth: depth2, .. }|
                             -> Ordering {
                                output_order
                                    .get(&ListUnnest {
                                        depth: *depth1,
                                        index_in_input_schema: original_index,
                                    })
                                    .unwrap()
                                    .cmp(
                                        output_order
                                            .get(&ListUnnest {
                                                depth: *depth2,
                                                index_in_input_schema: original_index,
                                            })
                                            .unwrap(),
                                    )
                            },
                        );
                        (
                            original_index,
                            unnested_columns
                                .into_iter()
                                .map(|result| result.arr)
                                .collect::<Vec<_>>(),
                        )
                    },
                )
                .collect::<HashMap<_, _>>();
            // Splice the unnested columns into place; columns without an
            // unnest result pass through from the last flattening pass.
            let ret = flatten_arrs
                .into_iter()
                .enumerate()
                .flat_map(|(col_idx, arr)| {
                    match multi_unnested_per_original_index.remove(&col_idx) {
                        Some(unnested_arrays) => unnested_arrays,
                        None => vec![arr],
                    }
                })
                .collect::<Vec<_>>();
            flatten_struct_cols(&ret, schema, struct_column_indices)
        }
    }?;
    Ok(Some(transformed))
}
fn find_longest_length(
list_arrays: &[ArrayRef],
options: &UnnestOptions,
) -> Result<ArrayRef> {
let null_length = if options.preserve_nulls {
Scalar::new(Int64Array::from_value(1, 1))
} else {
Scalar::new(Int64Array::from_value(0, 1))
};
let list_lengths: Vec<ArrayRef> = list_arrays
.iter()
.map(|list_array| {
let mut length_array = length(list_array)?;
length_array = cast(&length_array, &DataType::Int64)?;
length_array =
zip(&is_not_null(&length_array)?, &length_array, &null_length)?;
Ok(length_array)
})
.collect::<Result<_>>()?;
let longest_length = list_lengths.iter().skip(1).try_fold(
Arc::clone(&list_lengths[0]),
|longest, current| {
let is_lt = lt(&longest, ¤t)?;
zip(&is_lt, ¤t, &longest)
},
)?;
Ok(longest_length)
}
/// Common accessor over the three list flavours (`List`, `LargeList`,
/// `FixedSizeList`) so unnesting can be written once against i64 offsets.
trait ListArrayType: Array {
    /// The flattened child values of the list.
    fn values(&self) -> &ArrayRef;
    /// Half-open `(start, end)` offsets of `row` within the values array.
    fn value_offsets(&self, row: usize) -> (i64, i64);
}
impl ListArrayType for ListArray {
    fn values(&self) -> &ArrayRef {
        // Delegates to the inherent `ListArray::values`; inherent methods
        // take precedence over this trait method, so this does not recurse.
        self.values()
    }
    fn value_offsets(&self, row: usize) -> (i64, i64) {
        // i32 offsets widened to i64 via `Into`.
        let offsets = self.value_offsets();
        (offsets[row].into(), offsets[row + 1].into())
    }
}
impl ListArrayType for LargeListArray {
    fn values(&self) -> &ArrayRef {
        // Delegates to the inherent method (no recursion; see `ListArray`).
        self.values()
    }
    fn value_offsets(&self, row: usize) -> (i64, i64) {
        // Offsets are already i64 for LargeList.
        let offsets = self.value_offsets();
        (offsets[row], offsets[row + 1])
    }
}
impl ListArrayType for FixedSizeListArray {
    fn values(&self) -> &ArrayRef {
        // Delegates to the inherent method (no recursion; see `ListArray`).
        self.values()
    }
    fn value_offsets(&self, row: usize) -> (i64, i64) {
        // Fixed-size lists derive the end offset from the constant length.
        let start = self.value_offset(row) as i64;
        (start, start + self.value_length() as i64)
    }
}
/// Unnests each array in `list_arrays`, padding every row to the shared
/// per-row `length_array` so the flattened columns stay aligned.
///
/// Fix vs the original: the error format string contained a stray space
/// inside the capture (`{other }`), which the Rust format-string parser
/// rejects; it is now `{other}`.
fn unnest_list_arrays(
    list_arrays: &[ArrayRef],
    length_array: &PrimitiveArray<Int64Type>,
    capacity: usize,
) -> Result<Vec<ArrayRef>> {
    // Downcast each array to the common `ListArrayType` view so all three
    // list kinds share a single unnest path.
    let typed_arrays = list_arrays
        .iter()
        .map(|list_array| match list_array.data_type() {
            DataType::List(_) => Ok(list_array.as_list::<i32>() as &dyn ListArrayType),
            DataType::LargeList(_) => {
                Ok(list_array.as_list::<i64>() as &dyn ListArrayType)
            }
            DataType::FixedSizeList(_, _) => {
                Ok(list_array.as_fixed_size_list() as &dyn ListArrayType)
            }
            other => exec_err!("Invalid unnest datatype {other}"),
        })
        .collect::<Result<Vec<_>>>()?;
    typed_arrays
        .iter()
        .map(|list_array| unnest_list_array(*list_array, length_array, capacity))
        .collect::<Result<_>>()
}
/// Flattens a single list array by building take indices into its values
/// buffer: each row contributes its own elements, then null padding up to
/// `length_array[row]` (the longest length among co-unnested columns).
fn unnest_list_array(
    list_array: &dyn ListArrayType,
    length_array: &PrimitiveArray<Int64Type>,
    capacity: usize,
) -> Result<ArrayRef> {
    let values = list_array.values();
    let mut take_indices_builder = PrimitiveArray::<Int64Type>::builder(capacity);
    for row in 0..list_array.len() {
        let mut value_length = 0;
        if !list_array.is_null(row) {
            let (start, end) = list_array.value_offsets(row);
            value_length = end - start;
            for i in start..end {
                take_indices_builder.append_value(i)
            }
        }
        // Pad with nulls so every co-unnested column has equal row count.
        let target_length = length_array.value(row);
        debug_assert!(
            value_length <= target_length,
            "value length is beyond the longest length"
        );
        for _ in value_length..target_length {
            take_indices_builder.append_null();
        }
    }
    // A null take index yields a null element in the output.
    Ok(kernels::take::take(
        &values,
        &take_indices_builder.finish(),
        None,
    )?)
}
/// Builds take indices that repeat each row index `i` exactly
/// `length_array[i]` times, used to duplicate passthrough columns so they
/// align with the unnested output.
fn create_take_indices(
    length_array: &PrimitiveArray<Int64Type>,
    capacity: usize,
) -> PrimitiveArray<Int64Type> {
    debug_assert!(
        length_array.null_count() == 0,
        "length array should not contain nulls"
    );
    let mut indices = PrimitiveArray::<Int64Type>::builder(capacity);
    for (row, repeat_count) in length_array.iter().enumerate() {
        // Nulls are excluded by the invariant asserted above.
        let repeat_count = repeat_count.unwrap();
        for _ in 0..repeat_count {
            indices.append_value(row as i64);
        }
    }
    indices.finish()
}
/// Row-duplicates each array in `batch` via `take(indices)` where its mask
/// entry is `true`; arrays with a `false` mask entry are replaced by an
/// all-null array of the same type and length.
fn repeat_arrs_from_indices(
    batch: &[ArrayRef],
    indices: &PrimitiveArray<Int64Type>,
    repeat_mask: &[bool],
) -> Result<Vec<Arc<dyn Array>>> {
    let mut repeated = Vec::with_capacity(batch.len());
    for (arr, &should_repeat) in batch.iter().zip(repeat_mask) {
        let out = match should_repeat {
            true => kernels::take::take(arr, indices, None)?,
            false => new_null_array(arr.data_type(), arr.len()),
        };
        repeated.push(out);
    }
    Ok(repeated)
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        GenericListArray, NullBufferBuilder, OffsetSizeTrait, StringArray,
    };
    use arrow::buffer::{NullBuffer, OffsetBuffer};
    use arrow::datatypes::{Field, Int32Type};
    use datafusion_common::test_util::batches_to_string;
    use insta::assert_snapshot;
    // Builds a 6-row list<utf8> array:
    //   [A, B, C], [], NULL, [D], NULL, [NULL, F]
    // (row 2 carries an ignored "?" value behind its null mask, exercising a
    // null row with a non-empty offset range).
    fn make_generic_array<OffsetSize>() -> GenericListArray<OffsetSize>
    where
        OffsetSize: OffsetSizeTrait,
    {
        let mut values = vec![];
        let mut offsets: Vec<OffsetSize> = vec![OffsetSize::zero()];
        let mut valid = NullBufferBuilder::new(6);
        // [A, B, C]
        values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();
        // []
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();
        // NULL with a non-empty value range
        values.push(Some("?"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();
        // [D]
        values.push(Some("D"));
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();
        // NULL with an empty value range
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_null();
        // [NULL, F]
        values.extend_from_slice(&[None, Some("F")]);
        offsets.push(OffsetSize::from_usize(values.len()).unwrap());
        valid.append_non_null();
        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        GenericListArray::<OffsetSize>::new(
            field,
            OffsetBuffer::new(offsets.into()),
            Arc::new(StringArray::from(values)),
            valid.finish(),
        )
    }
    // Builds a 6-row FixedSizeList(2) of utf8 where rows 1 and 3 are null.
    fn make_fixed_list() -> FixedSizeListArray {
        let values = Arc::new(StringArray::from_iter([
            Some("A"),
            Some("B"),
            None,
            None,
            Some("C"),
            Some("D"),
            None,
            None,
            None,
            Some("F"),
            None,
            None,
        ]));
        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
        let valid = NullBuffer::from(vec![true, false, true, false, true, true]);
        FixedSizeListArray::new(field, 2, values, Some(valid))
    }
    // Unnests `list_array` padded to `lengths` and compares the flattened
    // string values against `expected`.
    fn verify_unnest_list_array(
        list_array: &dyn ListArrayType,
        lengths: Vec<i64>,
        expected: Vec<Option<&str>>,
    ) -> Result<()> {
        let length_array = Int64Array::from(lengths);
        let unnested_array = unnest_list_array(list_array, &length_array, 3 * 6)?;
        let strs = unnested_array.as_string::<i32>().iter().collect::<Vec<_>>();
        assert_eq!(strs, expected);
        Ok(())
    }
    // End-to-end check of multi-depth unnesting: col1 is unnested at depths
    // 1 and 2, col2 at depth 1; output is pinned by an insta snapshot.
    #[test]
    fn test_build_batch_list_arr_recursive() -> Result<()> {
        // Inner list<int32> values for col1 (list<list<int32>>).
        let list_arr1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3)]),
            None,
            Some(vec![Some(4), Some(5)]),
            Some(vec![Some(7), Some(8), Some(9), Some(10)]),
            None,
            Some(vec![Some(11), Some(12), Some(13)]),
        ]);
        let list_arr1_ref = Arc::new(list_arr1) as ArrayRef;
        let offsets = OffsetBuffer::from_lengths([3, 3, 0]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_non_null();
        nulls.append_non_null();
        nulls.append_null();
        let col1_field = Field::new_list_field(
            DataType::List(Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            ))),
            true,
        );
        let col1 = ListArray::new(
            Arc::new(Field::new_list_field(
                list_arr1_ref.data_type().to_owned(),
                true,
            )),
            offsets,
            list_arr1_ref,
            nulls.finish(),
        );
        // col2: a plain list<utf8> with 3 non-null rows.
        let list_arr2 = StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
            Some("d"),
            Some("e"),
        ]);
        let offsets = OffsetBuffer::from_lengths([2, 2, 1]);
        let mut nulls = NullBufferBuilder::new(3);
        nulls.append_n_non_nulls(3);
        let col2_field = Field::new(
            "col2",
            DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
            true,
        );
        let col2 = GenericListArray::<i32>::new(
            Arc::new(Field::new_list_field(DataType::Utf8, true)),
            OffsetBuffer::new(offsets.into()),
            Arc::new(list_arr2),
            nulls.finish(),
        );
        let schema = Arc::new(Schema::new(vec![col1_field, col2_field]));
        // Output schema: col1 at depth 1 (still a list), col1 at depth 2
        // (fully flattened), and col2 at depth 1.
        let out_schema = Arc::new(Schema::new(vec![
            Field::new(
                "col1_unnest_placeholder_depth_1",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                true,
            ),
            Field::new("col1_unnest_placeholder_depth_2", DataType::Int32, true),
            Field::new("col2_unnest_placeholder_depth_1", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(col1) as ArrayRef, Arc::new(col2) as ArrayRef],
        )
        .unwrap();
        let list_type_columns = vec![
            ListUnnest {
                index_in_input_schema: 0,
                depth: 1,
            },
            ListUnnest {
                index_in_input_schema: 0,
                depth: 2,
            },
            ListUnnest {
                index_in_input_schema: 1,
                depth: 1,
            },
        ];
        let ret = build_batch(
            &batch,
            &out_schema,
            list_type_columns.as_ref(),
            &HashSet::default(),
            &UnnestOptions {
                preserve_nulls: true,
                recursions: vec![],
            },
        )?
        .unwrap();
        assert_snapshot!(batches_to_string(&[ret]),
        @r"
        +---------------------------------+---------------------------------+---------------------------------+
        | col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 |
        +---------------------------------+---------------------------------+---------------------------------+
        | [1, 2, 3]                       | 1                               | a                               |
        |                                 | 2                               | b                               |
        | [4, 5]                          | 3                               |                                 |
        | [1, 2, 3]                       |                                 | a                               |
        |                                 |                                 | b                               |
        | [4, 5]                          |                                 |                                 |
        | [1, 2, 3]                       | 4                               | a                               |
        |                                 | 5                               | b                               |
        | [4, 5]                          |                                 |                                 |
        | [7, 8, 9, 10]                   | 7                               | c                               |
        |                                 | 8                               | d                               |
        | [11, 12, 13]                    | 9                               |                                 |
        |                                 | 10                              |                                 |
        | [7, 8, 9, 10]                   |                                 | c                               |
        |                                 |                                 | d                               |
        | [11, 12, 13]                    |                                 |                                 |
        | [7, 8, 9, 10]                   | 11                              | c                               |
        |                                 | 12                              | d                               |
        | [11, 12, 13]                    | 13                              |                                 |
        |                                 |                                 | e                               |
        +---------------------------------+---------------------------------+---------------------------------+
        ");
        Ok(())
    }
    // Unit test of `unnest_list_array` for both generic and fixed-size lists.
    #[test]
    fn test_unnest_list_array() -> Result<()> {
        let list_array = make_generic_array::<i32>();
        verify_unnest_list_array(
            &list_array,
            vec![3, 2, 1, 2, 0, 3],
            vec![
                Some("A"),
                Some("B"),
                Some("C"),
                None,
                None,
                None,
                Some("D"),
                None,
                None,
                Some("F"),
                None,
            ],
        )?;
        let list_array = make_fixed_list();
        verify_unnest_list_array(
            &list_array,
            vec![3, 1, 2, 0, 2, 3],
            vec![
                Some("A"),
                Some("B"),
                None,
                None,
                Some("C"),
                Some("D"),
                None,
                Some("F"),
                None,
                None,
                None,
            ],
        )?;
        Ok(())
    }
    // Checks `find_longest_length` for a given null-preservation setting.
    fn verify_longest_length(
        list_arrays: &[ArrayRef],
        preserve_nulls: bool,
        expected: Vec<i64>,
    ) -> Result<()> {
        let options = UnnestOptions {
            preserve_nulls,
            recursions: vec![],
        };
        let longest_length = find_longest_length(list_arrays, &options)?;
        let expected_array = Int64Array::from(expected);
        assert_eq!(
            longest_length
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap(),
            &expected_array
        );
        Ok(())
    }
    // Longest-length across single arrays and across mixed array kinds, with
    // and without null preservation.
    #[test]
    fn test_longest_list_length() -> Result<()> {
        let list_array = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;
        let list_array = Arc::new(make_generic_array::<i64>()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![3, 0, 0, 1, 0, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![3, 0, 1, 1, 1, 2])?;
        let list_array = Arc::new(make_fixed_list()) as ArrayRef;
        verify_longest_length(&[Arc::clone(&list_array)], false, vec![2, 0, 2, 0, 2, 2])?;
        verify_longest_length(&[Arc::clone(&list_array)], true, vec![2, 1, 2, 1, 2, 2])?;
        let list1 = Arc::new(make_generic_array::<i32>()) as ArrayRef;
        let list2 = Arc::new(make_fixed_list()) as ArrayRef;
        let list_arrays = vec![Arc::clone(&list1), Arc::clone(&list2)];
        verify_longest_length(&list_arrays, false, vec![3, 0, 2, 1, 2, 2])?;
        verify_longest_length(&list_arrays, true, vec![3, 1, 2, 1, 2, 2])?;
        Ok(())
    }
    // Row index i should be repeated length[i] times.
    #[test]
    fn test_create_take_indices() -> Result<()> {
        let length_array = Int64Array::from(vec![2, 3, 1]);
        let take_indices = create_take_indices(&length_array, 6);
        let expected = Int64Array::from(vec![0, 0, 1, 1, 1, 2]);
        assert_eq!(take_indices, expected);
        Ok(())
    }
}