use arrow::array::{
new_null_array, Array, ArrayAccessor, ArrayRef, FixedSizeListArray, LargeListArray,
ListArray,
};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion_execution::TaskContext;
use futures::Stream;
use futures::StreamExt;
use log::trace;
use std::time::Instant;
use std::{any::Any, sync::Arc};
use crate::physical_plan::{
coalesce_batches::concat_batches, expressions::Column, DisplayFormatType,
Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, PhysicalExpr,
PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use datafusion_common::{DataFusionError, Result, ScalarValue};
/// Physical plan node that unnests a single list-typed column: every input row
/// is expanded into one output row per element of the list value in `column`
/// (see `build_batch` / `unnest_batch` below for the expansion rules).
#[derive(Debug)]
pub struct UnnestExec {
    // Child plan producing the batches to unnest.
    input: Arc<dyn ExecutionPlan>,
    // Output schema. Supplied by the caller, not derived here — presumably the
    // input schema with the unnest column's list type replaced by its element
    // type; confirm against the planner that constructs this node.
    schema: SchemaRef,
    // The list column to unnest.
    column: Column,
}
impl UnnestExec {
    /// Create a new [`UnnestExec`] that unnests `column` of `input`, producing
    /// batches with the caller-provided output `schema`.
    pub fn new(input: Arc<dyn ExecutionPlan>, column: Column, schema: SchemaRef) -> Self {
        Self {
            input,
            column,
            schema,
        }
    }
}
impl ExecutionPlan for UnnestExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![Arc::clone(&self.input)]
    }

    /// The output is unbounded exactly when the (single) input is unbounded.
    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
        Ok(children[0])
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let new_input = Arc::clone(&children[0]);
        Ok(Arc::new(Self::new(
            new_input,
            self.column.clone(),
            Arc::clone(&self.schema),
        )))
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::UnspecifiedDistribution]
    }

    /// Delegates to the input: unnest maps each input partition to one
    /// output partition.
    fn output_partitioning(&self) -> Partitioning {
        self.input.output_partitioning()
    }

    /// No output ordering is declared for unnested batches.
    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    fn equivalence_properties(&self) -> EquivalenceProperties {
        self.input.equivalence_properties()
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // Wrap the child's stream for this partition with all metrics zeroed.
        let input = self.input.execute(partition, context)?;
        let stream = UnnestStream {
            input,
            schema: Arc::clone(&self.schema),
            column: self.column.clone(),
            num_input_batches: 0,
            num_input_rows: 0,
            num_output_batches: 0,
            num_output_rows: 0,
            unnest_time: 0,
        };
        Ok(Box::pin(stream))
    }

    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "UnnestExec")
            }
        }
    }

    /// No statistics are estimated for unnest output.
    fn statistics(&self) -> Statistics {
        Default::default()
    }
}
/// Stream adapter that applies the unnest transformation to every batch
/// produced by `input`, accumulating simple metrics for trace logging.
struct UnnestStream {
    // Stream of input record batches.
    input: SendableRecordBatchStream,
    // Schema of the unnested output batches.
    schema: Arc<Schema>,
    // The list column being unnested.
    column: Column,
    // Input batches seen so far (logged when the stream finishes).
    num_input_batches: usize,
    // Input rows seen so far.
    num_input_rows: usize,
    // Output batches successfully produced.
    num_output_batches: usize,
    // Output rows successfully produced.
    num_output_rows: usize,
    // Cumulative time spent in `build_batch`, in whole milliseconds,
    // counted only for batches that unnested successfully.
    unnest_time: usize,
}
impl RecordBatchStream for UnnestStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
// NOTE: the `#[async_trait]` attribute previously attached to this impl was
// removed — `Stream` declares no `async fn`s, so the proc-macro was a no-op
// pass-through that only added compile time and confusion.
impl Stream for UnnestStream {
    type Item = Result<RecordBatch>;

    /// Delegate polling to the inherent `poll_next_impl`, which owns the
    /// metric bookkeeping.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.poll_next_impl(cx)
    }
}
impl UnnestStream {
    /// Poll the input stream once; when a batch is ready, unnest it with
    /// [`build_batch`] and update the counters. When something other than a
    /// successful batch arrives (`None` = end of stream, or `Some(Err(_))`),
    /// a summary of the work done so far is trace-logged and the item is
    /// forwarded unchanged.
    fn poll_next_impl(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Result<RecordBatch>>> {
        self.input
            .poll_next_unpin(cx)
            .map(|maybe_batch| match maybe_batch {
                Some(Ok(batch)) => {
                    let start = Instant::now();
                    let result = build_batch(&batch, &self.schema, &self.column);
                    self.num_input_batches += 1;
                    self.num_input_rows += batch.num_rows();
                    if let Ok(ref batch) = result {
                        // Time and output counters only advance for batches
                        // that unnested successfully.
                        self.unnest_time += start.elapsed().as_millis() as usize;
                        self.num_output_batches += 1;
                        self.num_output_rows += batch.num_rows();
                    }
                    Some(result)
                }
                other => {
                    // Fix: the message used to say "probe-side input batches",
                    // a copy-paste from the hash-join stream — unnest has no
                    // probe side. NOTE(review): this arm also runs for
                    // `Some(Err(_))`, so the summary can be logged before the
                    // stream is actually exhausted.
                    trace!(
                        "Processed {} input batches containing {} rows and \
                        produced {} output batches containing {} rows in {} ms",
                        self.num_input_batches,
                        self.num_input_rows,
                        self.num_output_batches,
                        self.num_output_rows,
                        self.unnest_time,
                    );
                    other
                }
            })
    }
}
/// Evaluate the unnest `column` against `batch`, downcast the resulting array
/// to its concrete list flavor, and dispatch to the generic [`unnest_batch`].
///
/// Returns an execution error if the column does not evaluate to a
/// `List`, `LargeList`, or `FixedSizeList` array.
fn build_batch(
    batch: &RecordBatch,
    schema: &SchemaRef,
    column: &Column,
) -> Result<RecordBatch> {
    let unnested = column.evaluate(batch)?.into_array(batch.num_rows());
    match unnested.data_type() {
        arrow::datatypes::DataType::List(_) => {
            // Downcast cannot fail: the data type was just matched.
            let typed = unnested.as_any().downcast_ref::<ListArray>().unwrap();
            unnest_batch(batch, schema, column, &typed)
        }
        arrow::datatypes::DataType::LargeList(_) => {
            let typed = unnested
                .as_any()
                .downcast_ref::<LargeListArray>()
                .unwrap();
            unnest_batch(batch, schema, column, &typed)
        }
        arrow::datatypes::DataType::FixedSizeList(_, _) => {
            let typed = unnested
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .unwrap();
            unnest_batch(batch, schema, column, typed)
        }
        _ => Err(DataFusionError::Execution(format!(
            "Invalid unnest column {column}"
        ))),
    }
}
/// Expand `batch` row by row: for each input row, the unnest column
/// contributes its list elements as rows, and every other column has its
/// scalar value for that row repeated to match. An empty list (and any null
/// sibling value) becomes a single null. Per-row batches are concatenated
/// into one output batch.
///
/// Fixes over the previous version:
/// - `list_array.value(row)` is now evaluated once per row instead of once
///   (or twice) per column — each call materializes a fresh sliced `ArrayRef`.
/// - the redundant `arrays.to_vec()` (a full clone of an already-owned `Vec`)
///   is gone.
/// - `batches` is preallocated, since the row count is known up front.
fn unnest_batch<T>(
    batch: &RecordBatch,
    schema: &SchemaRef,
    column: &Column,
    list_array: &T,
) -> Result<RecordBatch>
where
    T: ArrayAccessor<Item = ArrayRef>,
{
    let mut batches = Vec::with_capacity(batch.num_rows());
    let mut num_rows = 0;
    for row in 0..batch.num_rows() {
        // Hoisted: the unnested values for this row, computed once and shared
        // by every column's closure below.
        let unnested = list_array.value(row);
        // Non-unnest columns are repeated once per list element; an empty
        // list still produces one (null) output row.
        let nested_len = unnested.len().max(1);
        let arrays = batch
            .columns()
            .iter()
            .enumerate()
            .map(|(col_idx, arr)| {
                if col_idx == column.index() {
                    if unnested.is_empty() {
                        Ok(new_null_array(unnested.data_type(), 1))
                    } else {
                        // ArrayRef is an Arc: this clone is a refcount bump.
                        Ok(unnested.clone())
                    }
                } else if arr.is_null(row) {
                    Ok(new_null_array(arr.data_type(), nested_len))
                } else {
                    let scalar = ScalarValue::try_from_array(arr, row)?;
                    Ok(scalar.to_array_of_size(nested_len))
                }
            })
            .collect::<Result<Vec<_>>>()?;
        let rb = RecordBatch::try_new(schema.clone(), arrays)?;
        num_rows += rb.num_rows();
        batches.push(rb);
    }
    concat_batches(schema, &batches, num_rows).map_err(Into::into)
}