pub use self::metrics::Metric;
use self::metrics::MetricsSet;
use self::{
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::{error::Result, scalar::ScalarValue};
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
pub use datafusion_expr::Accumulator;
pub use datafusion_expr::ColumnarValue;
pub use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
pub use display::DisplayFormatType;
use futures::stream::Stream;
use std::fmt;
use std::fmt::Debug;
use datafusion_common::DataFusionError;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{any::Any, pin::Pin};
/// A [`Stream`] of `ArrowResult<RecordBatch>` items that can also report a schema.
pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
/// Returns the schema associated with this stream.
///
/// NOTE(review): presumably every batch yielded by the stream conforms to
/// this schema — confirm against implementors.
fn schema(&self) -> SchemaRef;
}
/// A pinned, boxed [`RecordBatchStream`] trait object that is also `Send`.
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
/// A [`RecordBatchStream`] that carries a schema but never yields a batch:
/// its `poll_next` reports end-of-stream immediately.
pub struct EmptyRecordBatchStream {
// Schema handed back by `RecordBatchStream::schema`.
schema: SchemaRef,
}
impl EmptyRecordBatchStream {
/// Creates an empty stream that reports `schema` as its schema.
pub fn new(schema: SchemaRef) -> Self {
Self { schema }
}
}
impl RecordBatchStream for EmptyRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
impl Stream for EmptyRecordBatchStream {
type Item = ArrowResult<RecordBatch>;
/// Always returns `Poll::Ready(None)`: the stream is finished on the very
/// first poll. The task context is unused because no wake-up is ever needed.
fn poll_next(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(None)
}
}
pub use self::planner::PhysicalPlanner;
/// Statistics describing the output of a plan node (see
/// `ExecutionPlan::statistics`).
///
/// Every numeric field is optional; `Default` yields all-`None` fields and
/// `is_exact == false`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Statistics {
/// Number of rows, if known.
pub num_rows: Option<usize>,
/// Total size in bytes, if known.
/// NOTE(review): whether this is in-memory or on-disk size is not visible here — confirm.
pub total_byte_size: Option<usize>,
/// Per-column statistics, if known.
/// NOTE(review): presumably one entry per schema column, in schema order — confirm.
pub column_statistics: Option<Vec<ColumnStatistics>>,
/// NOTE(review): presumably `true` when the `Some(..)` values above are exact
/// rather than estimates — confirm against producers of this struct.
pub is_exact: bool,
}
/// Statistics for a single column; every field is optional and defaults to
/// `None`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ColumnStatistics {
/// Number of null values in the column, if known.
pub null_count: Option<usize>,
/// Maximum value of the column, if known.
pub max_value: Option<ScalarValue>,
/// Minimum value of the column, if known.
pub min_value: Option<ScalarValue>,
/// Number of distinct values in the column, if known.
pub distinct_count: Option<usize>,
}
/// A node in a physical execution plan.
///
/// Implementors must be `Debug + Send + Sync`. A node exposes its output
/// schema, partitioning and ordering, its child nodes, and can be executed
/// one output partition at a time to obtain a stream of record batches.
pub trait ExecutionPlan: Debug + Send + Sync {
/// Returns `self` as [`Any`] so callers can downcast to the concrete node type.
fn as_any(&self) -> &dyn Any;
/// Returns the schema of this node's output.
fn schema(&self) -> SchemaRef;
/// Describes how this node's output is partitioned; the partition count it
/// reports determines which `partition` indices are passed to `execute` by
/// the helpers in this module.
fn output_partitioning(&self) -> Partitioning;
/// Returns the sort expressions describing the output ordering, if any.
///
/// NOTE(review): presumably `None` means no ordering is guaranteed — confirm.
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
/// Distribution this node requires of its children.
///
/// Defaults to `Distribution::UnspecifiedDistribution`.
fn required_child_distribution(&self) -> Distribution {
Distribution::UnspecifiedDistribution
}
/// Whether this node depends on the order of its input.
///
/// Defaults to `true`.
/// NOTE(review): presumably the conservative default, preventing
/// order-changing rewrites unless a node opts out — confirm.
fn relies_on_input_order(&self) -> bool {
true
}
/// Whether this node preserves the order of its input in its output.
///
/// Defaults to `false`.
fn maintains_input_order(&self) -> bool {
false
}
/// Whether this node would benefit from its input being split into more
/// partitions.
///
/// The default returns `true` unless `required_child_distribution` is
/// `Distribution::SinglePartition`.
fn benefits_from_input_partitioning(&self) -> bool {
!matches!(
self.required_child_distribution(),
Distribution::SinglePartition
)
}
/// Returns this node's child plans (empty for leaf nodes).
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
/// Returns a plan node with `children` as its inputs, consuming this
/// `Arc` handle.
///
/// # Errors
/// May fail with the crate's `Result` error type; the conditions are
/// implementation-specific.
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>;
/// Starts execution of output partition `partition` under the given task
/// `context` and returns the resulting stream of record batches.
///
/// # Errors
/// May fail with the crate's `Result` error type; the conditions are
/// implementation-specific.
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>;
/// Returns the metrics collected for this node, if it records any.
///
/// Defaults to `None`.
fn metrics(&self) -> Option<MetricsSet> {
None
}
/// Writes a description of this node to `f`.
///
/// The default ignores the requested format type and writes the fixed
/// placeholder text `ExecutionPlan(PlaceHolder)`.
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "ExecutionPlan(PlaceHolder)")
}
/// Returns statistics describing this node's output.
fn statistics(&self) -> Statistics;
}
/// Returns `plan` unchanged when `children` are exactly its current children
/// (same `Arc` allocations, in the same order); otherwise rebuilds the node
/// via `ExecutionPlan::with_new_children`.
///
/// A plan with no children is always passed through `with_new_children`.
///
/// # Errors
/// Returns `DataFusionError::Internal` when `children.len()` differs from the
/// number of children `plan` currently has, and propagates any error from
/// `with_new_children`.
// `Arc::ptr_eq` on `dyn` trait objects is what this lint flags; pointer
// identity is exactly the comparison intended here.
#[allow(clippy::vtable_address_comparisons)]
pub fn with_new_children_if_necessary(
    plan: Arc<dyn ExecutionPlan>,
    children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Scoped so the temporary list of existing children is released before
    // `plan` is handed to `with_new_children`.
    let unchanged = {
        let existing = plan.children();
        if children.len() != existing.len() {
            return Err(DataFusionError::Internal(
                "Wrong number of children".to_string(),
            ));
        }
        !children.is_empty()
            && children
                .iter()
                .zip(existing.iter())
                .all(|(proposed, current)| Arc::ptr_eq(proposed, current))
    };

    if unchanged {
        Ok(plan)
    } else {
        plan.with_new_children(children)
    }
}
/// Wraps `plan` in a [`DisplayableExecutionPlan`] that borrows it for the
/// lifetime of the returned value.
pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
DisplayableExecutionPlan::new(plan)
}
/// Walks `plan` depth-first with `visitor`: `pre_visit` on a node, then each
/// child subtree in order, then `post_visit` on the node.
///
/// The traversal stops at the first `Err` returned by the visitor. The `bool`
/// returned by `pre_visit` / `post_visit` is discarded and does not affect
/// the traversal.
///
/// # Errors
/// Propagates the first `V::Error` produced by the visitor.
pub fn accept<V: ExecutionPlanVisitor>(
    plan: &dyn ExecutionPlan,
    visitor: &mut V,
) -> std::result::Result<(), V::Error> {
    // The recursive walker performs exactly this node's pre-visit, child
    // traversal and post-visit, so the root needs no separate handling.
    visit_execution_plan(plan, visitor)
}
/// Callbacks invoked while walking an [`ExecutionPlan`] tree with `accept` or
/// `visit_execution_plan`.
///
/// Note: those traversal functions only stop on `Err`; they discard the
/// `bool` carried in `Ok` by both callbacks.
pub trait ExecutionPlanVisitor {
/// Error type that aborts the traversal when returned from a callback.
type Error;
/// Called on a node before any of its children are visited.
fn pre_visit(
&mut self,
plan: &dyn ExecutionPlan,
) -> std::result::Result<bool, Self::Error>;
/// Called on a node after all of its children have been visited.
///
/// The default implementation does nothing and returns `Ok(true)`.
fn post_visit(
&mut self,
_plan: &dyn ExecutionPlan,
) -> std::result::Result<bool, Self::Error> {
Ok(true)
}
}
/// Recursively visits `plan` and all of its descendants depth-first.
///
/// For each node: `pre_visit` is called, then every child subtree is visited
/// in the order returned by `children()`, then `post_visit` is called. The
/// `bool` values returned by the visitor callbacks are discarded; only an
/// `Err` stops the walk.
///
/// # Errors
/// Propagates the first `V::Error` produced by the visitor.
pub fn visit_execution_plan<V: ExecutionPlanVisitor>(
    plan: &dyn ExecutionPlan,
    visitor: &mut V,
) -> std::result::Result<(), V::Error> {
    visitor.pre_visit(plan)?;
    plan.children()
        .into_iter()
        .try_for_each(|child| visit_execution_plan(child.as_ref(), visitor))?;
    visitor.post_visit(plan)?;
    Ok(())
}
/// Executes `plan` as a single stream (see `execute_stream`) and gathers
/// every record batch it produces into a `Vec`.
///
/// # Errors
/// Propagates errors from starting the stream and from collecting it.
pub async fn collect(
    plan: Arc<dyn ExecutionPlan>,
    context: Arc<TaskContext>,
) -> Result<Vec<RecordBatch>> {
    let batch_stream = execute_stream(plan, context).await?;
    let batches = common::collect(batch_stream).await;
    batches
}
/// Executes `plan` and returns one stream covering all of its output.
///
/// * 0 output partitions: returns an [`EmptyRecordBatchStream`] with the
///   plan's schema, without executing anything.
/// * 1 output partition: executes partition 0 directly.
/// * more than 1: wraps the plan in a [`CoalescePartitionsExec`] and executes
///   that node's partition 0.
///
/// # Panics
/// Panics if the `CoalescePartitionsExec` wrapper does not report exactly one
/// output partition.
///
/// # Errors
/// Propagates any error returned by `ExecutionPlan::execute`.
pub async fn execute_stream(
    plan: Arc<dyn ExecutionPlan>,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
    let partition_count = plan.output_partitioning().partition_count();

    if partition_count == 0 {
        return Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema())));
    }
    if partition_count == 1 {
        return plan.execute(0, context);
    }

    // Several partitions: merge them behind a single-partition node.
    let coalesced = CoalescePartitionsExec::new(Arc::clone(&plan));
    // The wrapper must expose exactly one partition for `execute(0, ..)` to
    // cover all of the input.
    assert_eq!(1, coalesced.output_partitioning().partition_count());
    coalesced.execute(0, context)
}
/// Executes every output partition of `plan` and collects each partition's
/// record batches separately; element `i` of the result holds the batches of
/// partition `i`.
///
/// Partition streams are drained one after another, in partition order.
///
/// # Errors
/// Propagates the first error from starting or draining a partition stream.
pub async fn collect_partitioned(
    plan: Arc<dyn ExecutionPlan>,
    context: Arc<TaskContext>,
) -> Result<Vec<Vec<RecordBatch>>> {
    let partition_streams = execute_stream_partitioned(plan, context).await?;
    let mut collected = Vec::with_capacity(partition_streams.len());
    for partition_stream in partition_streams {
        let partition_batches = common::collect(partition_stream).await?;
        collected.push(partition_batches);
    }
    Ok(collected)
}
/// Starts execution of every output partition of `plan` and returns the
/// streams in partition order (index `i` is partition `i`).
///
/// Each partition receives its own clone of the `context` handle.
///
/// # Errors
/// Returns the first error produced by `ExecutionPlan::execute`; later
/// partitions are not started.
pub async fn execute_stream_partitioned(
    plan: Arc<dyn ExecutionPlan>,
    context: Arc<TaskContext>,
) -> Result<Vec<SendableRecordBatchStream>> {
    let partition_count = plan.output_partitioning().partition_count();
    // Collecting into `Result<Vec<_>>` short-circuits on the first failure.
    (0..partition_count)
        .map(|partition| plan.execute(partition, context.clone()))
        .collect()
}
/// Describes how the output of a plan node is split into partitions.
///
/// Every variant carries the number of partitions as its `usize` field (see
/// `Partitioning::partition_count`).
#[derive(Debug, Clone)]
pub enum Partitioning {
/// Round-robin partitioning into the given number of partitions.
/// NOTE(review): presumably whole batches are assigned in rotation — confirm.
RoundRobinBatch(usize),
/// Partitioning driven by the given expressions into the given number of
/// partitions.
/// NOTE(review): presumably rows are routed by a hash of the expression values — confirm.
Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
/// The given number of partitions with an unknown partitioning scheme.
UnknownPartitioning(usize),
}
impl Partitioning {
pub fn partition_count(&self) -> usize {
use Partitioning::*;
match self {
RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
}
}
}
/// Distribution a plan node can require of its children (see
/// `ExecutionPlan::required_child_distribution`).
#[derive(Debug, Clone)]
pub enum Distribution {
/// No particular distribution is required; this is the default returned by
/// `ExecutionPlan::required_child_distribution`.
UnspecifiedDistribution,
/// NOTE(review): presumably requires all input in a single partition — confirm.
SinglePartition,
/// NOTE(review): presumably requires input partitioned by a hash of the
/// given expressions — confirm.
HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
}
pub use datafusion_physical_expr::window::WindowExpr;
pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
/// Applies an optional column projection to `schema`.
///
/// * `Some(indices)`: returns a newly allocated schema produced by
///   `schema.project(indices)`.
/// * `None`: returns another handle to the same schema (no copy).
///
/// # Errors
/// Propagates the error from `project` when the projection cannot be applied.
pub fn project_schema(
    schema: &SchemaRef,
    projection: Option<&Vec<usize>>,
) -> Result<SchemaRef> {
    match projection {
        Some(indices) => {
            let projected = schema.project(indices)?;
            Ok(Arc::new(projected))
        }
        None => Ok(Arc::clone(schema)),
    }
}
pub mod aggregates;
pub mod analyze;
pub mod coalesce_batches;
pub mod coalesce_partitions;
pub mod common;
pub mod cross_join;
pub mod display;
pub mod empty;
pub mod explain;
pub mod file_format;
pub mod filter;
pub mod hash_join;
pub mod hash_utils;
pub mod join_utils;
pub mod limit;
pub mod memory;
pub mod metrics;
pub mod planner;
pub mod projection;
pub mod repartition;
pub mod sort_merge_join;
pub mod sorts;
pub mod stream;
pub mod udaf;
pub mod union;
pub mod values;
pub mod windows;
use crate::execution::context::TaskContext;
pub use datafusion_physical_expr::{expressions, functions, type_coercion, udf};