pub use self::metrics::Metric;
use self::metrics::MetricsSet;
use self::{
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
};
pub use crate::common::{ColumnStatistics, Statistics};
use crate::error::Result;
use crate::physical_plan::expressions::PhysicalSortExpr;
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
pub use datafusion_expr::{Accumulator, ColumnarValue};
pub use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
pub use display::DisplayFormatType;

use datafusion_common::DataFusionError;
use futures::stream::Stream;
use std::fmt::{self, Debug};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{any::Any, pin::Pin};
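/// Trait for a [`Stream`] of [`RecordBatch`]es that knows the schema of the
/// batches it yields up front.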
pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
fn schema(&self) -> SchemaRef;
}
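/// A pinned, heap-allocated, `Send`able [`RecordBatchStream`]; the type
/// returned by [`ExecutionPlan::execute`].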
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
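/// A [`RecordBatchStream`] that yields no batches but still reports a
/// schema, used e.g. when executing a plan with zero output partitions.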
pub struct EmptyRecordBatchStream {
schema: SchemaRef,
}
impl EmptyRecordBatchStream {
pub fn new(schema: SchemaRef) -> Self {
Self { schema }
}
}
impl RecordBatchStream for EmptyRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
impl Stream for EmptyRecordBatchStream {
type Item = ArrowResult<RecordBatch>;
fn poll_next(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(None)
}
}
pub use self::planner::PhysicalPlanner;
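/// A node in a tree (DAG) of physical operators that, together, form an
/// executable query plan.
///
/// Each `ExecutionPlan` exposes its output as one or more partitions, and
/// [`execute`](ExecutionPlan::execute) starts one partition and returns a
/// stream of its [`RecordBatch`]es. A minimal usage sketch (not compiled
/// here), assuming `plan: Arc<dyn ExecutionPlan>` and `task_ctx:
/// Arc<TaskContext>` are in scope:
///
/// ```ignore
/// use futures::StreamExt;
///
/// // Pull all batches of the first output partition
/// let mut stream = plan.execute(0, task_ctx)?;
/// while let Some(batch) = stream.next().await {
///     println!("got {} rows", batch?.num_rows());
/// }
/// ```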
pub trait ExecutionPlan: Debug + Send + Sync {
    /// Returns the execution plan as [`Any`] so that it can be
    /// downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Get the schema for this execution plan.
    fn schema(&self) -> SchemaRef;

    /// Specifies the output partitioning scheme of this plan.
    fn output_partitioning(&self) -> Partitioning;

    /// Returns `true` if this operator produces an unbounded (infinite)
    /// output stream, given flags describing whether each child is
    /// unbounded. The default assumes a bounded output.
    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
        Ok(false)
    }

    /// If the output of this operator is sorted, returns a description of
    /// that ordering; otherwise returns `None`.
    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;

    /// Specifies the data distribution requirement for each of the children.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::UnspecifiedDistribution; self.children().len()]
    }

    /// Specifies the ordering requirement for each of the children.
    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
        vec![None; self.children().len()]
    }

    /// For each child, returns `true` if this operator preserves that
    /// child's row ordering in its output. The default conservatively
    /// reports `false` for every child.
    fn maintains_input_order(&self) -> Vec<bool> {
        vec![false; self.children().len()]
    }

    /// Returns `true` if this operator would benefit from its input being
    /// further partitioned (i.e. from additional parallelism); operators
    /// that require a single input partition do not.
    fn benefits_from_input_partitioning(&self) -> bool {
        !self
            .required_input_distribution()
            .into_iter()
            .any(|dist| matches!(dist, Distribution::SinglePartition))
    }

    /// Returns the equivalence properties (sets of expressions known to be
    /// equal) that hold for the output of this operator.
    fn equivalence_properties(&self) -> EquivalenceProperties {
        EquivalenceProperties::new(self.schema())
    }

    /// Get a list of the child execution plans that provide this plan's input.
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;

    /// Returns a new plan in which all existing children were replaced by
    /// the given `children`.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Begins execution of `partition`, returning a stream of
    /// [`RecordBatch`]es for that partition.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;

    /// Returns a snapshot of this node's [`MetricsSet`], if it collects any
    /// metrics. The default returns `None`.
    fn metrics(&self) -> Option<MetricsSet> {
        None
    }

    /// Formats this plan node according to `t`, used when displaying plans.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "ExecutionPlan(PlaceHolder)")
    }

    /// Returns statistics for the output of this plan node.
    fn statistics(&self) -> Statistics;
}
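/// Returns `true` if `plan` moves rows between partitions: a
/// [`RepartitionExec`] with anything other than round-robin partitioning,
/// or a [`CoalescePartitionsExec`] / [`SortPreservingMergeExec`] whose
/// input has more than one partition.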
pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
if let Some(repart) = plan.as_any().downcast_ref::<RepartitionExec>() {
!matches!(
repart.output_partitioning(),
Partitioning::RoundRobinBatch(_)
)
} else if let Some(coalesce) = plan.as_any().downcast_ref::<CoalescePartitionsExec>()
{
coalesce.input().output_partitioning().partition_count() > 1
} else if let Some(sort_preserving_merge) =
plan.as_any().downcast_ref::<SortPreservingMergeExec>()
{
sort_preserving_merge
.input()
.output_partitioning()
.partition_count()
> 1
} else {
false
}
}
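/// Returns a copy of `plan` with `children` as its inputs. If every child
/// is pointer-identical to the existing one (and there is at least one
/// child), the original `plan` is returned without calling the potentially
/// expensive [`ExecutionPlan::with_new_children`].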
#[allow(clippy::vtable_address_comparisons)]
pub fn with_new_children_if_necessary(
plan: Arc<dyn ExecutionPlan>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let old_children = plan.children();
if children.len() != old_children.len() {
Err(DataFusionError::Internal(
"Wrong number of children".to_string(),
))
} else if children.is_empty()
|| children
.iter()
.zip(old_children.iter())
.any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
{
plan.with_new_children(children)
} else {
Ok(plan)
}
}
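/// Wraps `plan` so it can be displayed in various formats. A usage sketch
/// (not compiled here), assuming `plan: Arc<dyn ExecutionPlan>`:
///
/// ```ignore
/// // Print the plan tree, one indented line per operator
/// println!("{}", displayable(plan.as_ref()).indent());
/// ```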
pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
DisplayableExecutionPlan::new(plan)
}
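/// Performs a depth-first walk of `plan`, calling
/// [`ExecutionPlanVisitor::pre_visit`] before visiting each node's children
/// and [`ExecutionPlanVisitor::post_visit`] afterwards.
///
/// A minimal visitor sketch (not compiled here); `NodeCounter` is purely
/// illustrative:
///
/// ```ignore
/// struct NodeCounter {
///     count: usize,
/// }
///
/// impl ExecutionPlanVisitor for NodeCounter {
///     type Error = std::convert::Infallible;
///
///     fn pre_visit(&mut self, _plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
///         self.count += 1;
///         Ok(true) // keep recursing into children
///     }
/// }
///
/// let mut counter = NodeCounter { count: 0 };
/// accept(plan.as_ref(), &mut counter).unwrap(); // the error type is Infallible
/// println!("plan has {} nodes", counter.count);
/// ```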
pub fn accept<V: ExecutionPlanVisitor>(
    plan: &dyn ExecutionPlan,
    visitor: &mut V,
) -> Result<(), V::Error> {
    // Delegate to `visit_execution_plan` rather than duplicating its body
    visit_execution_plan(plan, visitor)
}
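/// Visitor pattern for walking [`ExecutionPlan`] trees: implement
/// `pre_visit` (and optionally `post_visit`) and pass the visitor to
/// [`accept`].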
pub trait ExecutionPlanVisitor {
    /// The type of error returned by this visitor.
    type Error;

    /// Invoked on `plan` before any of its children are visited. Return
    /// `Ok(false)` to skip visiting this node's children.
    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error>;

    /// Invoked on `plan` after all of its children have been visited. The
    /// default implementation returns `Ok(true)`.
    fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
        Ok(true)
    }
}
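/// Recursively calls `pre_visit` and `post_visit` for this node and all of
/// its children, as described on [`ExecutionPlanVisitor`].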
pub fn visit_execution_plan<V: ExecutionPlanVisitor>(
    plan: &dyn ExecutionPlan,
    visitor: &mut V,
) -> Result<(), V::Error> {
    // Honor `pre_visit`'s return value: `Ok(false)` prunes this subtree
    if !visitor.pre_visit(plan)? {
        return Ok(());
    }
    for child in plan.children() {
        visit_execution_plan(child.as_ref(), visitor)?;
    }
    visitor.post_visit(plan)?;
    Ok(())
}
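/// Executes `plan` to completion and buffers all produced batches in
/// memory. A usage sketch (not compiled here), assuming a `SessionContext`
/// named `ctx` from which `plan` was planned:
///
/// ```ignore
/// let task_ctx = ctx.task_ctx();
/// let batches = collect(plan, task_ctx).await?;
/// ```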
pub async fn collect(
plan: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
) -> Result<Vec<RecordBatch>> {
let stream = execute_stream(plan, context)?;
common::collect(stream).await
}
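/// Executes `plan` and returns a single stream over its entire output,
/// inserting a [`CoalescePartitionsExec`] to merge multiple output
/// partitions when necessary. A usage sketch (not compiled here):
///
/// ```ignore
/// use futures::StreamExt;
///
/// let mut stream = execute_stream(plan, task_ctx)?;
/// while let Some(batch) = stream.next().await {
///     let batch = batch?;
///     // process each batch incrementally, without buffering the full result
/// }
/// ```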
pub fn execute_stream(
plan: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
match plan.output_partitioning().partition_count() {
0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1 => plan.execute(0, context),
        _ => {
            // Merge the partitions into a single one before executing
            let plan = CoalescePartitionsExec::new(plan);
            // `CoalescePartitionsExec` always produces exactly one partition
            assert_eq!(1, plan.output_partitioning().partition_count());
            plan.execute(0, context)
        }
}
}
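/// Executes `plan` and buffers the results in memory, one
/// `Vec<RecordBatch>` per output partition.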
pub async fn collect_partitioned(
plan: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
) -> Result<Vec<Vec<RecordBatch>>> {
let streams = execute_stream_partitioned(plan, context)?;
let mut batches = Vec::with_capacity(streams.len());
for stream in streams {
batches.push(common::collect(stream).await?);
}
Ok(batches)
}
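/// Executes `plan` and returns one stream per output partition, so the
/// partitions can be consumed in parallel.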
pub fn execute_stream_partitioned(
plan: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
) -> Result<Vec<SendableRecordBatchStream>> {
let num_partitions = plan.output_partitioning().partition_count();
let mut streams = Vec::with_capacity(num_partitions);
for i in 0..num_partitions {
streams.push(plan.execute(i, context.clone())?);
}
Ok(streams)
}
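/// Describes how an [`ExecutionPlan`]'s output is split across partitions.
///
/// For example:
///
/// ```
/// use datafusion::physical_plan::Partitioning;
///
/// let partitioning = Partitioning::RoundRobinBatch(8);
/// assert_eq!(partitioning.partition_count(), 8);
/// ```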
#[derive(Debug, Clone)]
pub enum Partitioning {
    /// Allocate batches using a round-robin algorithm with the specified
    /// number of partitions
    RoundRobinBatch(usize),
    /// Allocate rows based on a hash of one or more expressions, with the
    /// specified number of partitions
    Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
    /// Unknown partitioning scheme with a known number of partitions
    UnknownPartitioning(usize),
}
impl Partitioning {
pub fn partition_count(&self) -> usize {
use Partitioning::*;
match self {
RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
}
}
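    /// Returns `true` if this [`Partitioning`] satisfies the distribution
    /// requirement `required`. `equal_properties` lazily supplies
    /// equivalence classes so that, for example, `Hash([a])` can satisfy
    /// `HashPartitioned([b])` when `a` and `b` are known to be equal.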
pub fn satisfy<F: FnOnce() -> EquivalenceProperties>(
&self,
required: Distribution,
equal_properties: F,
) -> bool {
match required {
Distribution::UnspecifiedDistribution => true,
Distribution::SinglePartition if self.partition_count() == 1 => true,
            Distribution::HashPartitioned(required_exprs) => match self {
                Partitioning::Hash(partition_exprs, _) => {
                    // Fast path: the expression lists match pairwise, in order
                    if expr_list_eq_strict_order(&required_exprs, partition_exprs) {
                        return true;
                    }
                    // Slow path: normalize both sides with the equivalence
                    // classes (expressions known to be equal) and re-compare
                    let eq_properties = equal_properties();
                    let eq_classes = eq_properties.classes();
                    if eq_classes.is_empty() {
                        return false;
                    }
                    let normalized_required_exprs = required_exprs
                        .iter()
                        .map(|e| {
                            normalize_expr_with_equivalence_properties(
                                e.clone(),
                                eq_classes,
                            )
                        })
                        .collect::<Vec<_>>();
                    let normalized_partition_exprs = partition_exprs
                        .iter()
                        .map(|e| {
                            normalize_expr_with_equivalence_properties(
                                e.clone(),
                                eq_classes,
                            )
                        })
                        .collect::<Vec<_>>();
                    expr_list_eq_strict_order(
                        &normalized_required_exprs,
                        &normalized_partition_exprs,
                    )
                }
                _ => false,
            },
_ => false,
}
}
}
impl PartialEq for Partitioning {
fn eq(&self, other: &Partitioning) -> bool {
match (self, other) {
(
Partitioning::RoundRobinBatch(count1),
Partitioning::RoundRobinBatch(count2),
) if count1 == count2 => true,
(Partitioning::Hash(exprs1, count1), Partitioning::Hash(exprs2, count2))
if expr_list_eq_strict_order(exprs1, exprs2) && (count1 == count2) =>
{
true
}
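            // `UnknownPartitioning` is never considered equal, even to itself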
_ => false,
}
}
}
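/// Describes how an operator's input must be distributed across its
/// partitions before the operator can run.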
#[derive(Debug, Clone)]
pub enum Distribution {
    /// Unspecified distribution
    UnspecifiedDistribution,
    /// A single partition is required
    SinglePartition,
    /// Requires children to be distributed so that the same values of the
    /// keys appear in the same partition
    HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
}
impl Distribution {
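    /// Creates a [`Partitioning`] that satisfies this distribution
    /// requirement, using the given partition count where applicable.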
pub fn create_partitioning(&self, partition_count: usize) -> Partitioning {
match self {
Distribution::UnspecifiedDistribution => {
Partitioning::UnknownPartitioning(partition_count)
}
Distribution::SinglePartition => Partitioning::UnknownPartitioning(1),
Distribution::HashPartitioned(expr) => {
Partitioning::Hash(expr.clone(), partition_count)
}
}
}
}
pub use datafusion_physical_expr::window::WindowExpr;
pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
use datafusion_physical_expr::{
    expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
    EquivalenceProperties,
};
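/// Applies an optional projection to a [`SchemaRef`], returning the schema
/// restricted to the given column indices (or the original schema when no
/// projection is given).
///
/// For example:
///
/// ```
/// use arrow::datatypes::{DataType, Field, Schema};
/// use datafusion::physical_plan::project_schema;
/// use std::sync::Arc;
///
/// let schema = Arc::new(Schema::new(vec![
///     Field::new("a", DataType::Int32, false),
///     Field::new("b", DataType::Utf8, false),
///     Field::new("c", DataType::Float64, false),
/// ]));
/// // Keep only columns "a" and "c"
/// let projected = project_schema(&schema, Some(&vec![0, 2])).unwrap();
/// assert_eq!(projected.fields().len(), 2);
/// ```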
pub fn project_schema(
schema: &SchemaRef,
projection: Option<&Vec<usize>>,
) -> Result<SchemaRef> {
let schema = match projection {
Some(columns) => Arc::new(schema.project(columns)?),
None => Arc::clone(schema),
};
Ok(schema)
}
pub mod aggregates;
pub mod analyze;
pub mod coalesce_batches;
pub mod coalesce_partitions;
pub mod common;
pub mod display;
pub mod empty;
pub mod explain;
pub mod file_format;
pub mod filter;
pub mod joins;
pub mod limit;
pub mod memory;
pub mod metrics;
pub mod planner;
pub mod projection;
pub mod repartition;
pub mod rewrite;
pub mod sorts;
pub mod stream;
pub mod streaming;
pub mod udaf;
pub mod union;
pub mod values;
pub mod windows;
use crate::execution::context::TaskContext;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
pub use datafusion_physical_expr::{
expressions, functions, hash_utils, type_coercion, udf,
};
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_physical_expr::expressions::Column;
    use std::sync::Arc;
#[tokio::test]
async fn partitioning_satisfy_distribution() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("column_1", DataType::Int64, false),
            Field::new("column_2", DataType::Utf8, false),
        ]));
let partition_exprs1: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
];
let partition_exprs2: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
];
let distribution_types = vec![
Distribution::UnspecifiedDistribution,
Distribution::SinglePartition,
Distribution::HashPartitioned(partition_exprs1.clone()),
];
let single_partition = Partitioning::UnknownPartitioning(1);
let unspecified_partition = Partitioning::UnknownPartitioning(10);
let round_robin_partition = Partitioning::RoundRobinBatch(10);
let hash_partition1 = Partitioning::Hash(partition_exprs1, 10);
let hash_partition2 = Partitioning::Hash(partition_exprs2, 10);
for distribution in distribution_types {
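            // Evaluate `satisfy` for every partitioning against this
            // distribution; tuple order is (single, unspecified,
            // round-robin, hash1, hash2)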
let result = (
single_partition.satisfy(distribution.clone(), || {
EquivalenceProperties::new(schema.clone())
}),
unspecified_partition.satisfy(distribution.clone(), || {
EquivalenceProperties::new(schema.clone())
}),
round_robin_partition.satisfy(distribution.clone(), || {
EquivalenceProperties::new(schema.clone())
}),
hash_partition1.satisfy(distribution.clone(), || {
EquivalenceProperties::new(schema.clone())
}),
hash_partition2.satisfy(distribution.clone(), || {
EquivalenceProperties::new(schema.clone())
}),
);
match distribution {
Distribution::UnspecifiedDistribution => {
assert_eq!(result, (true, true, true, true, true))
}
Distribution::SinglePartition => {
assert_eq!(result, (true, false, false, false, false))
}
Distribution::HashPartitioned(_) => {
assert_eq!(result, (false, false, false, true, false))
}
}
}
Ok(())
}
}