use std::any::Any;
use std::sync::Arc;
use crate::coop::cooperative;
use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
use crate::memory::MemoryStream;
use crate::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream, Statistics, common,
};
use arrow::array::{ArrayRef, NullArray, RecordBatch, RecordBatchOptions};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use datafusion_common::{Result, assert_or_internal_err};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
use log::trace;
/// Leaf execution plan that emits a single placeholder row per output partition.
///
/// Each partition yields one [`RecordBatch`] containing exactly one row. The
/// batch has one nullable column of type [`DataType::Null`] for every field of
/// `schema`. Those columns are named `placeholder_{i}` in the batch's own schema.
#[derive(Debug, Clone)]
pub struct PlaceholderRowExec {
/// Output schema reported by this plan, as passed to [`PlaceholderRowExec::new`].
schema: SchemaRef,
/// Number of output partitions; `execute` accepts indices `0..partitions`.
partitions: usize,
/// Cached plan properties (equivalence properties, partitioning, emission
/// type, boundedness and scheduling type).
cache: Arc<PlanProperties>,
}
impl PlaceholderRowExec {
    /// Creates a single-partition placeholder plan that reports `schema` as its
    /// output schema.
    pub fn new(schema: SchemaRef) -> Self {
        let partitions = 1;
        let properties = Self::compute_properties(Arc::clone(&schema), partitions);
        Self {
            schema,
            partitions,
            cache: Arc::new(properties),
        }
    }

    /// Returns this plan reconfigured to expose `partitions` output partitions.
    ///
    /// Only the partitioning stored in the cached [`PlanProperties`] is
    /// replaced; all other cached properties are kept. `Arc::make_mut` clones
    /// the properties first if they are shared with another plan instance.
    pub fn with_partitions(mut self, partitions: usize) -> Self {
        self.partitions = partitions;
        let properties = Arc::make_mut(&mut self.cache);
        properties.partitioning = Self::output_partitioning_helper(partitions);
        self
    }

    /// Builds the placeholder output: a single batch with one row and one
    /// nullable `Null`-typed column named `placeholder_{i}` per field of
    /// `self.schema`.
    fn data(&self) -> Result<Vec<RecordBatch>> {
        let column_count = self.schema.fields.len();

        let placeholder_fields: Fields = (0..column_count)
            .map(|i| Field::new(format!("placeholder_{i}"), DataType::Null, true))
            .collect();
        let placeholder_columns: Vec<ArrayRef> = (0..column_count)
            .map(|_| Arc::new(NullArray::new(1)) as ArrayRef)
            .collect();

        // The explicit row count keeps the batch at one row even when the
        // schema has no fields, because Arrow cannot infer a row count from
        // zero columns.
        let options = RecordBatchOptions::new().with_row_count(Some(1));

        let batch = RecordBatch::try_new_with_options(
            Arc::new(Schema::new(placeholder_fields)),
            placeholder_columns,
            &options,
        )?;
        Ok(vec![batch])
    }

    /// Partitioning reported for a plan with `n_partitions` output partitions.
    fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
        Partitioning::UnknownPartitioning(n_partitions)
    }

    /// Computes the cached [`PlanProperties`]: equivalences derived from
    /// `schema`, unknown partitioning over `n_partitions`, incremental emission,
    /// bounded input and cooperative scheduling.
    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
        let eq_properties = EquivalenceProperties::new(schema);
        let partitioning = Self::output_partitioning_helper(n_partitions);
        let properties = PlanProperties::new(
            eq_properties,
            partitioning,
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        properties.with_scheduling_type(SchedulingType::Cooperative)
    }
}
impl DisplayAs for PlaceholderRowExec {
    /// Writes `PlaceholderRowExec` for the default and verbose formats.
    /// The tree-render format writes nothing for this node.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::TreeRender => Ok(()),
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                f.write_str("PlaceholderRowExec")
            }
        }
    }
}
impl ExecutionPlan for PlaceholderRowExec {
    fn name(&self) -> &'static str {
        "PlaceholderRowExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    /// Leaf node: there are no input plans.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    /// Leaf node, so there is nothing to replace; returns `self` unchanged.
    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Returns a stream yielding the single placeholder batch for `partition`.
    ///
    /// # Errors
    /// Returns an internal error if `partition` is not less than the configured
    /// partition count, or if the placeholder batch cannot be built.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start PlaceholderRowExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );
        assert_or_internal_err!(
            partition < self.partitions,
            "PlaceholderRowExec invalid partition {partition} (expected less than {})",
            self.partitions
        );
        let ms = MemoryStream::try_new(self.data()?, Arc::clone(&self.schema), None)?;
        // Wrapped in `cooperative` to match the `SchedulingType::Cooperative`
        // declared in this plan's properties.
        Ok(Box::pin(cooperative(ms)))
    }

    /// Statistics for a single partition (`Some(idx)`) or the whole plan (`None`).
    ///
    /// Every partition produces exactly one placeholder row. Whole-plan
    /// statistics are therefore computed over `self.partitions` copies of the
    /// placeholder batch.
    ///
    /// # Errors
    /// Returns an internal error if `partition` is out of range (the same check
    /// `execute` performs), or if the placeholder batch cannot be built.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        // Reporting a row for a partition that does not exist would be wrong.
        if let Some(idx) = partition {
            assert_or_internal_err!(
                idx < self.partitions,
                "PlaceholderRowExec invalid partition {idx} (expected less than {})",
                self.partitions
            );
        }

        // Propagate construction failures instead of panicking; this method
        // already returns `Result`.
        let batches = self.data()?;
        let batches = match partition {
            Some(_) => vec![batches],
            None => vec![batches; self.partitions],
        };

        Ok(common::compute_record_batch_statistics(
            &batches,
            &self.schema,
            None,
        ))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;
    use datafusion_common::stats::Precision;

    #[test]
    fn with_new_children() -> Result<()> {
        let schema = test::aggr_test_schema();
        let placeholder = Arc::new(PlaceholderRowExec::new(schema));
        let placeholder_2 = with_new_children_if_necessary(
            Arc::clone(&placeholder) as Arc<dyn ExecutionPlan>,
            vec![],
        )?;
        assert_eq!(placeholder.schema(), placeholder_2.schema());
        let too_many_kids = vec![placeholder_2];
        assert!(
            with_new_children_if_necessary(placeholder, too_many_kids).is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema);
        assert!(placeholder.execute(1, Arc::clone(&task_ctx)).is_err());
        assert!(placeholder.execute(20, task_ctx).is_err());
        Ok(())
    }

    #[tokio::test]
    async fn produce_one_row() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let n_fields = schema.fields().len();
        let placeholder = PlaceholderRowExec::new(schema);
        let iter = placeholder.execute(0, task_ctx)?;
        let batches = common::collect(iter).await?;
        assert_eq!(batches.len(), 1);
        // The test name promises one row: verify it, plus one column per field.
        assert_eq!(batches[0].num_rows(), 1);
        assert_eq!(batches[0].num_columns(), n_fields);
        Ok(())
    }

    #[tokio::test]
    async fn produce_one_row_multiple_partition() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let partitions = 3;
        let placeholder = PlaceholderRowExec::new(schema).with_partitions(partitions);
        for n in 0..partitions {
            let iter = placeholder.execute(n, Arc::clone(&task_ctx))?;
            let batches = common::collect(iter).await?;
            assert_eq!(batches.len(), 1);
            assert_eq!(batches[0].num_rows(), 1);
        }
        Ok(())
    }

    #[test]
    fn partition_statistics_row_counts() -> Result<()> {
        let schema = test::aggr_test_schema();
        let placeholder = PlaceholderRowExec::new(schema).with_partitions(3);
        // A single partition holds exactly one placeholder row.
        let single = placeholder.partition_statistics(Some(0))?;
        assert_eq!(single.num_rows, Precision::Exact(1));
        // The whole plan holds one row per partition.
        let whole = placeholder.partition_statistics(None)?;
        assert_eq!(whole.num_rows, Precision::Exact(3));
        Ok(())
    }
}