use crate::expr::BinaryExpr;
use crate::logical_plan::builder::validate_unique_names;
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
use crate::utils::{
exprlist_to_fields, grouping_set_expr_count, grouping_set_to_exprlist,
};
use crate::{Expr, ExprSchemable, TableProviderFilterPushDown, TableSource};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{plan_err, Column, DFSchema, DFSchemaRef, DataFusionError};
use std::collections::HashSet;
use std::fmt::{self, Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
/// A node in a logical query plan.
///
/// Each variant wraps a struct of the same name that holds the node's
/// parameters. Child plans of the built-in variants are stored as
/// `Arc<LogicalPlan>`; see [`LogicalPlan::inputs`] for the children of each
/// node.
#[derive(Clone)]
pub enum LogicalPlan {
    /// Evaluates a list of expressions over its input; see [`Projection`].
    Projection(Projection),
    /// Applies a boolean predicate to its input; see [`Filter`].
    Filter(Filter),
    /// Window expressions over its input; see [`Window`].
    Window(Window),
    /// Grouping and aggregate expressions over its input; see [`Aggregate`].
    Aggregate(Aggregate),
    /// Sort of its input by a list of expressions, with an optional `fetch`; see [`Sort`].
    Sort(Sort),
    /// Join of two inputs on column pairs plus an optional filter; see [`Join`].
    Join(Join),
    /// Cross join of two inputs; see [`CrossJoin`].
    CrossJoin(CrossJoin),
    /// Repartitions its input according to a [`Partitioning`] scheme.
    Repartition(Repartition),
    /// Union of several inputs; see [`Union`].
    Union(Union),
    /// Scan of a [`TableSource`]; see [`TableScan`].
    TableScan(TableScan),
    /// A leaf relation with no input plan; see [`EmptyRelation`].
    EmptyRelation(EmptyRelation),
    /// A subquery plan, as referenced from `EXISTS` / `IN` / scalar-subquery
    /// expressions; see [`Subquery`].
    Subquery(Subquery),
    /// Gives its input a name; see [`SubqueryAlias`].
    SubqueryAlias(SubqueryAlias),
    /// `skip` / `fetch` row limiting over its input; see [`Limit`].
    Limit(Limit),
    /// DDL node; see [`CreateExternalTable`].
    CreateExternalTable(CreateExternalTable),
    /// DDL node whose contents come from an input plan; see [`CreateMemoryTable`].
    CreateMemoryTable(CreateMemoryTable),
    /// DDL node defining a view over an input plan; see [`CreateView`].
    CreateView(CreateView),
    /// DDL node; see [`CreateCatalogSchema`].
    CreateCatalogSchema(CreateCatalogSchema),
    /// DDL node; see [`CreateCatalog`].
    CreateCatalog(CreateCatalog),
    /// DDL node; see [`DropTable`].
    DropTable(DropTable),
    /// DDL node; see [`DropView`].
    DropView(DropView),
    /// Literal rows given as expressions; see [`Values`].
    Values(Values),
    /// Wraps a plan to be explained; see [`Explain`].
    Explain(Explain),
    /// Wraps a plan to be analyzed; see [`Analyze`].
    Analyze(Analyze),
    /// A user-defined node; see [`UserDefinedLogicalNode`].
    Extension(Extension),
    /// DISTINCT over its input; the output schema is the input's. See [`Distinct`].
    Distinct(Distinct),
    /// Sets `variable` to `value`; see [`SetVariable`].
    SetVariable(SetVariable),
}
impl LogicalPlan {
pub fn schema(&self) -> &DFSchemaRef {
match self {
LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
LogicalPlan::Values(Values { schema, .. }) => schema,
LogicalPlan::TableScan(TableScan {
projected_schema, ..
}) => projected_schema,
LogicalPlan::Projection(Projection { schema, .. }) => schema,
LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
LogicalPlan::Distinct(Distinct { input }) => input.schema(),
LogicalPlan::Window(Window { schema, .. }) => schema,
LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
LogicalPlan::Join(Join { schema, .. }) => schema,
LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
schema
}
LogicalPlan::Explain(explain) => &explain.schema,
LogicalPlan::Analyze(analyze) => &analyze.schema,
LogicalPlan::Extension(extension) => extension.node.schema(),
LogicalPlan::Union(Union { schema, .. }) => schema,
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::CreateView(CreateView { input, .. }) => input.schema(),
LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
schema
}
LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => schema,
LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
LogicalPlan::DropView(DropView { schema, .. }) => schema,
LogicalPlan::SetVariable(SetVariable { schema, .. }) => schema,
}
}
pub fn all_schemas(&self) -> Vec<&DFSchemaRef> {
match self {
LogicalPlan::TableScan(TableScan {
projected_schema, ..
}) => vec![projected_schema],
LogicalPlan::Values(Values { schema, .. }) => vec![schema],
LogicalPlan::Window(Window { input, schema, .. })
| LogicalPlan::Projection(Projection { input, schema, .. })
| LogicalPlan::Aggregate(Aggregate { input, schema, .. }) => {
let mut schemas = input.all_schemas();
schemas.insert(0, schema);
schemas
}
LogicalPlan::Join(Join {
left,
right,
schema,
..
})
| LogicalPlan::CrossJoin(CrossJoin {
left,
right,
schema,
}) => {
let mut schemas = left.all_schemas();
schemas.extend(right.all_schemas());
schemas.insert(0, schema);
schemas
}
LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.all_schemas(),
LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => {
vec![schema]
}
LogicalPlan::Union(Union { schema, .. }) => {
vec![schema]
}
LogicalPlan::Extension(extension) => vec![extension.node.schema()],
LogicalPlan::Explain(Explain { schema, .. })
| LogicalPlan::Analyze(Analyze { schema, .. })
| LogicalPlan::EmptyRelation(EmptyRelation { schema, .. })
| LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. })
| LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. })
| LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => {
vec![schema]
}
LogicalPlan::Limit(Limit { input, .. })
| LogicalPlan::Repartition(Repartition { input, .. })
| LogicalPlan::Sort(Sort { input, .. })
| LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::CreateView(CreateView { input, .. })
| LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
LogicalPlan::Distinct(Distinct { input, .. }) => input.all_schemas(),
LogicalPlan::DropTable(_)
| LogicalPlan::DropView(_)
| LogicalPlan::SetVariable(_) => vec![],
}
}
pub fn explain_schema() -> SchemaRef {
SchemaRef::new(Schema::new(vec![
Field::new("plan_type", DataType::Utf8, false),
Field::new("plan", DataType::Utf8, false),
]))
}
pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
match self {
LogicalPlan::Projection(Projection { expr, .. }) => expr.clone(),
LogicalPlan::Values(Values { values, .. }) => {
values.iter().flatten().cloned().collect()
}
LogicalPlan::Filter(Filter { predicate, .. }) => vec![predicate.clone()],
LogicalPlan::Repartition(Repartition {
partitioning_scheme,
..
}) => match partitioning_scheme {
Partitioning::Hash(expr, _) => expr.clone(),
Partitioning::DistributeBy(expr) => expr.clone(),
Partitioning::RoundRobinBatch(_) => vec![],
},
LogicalPlan::Window(Window { window_expr, .. }) => window_expr.clone(),
LogicalPlan::Aggregate(Aggregate {
group_expr,
aggr_expr,
..
}) => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(),
LogicalPlan::Join(Join { on, filter, .. }) => on
.iter()
.flat_map(|(l, r)| vec![Expr::Column(l.clone()), Expr::Column(r.clone())])
.chain(
filter
.as_ref()
.map(|expr| vec![expr.clone()])
.unwrap_or_default(),
)
.collect(),
LogicalPlan::Sort(Sort { expr, .. }) => expr.clone(),
LogicalPlan::Extension(extension) => extension.node.expressions(),
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation(_)
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Limit(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::CreateView(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::SetVariable(_)
| LogicalPlan::DropView(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Analyze { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Union(_)
| LogicalPlan::Distinct(_) => {
vec![]
}
}
}
pub fn inputs(self: &LogicalPlan) -> Vec<&LogicalPlan> {
match self {
LogicalPlan::Projection(Projection { input, .. }) => vec![input],
LogicalPlan::Filter(Filter { input, .. }) => vec![input],
LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
LogicalPlan::Window(Window { input, .. }) => vec![input],
LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
LogicalPlan::Sort(Sort { input, .. }) => vec![input],
LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right],
LogicalPlan::Limit(Limit { input, .. }) => vec![input],
LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
LogicalPlan::Extension(extension) => extension.node.inputs(),
LogicalPlan::Union(Union { inputs, .. }) => {
inputs.iter().map(|arc| arc.as_ref()).collect()
}
LogicalPlan::Distinct(Distinct { input }) => vec![input],
LogicalPlan::Explain(explain) => vec![&explain.plan],
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::CreateView(CreateView { input, .. }) => {
vec![input]
}
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::SetVariable(_)
| LogicalPlan::DropView(_) => vec![],
}
}
pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
struct UsingJoinColumnVisitor {
using_columns: Vec<HashSet<Column>>,
}
impl PlanVisitor for UsingJoinColumnVisitor {
type Error = DataFusionError;
fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error> {
if let LogicalPlan::Join(Join {
join_constraint: JoinConstraint::Using,
on,
..
}) = plan
{
self.using_columns.push(
on.iter()
.flat_map(|entry| [&entry.0, &entry.1])
.cloned()
.collect::<HashSet<Column>>(),
);
}
Ok(true)
}
}
let mut visitor = UsingJoinColumnVisitor {
using_columns: vec![],
};
self.accept(&mut visitor)?;
Ok(visitor.using_columns)
}
}
/// Visitor over a [`LogicalPlan`] tree, driven by [`LogicalPlan::accept`].
///
/// `accept` calls `pre_visit` on a node before any of its children, and
/// `post_visit` after all of them. If either hook returns `Ok(false)`, the
/// walk stops and `accept` returns `Ok(false)`. If a hook returns `Err`, the
/// walk is aborted and the error is propagated out of `accept`.
pub trait PlanVisitor {
    /// Error type produced by this visitor's hooks.
    type Error;
    /// Called on `plan` before its children are visited.
    ///
    /// Returning `Ok(false)` skips the children and this node's `post_visit`,
    /// and stops the walk.
    fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error>;
    /// Called on `plan` after all of its children have been visited.
    ///
    /// Returning `Ok(false)` stops the walk. The default implementation
    /// always continues.
    fn post_visit(&mut self, _plan: &LogicalPlan) -> Result<bool, Self::Error> {
        Ok(true)
    }
}
impl LogicalPlan {
pub fn accept<V>(&self, visitor: &mut V) -> Result<bool, V::Error>
where
V: PlanVisitor,
{
if !visitor.pre_visit(self)? {
return Ok(false);
}
let recurse = match self {
LogicalPlan::Projection(Projection { .. }) => {
self.visit_all_inputs(visitor)?
}
LogicalPlan::Filter(Filter { .. }) => self.visit_all_inputs(visitor)?,
LogicalPlan::Repartition(Repartition { input, .. }) => {
input.accept(visitor)?
}
LogicalPlan::Window(Window { input, .. }) => input.accept(visitor)?,
LogicalPlan::Aggregate(Aggregate { input, .. }) => input.accept(visitor)?,
LogicalPlan::Sort(Sort { input, .. }) => input.accept(visitor)?,
LogicalPlan::Join(Join { left, right, .. })
| LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
left.accept(visitor)? && right.accept(visitor)?
}
LogicalPlan::Union(Union { inputs, .. }) => {
for input in inputs {
if !input.accept(visitor)? {
return Ok(false);
}
}
true
}
LogicalPlan::Distinct(Distinct { input }) => input.accept(visitor)?,
LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
LogicalPlan::Subquery(Subquery { subquery, .. }) => {
subquery.accept(visitor)?
}
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
input.accept(visitor)?
}
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::CreateView(CreateView { input, .. }) => {
input.accept(visitor)?
}
LogicalPlan::Extension(extension) => {
for input in extension.node.inputs() {
if !input.accept(visitor)? {
return Ok(false);
}
}
true
}
LogicalPlan::Explain(explain) => explain.plan.accept(visitor)?,
LogicalPlan::Analyze(analyze) => analyze.input.accept(visitor)?,
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation(_)
| LogicalPlan::Values(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::SetVariable(_)
| LogicalPlan::DropView(_) => true,
};
if !recurse {
return Ok(false);
}
if !visitor.post_visit(self)? {
return Ok(false);
}
Ok(true)
}
pub fn visit_all_inputs<V>(&self, visitor: &mut V) -> Result<bool, V::Error>
where
V: PlanVisitor,
{
for input in self.all_inputs() {
if !input.accept(visitor)? {
return Ok(false);
}
}
Ok(true)
}
fn all_inputs(&self) -> Vec<Arc<LogicalPlan>> {
let mut inputs = vec![];
for expr in self.expressions() {
Self::collect_subqueries(&expr, &mut inputs);
}
for input in self.inputs() {
inputs.push(Arc::new(input.clone()));
}
inputs
}
fn collect_subqueries(expr: &Expr, sub: &mut Vec<Arc<LogicalPlan>>) {
match expr {
Expr::BinaryExpr(BinaryExpr { left, right, .. }) => {
Self::collect_subqueries(left, sub);
Self::collect_subqueries(right, sub);
}
Expr::Exists { subquery, .. } => {
sub.push(Arc::new(LogicalPlan::Subquery(subquery.clone())));
}
Expr::InSubquery { subquery, .. } => {
sub.push(Arc::new(LogicalPlan::Subquery(subquery.clone())));
}
Expr::ScalarSubquery(subquery) => {
sub.push(Arc::new(LogicalPlan::Subquery(subquery.clone())));
}
_ => {}
}
}
}
impl LogicalPlan {
    /// Returns a `Display`able rendering of the whole plan as an indented
    /// tree, produced by walking it with an `IndentVisitor` (without schemas).
    ///
    /// Formatting fails with `fmt::Error` if the walk returns an error.
    pub fn display_indent(&self) -> impl Display + '_ {
        // Adapter so the visitor-based rendering can be used with `{}`.
        struct Wrapper<'a>(&'a LogicalPlan);
        impl<'a> Display for Wrapper<'a> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                let with_schema = false;
                let mut visitor = IndentVisitor::new(f, with_schema);
                match self.0.accept(&mut visitor) {
                    Ok(_) => Ok(()),
                    Err(_) => Err(fmt::Error),
                }
            }
        }
        Wrapper(self)
    }
    /// Like [`LogicalPlan::display_indent`], but the `IndentVisitor` is asked
    /// to include each node's schema as well.
    ///
    /// Formatting fails with `fmt::Error` if the walk returns an error.
    pub fn display_indent_schema(&self) -> impl Display + '_ {
        struct Wrapper<'a>(&'a LogicalPlan);
        impl<'a> Display for Wrapper<'a> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                let with_schema = true;
                let mut visitor = IndentVisitor::new(f, with_schema);
                match self.0.accept(&mut visitor) {
                    Ok(_) => Ok(()),
                    Err(_) => Err(fmt::Error),
                }
            }
        }
        Wrapper(self)
    }
    /// Returns a `Display`able GraphViz `digraph` of the plan.
    ///
    /// The graph contains two renderings produced by a `GraphvizVisitor`:
    /// "LogicalPlan" without schemas, then "Detailed LogicalPlan" with
    /// schemas. The output is framed by `// Begin ...` / `// End ...` comment
    /// lines.
    pub fn display_graphviz(&self) -> impl Display + '_ {
        struct Wrapper<'a>(&'a LogicalPlan);
        impl<'a> Display for Wrapper<'a> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                writeln!(
                    f,
                    "// Begin DataFusion GraphViz Plan (see https://graphviz.org)"
                )?;
                writeln!(f, "digraph {{")?;
                let mut visitor = GraphvizVisitor::new(f);
                // First pass: plain node labels.
                visitor.pre_visit_plan("LogicalPlan")?;
                self.0.accept(&mut visitor).map_err(|_| fmt::Error)?;
                visitor.post_visit_plan()?;
                // Second pass: same plan again, with schemas in the labels.
                visitor.set_with_schema(true);
                visitor.pre_visit_plan("Detailed LogicalPlan")?;
                self.0.accept(&mut visitor).map_err(|_| fmt::Error)?;
                visitor.post_visit_plan()?;
                writeln!(f, "}}")?;
                writeln!(f, "// End DataFusion GraphViz Plan")?;
                Ok(())
            }
        }
        Wrapper(self)
    }
    /// Returns a `Display`able one-line description of this node only.
    ///
    /// Children are not rendered; the tree-shaped displays above call into
    /// visitors for that.
    pub fn display(&self) -> impl Display + '_ {
        struct Wrapper<'a>(&'a LogicalPlan);
        impl<'a> Display for Wrapper<'a> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                match self.0 {
                    LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
                    LogicalPlan::Values(Values { ref values, .. }) => {
                        // Only the first five rows are printed; "..." marks truncation.
                        let str_values: Vec<_> = values
                            .iter()
                            .take(5)
                            .map(|row| {
                                let item = row
                                    .iter()
                                    .map(|expr| expr.to_string())
                                    .collect::<Vec<_>>()
                                    .join(", ");
                                format!("({})", item)
                            })
                            .collect();
                        let elipse = if values.len() > 5 { "..." } else { "" };
                        write!(f, "Values: {}{}", str_values.join(", "), elipse)
                    }
                    LogicalPlan::TableScan(TableScan {
                        ref source,
                        ref table_name,
                        ref projection,
                        ref filters,
                        ref fetch,
                        ..
                    }) => {
                        // Projection indices are resolved to field names of the
                        // source's (unprojected) schema.
                        let projected_fields = match projection {
                            Some(indices) => {
                                let schema = source.schema();
                                let names: Vec<&str> = indices
                                    .iter()
                                    .map(|i| schema.field(*i).name().as_str())
                                    .collect();
                                format!(" projection=[{}]", names.join(", "))
                            }
                            _ => "".to_string(),
                        };
                        write!(f, "TableScan: {}{}", table_name, projected_fields)?;
                        if !filters.is_empty() {
                            // Group filters by how the source reports it can push them down.
                            let mut full_filter = vec![];
                            let mut partial_filter = vec![];
                            let mut unsupported_filters = vec![];
                            // NOTE(review): filters for which `supports_filter_pushdown`
                            // returns `Err` are left out of the output entirely.
                            filters.iter().for_each(|x| {
                                if let Ok(t) = source.supports_filter_pushdown(x) {
                                    match t {
                                        TableProviderFilterPushDown::Exact => {
                                            full_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Inexact => {
                                            partial_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Unsupported => {
                                            unsupported_filters.push(x)
                                        }
                                    }
                                }
                            });
                            if !full_filter.is_empty() {
                                write!(f, ", full_filters={:?}", full_filter)?;
                            };
                            if !partial_filter.is_empty() {
                                write!(f, ", partial_filters={:?}", partial_filter)?;
                            }
                            if !unsupported_filters.is_empty() {
                                write!(
                                    f,
                                    ", unsupported_filters={:?}",
                                    unsupported_filters
                                )?;
                            }
                        }
                        if let Some(n) = fetch {
                            write!(f, ", fetch={}", n)?;
                        }
                        Ok(())
                    }
                    LogicalPlan::Projection(Projection {
                        ref expr, alias, ..
                    }) => {
                        write!(f, "Projection: ")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ", ")?;
                            }
                            write!(f, "{:?}", expr_item)?;
                        }
                        if let Some(a) = alias {
                            write!(f, ", alias={}", a)?;
                        }
                        Ok(())
                    }
                    LogicalPlan::Filter(Filter {
                        predicate: ref expr,
                        ..
                    }) => write!(f, "Filter: {:?}", expr),
                    LogicalPlan::Window(Window {
                        ref window_expr, ..
                    }) => {
                        write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr)
                    }
                    LogicalPlan::Aggregate(Aggregate {
                        ref group_expr,
                        ref aggr_expr,
                        ..
                    }) => write!(
                        f,
                        "Aggregate: groupBy=[{:?}], aggr=[{:?}]",
                        group_expr, aggr_expr
                    ),
                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
                        write!(f, "Sort: ")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ", ")?;
                            }
                            write!(f, "{:?}", expr_item)?;
                        }
                        if let Some(a) = fetch {
                            write!(f, ", fetch={}", a)?;
                        }
                        Ok(())
                    }
                    LogicalPlan::Join(Join {
                        on: ref keys,
                        filter,
                        join_constraint,
                        join_type,
                        ..
                    }) => {
                        // Each key pair is rendered as "left = right".
                        let join_expr: Vec<String> =
                            keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect();
                        let filter_expr = filter
                            .as_ref()
                            .map(|expr| format!(" Filter: {}", expr))
                            .unwrap_or_else(|| "".to_string());
                        match join_constraint {
                            JoinConstraint::On => {
                                write!(
                                    f,
                                    "{} Join: {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr
                                )
                            }
                            JoinConstraint::Using => {
                                write!(
                                    f,
                                    "{} Join: Using {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr,
                                )
                            }
                        }
                    }
                    LogicalPlan::CrossJoin(_) => {
                        write!(f, "CrossJoin:")
                    }
                    LogicalPlan::Repartition(Repartition {
                        partitioning_scheme,
                        ..
                    }) => match partitioning_scheme {
                        Partitioning::RoundRobinBatch(n) => write!(
                            f,
                            "Repartition: RoundRobinBatch partition_count={}",
                            n
                        ),
                        Partitioning::Hash(expr, n) => {
                            let hash_expr: Vec<String> =
                                expr.iter().map(|e| format!("{:?}", e)).collect();
                            write!(
                                f,
                                "Repartition: Hash({}) partition_count={}",
                                hash_expr.join(", "),
                                n
                            )
                        }
                        Partitioning::DistributeBy(expr) => {
                            let dist_by_expr: Vec<String> =
                                expr.iter().map(|e| format!("{:?}", e)).collect();
                            write!(
                                f,
                                "Repartition: DistributeBy({})",
                                dist_by_expr.join(", "),
                            )
                        }
                    },
                    LogicalPlan::Limit(Limit {
                        ref skip,
                        ref fetch,
                        ..
                    }) => {
                        // A missing fetch is printed literally as "None".
                        write!(
                            f,
                            "Limit: skip={}, fetch={}",
                            skip,
                            fetch.map_or_else(|| "None".to_string(), |x| x.to_string())
                        )
                    }
                    LogicalPlan::Subquery(Subquery { .. }) => {
                        write!(f, "Subquery:")
                    }
                    LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
                        write!(f, "SubqueryAlias: {}", alias)
                    }
                    LogicalPlan::CreateExternalTable(CreateExternalTable {
                        ref name,
                        ..
                    }) => {
                        write!(f, "CreateExternalTable: {:?}", name)
                    }
                    LogicalPlan::CreateMemoryTable(CreateMemoryTable {
                        name, ..
                    }) => {
                        write!(f, "CreateMemoryTable: {:?}", name)
                    }
                    LogicalPlan::CreateView(CreateView { name, .. }) => {
                        write!(f, "CreateView: {:?}", name)
                    }
                    LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
                        schema_name,
                        ..
                    }) => {
                        write!(f, "CreateCatalogSchema: {:?}", schema_name)
                    }
                    LogicalPlan::CreateCatalog(CreateCatalog {
                        catalog_name, ..
                    }) => {
                        write!(f, "CreateCatalog: {:?}", catalog_name)
                    }
                    // NOTE(review): the label reads "if not exist" but the value
                    // printed is the `if_exists` flag. Left unchanged because the
                    // text may be matched by tests elsewhere.
                    LogicalPlan::DropTable(DropTable {
                        name, if_exists, ..
                    }) => {
                        write!(f, "DropTable: {:?} if not exist:={}", name, if_exists)
                    }
                    LogicalPlan::DropView(DropView {
                        name, if_exists, ..
                    }) => {
                        write!(f, "DropView: {:?} if not exist:={}", name, if_exists)
                    }
                    LogicalPlan::SetVariable(SetVariable {
                        variable, value, ..
                    }) => {
                        write!(f, "SetVariable: set {:?} to {:?}", variable, value)
                    }
                    LogicalPlan::Distinct(Distinct { .. }) => {
                        write!(f, "Distinct:")
                    }
                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
                    LogicalPlan::Union(_) => write!(f, "Union"),
                    // User-defined nodes format themselves.
                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
                }
            }
        }
        Wrapper(self)
    }
}
impl Debug for LogicalPlan {
    /// `{:?}` on a plan prints the same indented tree as
    /// [`LogicalPlan::display_indent`], written straight into `f`.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let rendered = self.display_indent();
        Display::fmt(&rendered, f)
    }
}
impl ToStringifiedPlan for LogicalPlan {
    /// Captures the indented-tree rendering of this plan, tagged with `plan_type`.
    fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
        let indented = self.display_indent();
        let text = indented.to_string();
        StringifiedPlan::new(plan_type, text)
    }
}
/// The kind of join performed by a [`Join`] node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
    LeftSemi,
    RightSemi,
    LeftAnti,
    RightAnti,
}

impl Display for JoinType {
    /// Writes the bare variant name (e.g. `LeftSemi`), as used in plan displays.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let label = match *self {
            JoinType::Inner => "Inner",
            JoinType::Left => "Left",
            JoinType::Right => "Right",
            JoinType::Full => "Full",
            JoinType::LeftSemi => "LeftSemi",
            JoinType::RightSemi => "RightSemi",
            JoinType::LeftAnti => "LeftAnti",
            JoinType::RightAnti => "RightAnti",
        };
        f.write_str(label)
    }
}
/// How the join keys of a [`Join`] were specified.
///
/// Derives the same comparison/hash traits as `JoinType`, so it can be
/// compared with `==` and used as a set or map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JoinConstraint {
    /// `ON`-style join; displayed as `<type> Join: <keys>`.
    On,
    /// `USING`-style join; displayed as `<type> Join: Using <keys>`. Its key
    /// columns are reported by [`LogicalPlan::using_columns`].
    Using,
}
/// Plan node that creates a catalog named `catalog_name`.
#[derive(Clone)]
pub struct CreateCatalog {
    /// Name of the catalog to create.
    pub catalog_name: String,
    /// NOTE(review): presumably tolerates an already-existing catalog; the
    /// check is not performed in this file — confirm in the executor.
    pub if_not_exists: bool,
    /// Schema returned by `LogicalPlan::schema` for this node.
    pub schema: DFSchemaRef,
}
/// Plan node that creates a schema (namespace) named `schema_name`.
#[derive(Clone)]
pub struct CreateCatalogSchema {
    /// Name of the schema to create.
    pub schema_name: String,
    /// NOTE(review): presumably tolerates an already-existing schema; the
    /// check is not performed in this file — confirm in the executor.
    pub if_not_exists: bool,
    /// Schema returned by `LogicalPlan::schema` for this node (unrelated to
    /// `schema_name`).
    pub schema: DFSchemaRef,
}
/// Plan node that drops the table `name`.
#[derive(Clone)]
pub struct DropTable {
    /// Name of the table to drop.
    pub name: String,
    /// Displayed by `LogicalPlan::display` under the label "if not exist".
    /// NOTE(review): presumably tolerates a missing table — confirm in the executor.
    pub if_exists: bool,
    /// Schema returned by `LogicalPlan::schema` for this node.
    pub schema: DFSchemaRef,
}
/// Plan node that drops the view `name`.
#[derive(Clone)]
pub struct DropView {
    /// Name of the view to drop.
    pub name: String,
    /// Displayed by `LogicalPlan::display` under the label "if not exist".
    /// NOTE(review): presumably tolerates a missing view — confirm in the executor.
    pub if_exists: bool,
    /// Schema returned by `LogicalPlan::schema` for this node.
    pub schema: DFSchemaRef,
}
/// Plan node that sets `variable` to `value`
/// (displayed as `SetVariable: set <variable> to <value>`).
#[derive(Clone)]
pub struct SetVariable {
    /// Name of the variable to set.
    pub variable: String,
    /// New value, kept as unparsed text.
    pub value: String,
    /// Schema returned by `LogicalPlan::schema` for this node.
    pub schema: DFSchemaRef,
}
/// Leaf relation with no input plan.
#[derive(Clone)]
pub struct EmptyRelation {
    /// NOTE(review): presumably selects whether one row (rather than none) is
    /// produced at execution time — confirm in the physical planner.
    pub produce_one_row: bool,
    /// Output schema of the relation.
    pub schema: DFSchemaRef,
}
/// Literal rows, each given as a list of expressions.
#[derive(Clone)]
pub struct Values {
    /// Output schema of the rows.
    pub schema: DFSchemaRef,
    /// The rows; each inner `Vec` is one row (only the first five are displayed).
    pub values: Vec<Vec<Expr>>,
}
/// Evaluates `expr` against each row of `input`.
///
/// Prefer [`Projection::try_new`] / [`Projection::try_new_with_schema`],
/// which check that `schema` has one field per expression.
#[derive(Clone)]
pub struct Projection {
    /// Expressions to evaluate, one per output field.
    pub expr: Vec<Expr>,
    /// Input plan.
    pub input: Arc<LogicalPlan>,
    /// Output schema.
    pub schema: DFSchemaRef,
    /// Optional alias, shown as `, alias=<name>` in the display.
    pub alias: Option<String>,
}
impl Projection {
    /// Creates a projection of `expr` over `input`, deriving the output schema.
    ///
    /// The schema's fields come from `exprlist_to_fields`, and it inherits the
    /// metadata of the input schema.
    ///
    /// # Errors
    /// Fails if the fields or the schema cannot be built, or if
    /// [`Projection::try_new_with_schema`] rejects the result.
    pub fn try_new(
        expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        alias: Option<String>,
    ) -> Result<Self, DataFusionError> {
        let fields = exprlist_to_fields(&expr, &input)?;
        let metadata = input.schema().metadata().clone();
        let schema = DFSchema::new_with_metadata(fields, metadata)?;
        Self::try_new_with_schema(expr, input, Arc::new(schema), alias)
    }

    /// Creates a projection with an explicitly supplied output schema.
    ///
    /// # Errors
    /// Returns `DataFusionError::Plan` if the number of expressions differs
    /// from the number of fields in `schema`.
    pub fn try_new_with_schema(
        expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
        alias: Option<String>,
    ) -> Result<Self, DataFusionError> {
        let expr_count = expr.len();
        let field_count = schema.fields().len();
        if expr_count == field_count {
            Ok(Self {
                expr,
                input,
                schema,
                alias,
            })
        } else {
            Err(DataFusionError::Plan(format!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr_count, field_count)))
        }
    }

    /// Borrows the `Projection` inside `plan`, or errors if `plan` is
    /// another kind of node.
    pub fn try_from_plan(plan: &LogicalPlan) -> datafusion_common::Result<&Projection> {
        if let LogicalPlan::Projection(projection) = plan {
            Ok(projection)
        } else {
            plan_err!("Could not coerce into Projection!")
        }
    }
}
/// Gives the plan `input` the name `alias`.
#[derive(Clone)]
pub struct SubqueryAlias {
    /// Aliased plan.
    pub input: Arc<LogicalPlan>,
    /// Name given to `input` (shown as `SubqueryAlias: <alias>`).
    pub alias: String,
    /// Output schema of this node.
    pub schema: DFSchemaRef,
}
/// Applies a boolean `predicate` to `input`.
///
/// Fields are private so that construction goes through [`Filter::try_new`],
/// which rejects non-boolean and aliased predicates. Read them with
/// [`Filter::predicate`] and [`Filter::input`].
#[derive(Clone)]
pub struct Filter {
    predicate: Expr,
    input: Arc<LogicalPlan>,
}
impl Filter {
    /// Creates a filter of `input` by `predicate`.
    ///
    /// # Errors
    /// Returns `DataFusionError::Plan` if the predicate's type can be computed
    /// against the input schema and is not `Boolean`, or if the predicate is
    /// an alias expression. If the type cannot be computed, the type check is
    /// skipped.
    pub fn try_new(
        predicate: Expr,
        input: Arc<LogicalPlan>,
    ) -> datafusion_common::Result<Self> {
        // Best-effort type check: a failure to compute the type is not an error here.
        match predicate.get_type(input.schema()) {
            Ok(predicate_type) if predicate_type != DataType::Boolean => {
                return Err(DataFusionError::Plan(format!(
                    "Cannot create filter with non-boolean predicate '{}' returning {}",
                    predicate, predicate_type
                )));
            }
            _ => {}
        }

        if let Expr::Alias(aliased, name) = predicate {
            return Err(DataFusionError::Plan(format!(
                "Attempted to create Filter predicate with \
                 expression `{}` aliased as '{}'. Filter predicates should not be \
                 aliased.",
                aliased, name
            )));
        }

        Ok(Self { predicate, input })
    }

    /// Borrows the `Filter` inside `plan`, or errors if `plan` is another
    /// kind of node.
    pub fn try_from_plan(plan: &LogicalPlan) -> datafusion_common::Result<&Filter> {
        if let LogicalPlan::Filter(filter) = plan {
            Ok(filter)
        } else {
            plan_err!("Could not coerce into Filter!")
        }
    }

    /// The filter predicate.
    pub fn predicate(&self) -> &Expr {
        &self.predicate
    }

    /// The filtered input plan.
    pub fn input(&self) -> &Arc<LogicalPlan> {
        &self.input
    }
}
/// Evaluates window expressions over `input`.
#[derive(Clone)]
pub struct Window {
    /// Input plan.
    pub input: Arc<LogicalPlan>,
    /// Window expressions to evaluate.
    pub window_expr: Vec<Expr>,
    /// Output schema of this node.
    pub schema: DFSchemaRef,
}
/// Scans the table `table_name` provided by `source`.
#[derive(Clone)]
pub struct TableScan {
    /// Name of the scanned table.
    pub table_name: String,
    /// Provider of the table's schema and of filter push-down support.
    pub source: Arc<dyn TableSource>,
    /// Optional indices into `source.schema()` of the columns to read.
    pub projection: Option<Vec<usize>>,
    /// Output schema of the scan (reported by `LogicalPlan::schema`).
    pub projected_schema: DFSchemaRef,
    /// Filter expressions attached to the scan. The display groups them by
    /// `source.supports_filter_pushdown`.
    pub filters: Vec<Expr>,
    /// Optional row limit for the scan (shown as `fetch=<n>`).
    pub fetch: Option<usize>,
}
/// Cross join of `left` and `right`.
#[derive(Clone)]
pub struct CrossJoin {
    /// Left input.
    pub left: Arc<LogicalPlan>,
    /// Right input.
    pub right: Arc<LogicalPlan>,
    /// Output schema of the join.
    pub schema: DFSchemaRef,
}
/// Repartitions `input`; the output schema is the input's.
#[derive(Clone)]
pub struct Repartition {
    /// Input plan.
    pub input: Arc<LogicalPlan>,
    /// How rows are assigned to partitions.
    pub partitioning_scheme: Partitioning,
}
/// Union of several input plans.
#[derive(Clone)]
pub struct Union {
    /// Inputs, visited in order.
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// Output schema of the union.
    pub schema: DFSchemaRef,
    /// Optional alias for the union.
    pub alias: Option<String>,
}
/// Plan node that creates the table `name` from the results of `input`.
/// Its schema is `input`'s schema.
#[derive(Clone)]
pub struct CreateMemoryTable {
    /// Name of the table to create.
    pub name: String,
    /// Plan whose output fills the table.
    pub input: Arc<LogicalPlan>,
    /// NOTE(review): presumably tolerates an existing table — confirm in the executor.
    pub if_not_exists: bool,
    /// NOTE(review): presumably replaces an existing table — confirm in the executor.
    pub or_replace: bool,
}
/// Plan node that defines the view `name` as `input`.
/// Its schema is `input`'s schema.
#[derive(Clone)]
pub struct CreateView {
    /// Name of the view.
    pub name: String,
    /// Plan the view stands for.
    pub input: Arc<LogicalPlan>,
    /// NOTE(review): presumably replaces an existing view — confirm in the executor.
    pub or_replace: bool,
    /// Optional view definition text (presumably the original SQL — confirm).
    pub definition: Option<String>,
}
/// Plan node that registers the external table `name` stored at `location`.
///
/// None of the format options below are interpreted in this file; their
/// meaning is defined by the code that executes this node.
#[derive(Clone)]
pub struct CreateExternalTable {
    /// Table schema (also reported by `LogicalPlan::schema`).
    pub schema: DFSchemaRef,
    /// Name of the table to register.
    pub name: String,
    /// Location of the table's data.
    pub location: String,
    /// File format name.
    pub file_type: String,
    /// Whether the files have a header row (relevant to delimited formats).
    pub has_header: bool,
    /// Field delimiter character (relevant to delimited formats).
    pub delimiter: char,
    /// Names of partition columns.
    pub table_partition_cols: Vec<String>,
    /// NOTE(review): presumably tolerates an existing table — confirm in the executor.
    pub if_not_exists: bool,
    /// Optional definition text (presumably the original SQL — confirm).
    pub definition: Option<String>,
    /// Compression type name.
    pub file_compression_type: String,
}
/// Wraps `plan` for EXPLAIN output.
#[derive(Clone)]
pub struct Explain {
    /// Verbose explain requested.
    pub verbose: bool,
    /// Plan being explained (visited as this node's only child).
    pub plan: Arc<LogicalPlan>,
    /// Plan renderings collected so far.
    pub stringified_plans: Vec<StringifiedPlan>,
    /// Output schema of the EXPLAIN rows.
    pub schema: DFSchemaRef,
}
/// Wraps `input` for EXPLAIN ANALYZE-style output.
#[derive(Clone)]
pub struct Analyze {
    /// Verbose output requested.
    pub verbose: bool,
    /// Plan being analyzed.
    pub input: Arc<LogicalPlan>,
    /// Output schema of this node.
    pub schema: DFSchemaRef,
}
/// A user-defined plan node. Its schema, expressions, inputs and display
/// are all delegated to `node`.
#[derive(Clone)]
pub struct Extension {
    /// The user-defined implementation.
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
/// Row limiting over `input`; the output schema is the input's.
#[derive(Clone)]
pub struct Limit {
    /// Number of leading rows to skip.
    pub skip: usize,
    /// Maximum number of rows to return; `None` is displayed as `fetch=None`.
    pub fetch: Option<usize>,
    /// Input plan.
    pub input: Arc<LogicalPlan>,
}
/// DISTINCT over `input`; the output schema is the input's.
#[derive(Clone)]
pub struct Distinct {
    /// Input plan.
    pub input: Arc<LogicalPlan>,
}
/// Grouping and aggregation over `input`.
///
/// Prefer [`Aggregate::try_new`] / [`Aggregate::try_new_with_schema`], which
/// validate the schema's field count.
#[derive(Clone)]
pub struct Aggregate {
    /// Input plan.
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions; may contain grouping sets.
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions.
    pub aggr_expr: Vec<Expr>,
    /// Output schema: grouping fields followed by aggregate fields.
    pub schema: DFSchemaRef,
}
impl Aggregate {
    /// Creates an aggregate over `input`, deriving the output schema.
    ///
    /// Grouping sets in `group_expr` are flattened into their expressions.
    /// The schema has one field per flattened grouping expression followed by
    /// one per aggregate expression, and inherits the input schema's metadata.
    ///
    /// # Errors
    /// Fails if flattening fails, if `validate_unique_names` rejects the
    /// output expressions, if the fields/schema cannot be built, or if
    /// [`Aggregate::try_new_with_schema`] rejects the result.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
    ) -> datafusion_common::Result<Self> {
        let grouping_expr: Vec<Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;
        // Every output expression: flattened grouping exprs, then aggregates.
        let output_exprs = || grouping_expr.iter().chain(aggr_expr.iter());
        validate_unique_names("Aggregations", output_exprs())?;
        let fields = exprlist_to_fields(output_exprs(), &input)?;
        let metadata = input.schema().metadata().clone();
        let schema = DFSchema::new_with_metadata(fields, metadata)?;
        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
    }

    /// Creates an aggregate with an explicitly supplied output schema.
    ///
    /// # Errors
    /// Returns `DataFusionError::Plan` if both expression lists are empty, or
    /// if `schema` does not have exactly
    /// `grouping_set_expr_count(&group_expr) + aggr_expr.len()` fields. Errors
    /// from `grouping_set_expr_count` are propagated.
    pub fn try_new_with_schema(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
        schema: DFSchemaRef,
    ) -> datafusion_common::Result<Self> {
        if group_expr.is_empty() && aggr_expr.is_empty() {
            return Err(DataFusionError::Plan(
                "Aggregate requires at least one grouping or aggregate expression"
                    .to_string(),
            ));
        }

        let expected_fields = grouping_set_expr_count(&group_expr)? + aggr_expr.len();
        let actual_fields = schema.fields().len();
        if actual_fields != expected_fields {
            return Err(DataFusionError::Plan(format!(
                "Aggregate schema has wrong number of fields. Expected {} got {}",
                expected_fields, actual_fields
            )));
        }

        Ok(Self {
            input,
            group_expr,
            aggr_expr,
            schema,
        })
    }

    /// Borrows the `Aggregate` inside `plan`, or errors if `plan` is another
    /// kind of node.
    pub fn try_from_plan(plan: &LogicalPlan) -> datafusion_common::Result<&Aggregate> {
        if let LogicalPlan::Aggregate(aggregate) = plan {
            Ok(aggregate)
        } else {
            plan_err!("Could not coerce into Aggregate!")
        }
    }
}
/// Sorts `input` by `expr`; the output schema is the input's.
#[derive(Clone)]
pub struct Sort {
    /// Sort expressions, in priority order.
    pub expr: Vec<Expr>,
    /// Input plan.
    pub input: Arc<LogicalPlan>,
    /// Optional row limit applied with the sort (shown as `fetch=<n>`).
    pub fetch: Option<usize>,
}
/// Join of `left` and `right`.
#[derive(Clone)]
pub struct Join {
    /// Left input.
    pub left: Arc<LogicalPlan>,
    /// Right input.
    pub right: Arc<LogicalPlan>,
    /// Key column pairs `(left, right)`, displayed as `left = right`.
    pub on: Vec<(Column, Column)>,
    /// Optional extra join filter.
    pub filter: Option<Expr>,
    /// Kind of join.
    pub join_type: JoinType,
    /// Whether the keys came from `ON` or `USING`.
    pub join_constraint: JoinConstraint,
    /// Output schema of the join.
    pub schema: DFSchemaRef,
    /// NOTE(review): presumably whether NULL keys match each other — not used
    /// in this file; confirm in the join executor.
    pub null_equals_null: bool,
}
/// A subquery plan, as held by subquery expressions (`EXISTS`, `IN`, scalar).
///
/// Its `Debug`, `Hash` and `PartialEq` impls deliberately ignore the wrapped plan.
#[derive(Clone)]
pub struct Subquery {
    /// The subquery's plan.
    pub subquery: Arc<LogicalPlan>,
}
impl Subquery {
    /// Wraps `plan` in a new `Subquery`.
    pub fn new(plan: LogicalPlan) -> Self {
        let subquery = Arc::new(plan);
        Self { subquery }
    }

    /// Extracts the subquery from a scalar-subquery expression, looking
    /// through any number of enclosing `CAST`s.
    ///
    /// # Errors
    /// Fails if `plan` is neither a scalar subquery nor a cast of one.
    pub fn try_from_expr(plan: &Expr) -> datafusion_common::Result<&Subquery> {
        let mut current = plan;
        loop {
            match current {
                Expr::ScalarSubquery(subquery) => return Ok(subquery),
                Expr::Cast(cast) => current = cast.expr.as_ref(),
                _ => return plan_err!("Could not coerce into ScalarSubquery!"),
            }
        }
    }
}
impl Debug for Subquery {
    /// Renders every subquery as the placeholder `<subquery>`; the wrapped
    /// plan is not printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str("<subquery>")
    }
}
/// Hashes a [`Subquery`] without looking at the wrapped plan.
///
/// Nothing is written into the hasher, so all subqueries hash identically.
/// This satisfies the `Hash`/`Eq` contract (`a == b` implies equal hashes)
/// because `PartialEq` never reports two subqueries as equal. The default
/// `hash_slice` is sufficient, since it just calls `hash` for each element.
impl Hash for Subquery {
    fn hash<H: Hasher>(&self, _state: &mut H) {
        // Intentionally empty. `LogicalPlan` only derives `Clone`, so the
        // wrapped plan cannot be fed to the hasher.
    }
}
/// Subqueries never compare equal, not even to themselves.
///
/// NOTE(review): this makes the `Eq` impl below violate reflexivity
/// (`x == x` is `false`). Presumably this is deliberate, so that expressions
/// containing subqueries are never treated as duplicates — confirm before
/// relying on `Eq` semantics for `Subquery` or for expressions that contain one.
impl PartialEq for Subquery {
    fn eq(&self, _other: &Self) -> bool {
        false
    }
}
// Marker impl; see the reflexivity note on `PartialEq` above it.
impl Eq for Subquery {}
/// How a [`Repartition`] node distributes rows.
#[derive(Debug, Clone)]
pub enum Partitioning {
    /// Round-robin over the given number of partitions (displayed as `partition_count`).
    RoundRobinBatch(usize),
    /// Hash of the given expressions into the given number of partitions.
    Hash(Vec<Expr>, usize),
    /// Distribution by the given expressions; no partition count is stored.
    DistributeBy(Vec<Expr>),
}
/// The planning stage at which a [`StringifiedPlan`] was captured.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlanType {
    /// Logical plan before any optimization.
    InitialLogicalPlan,
    /// Logical plan after the named optimizer rule ran.
    OptimizedLogicalPlan {
        optimizer_name: String,
    },
    /// Logical plan after all optimizations.
    FinalLogicalPlan,
    /// Physical plan before any optimization.
    InitialPhysicalPlan,
    /// Physical plan after the named optimizer rule ran.
    OptimizedPhysicalPlan {
        optimizer_name: String,
    },
    /// Physical plan after all optimizations.
    FinalPhysicalPlan,
}

impl Display for PlanType {
    /// Writes the label shown in the `plan_type` column of EXPLAIN output.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let label = match self {
            PlanType::InitialLogicalPlan => "initial_logical_plan",
            PlanType::FinalLogicalPlan => "logical_plan",
            PlanType::InitialPhysicalPlan => "initial_physical_plan",
            PlanType::FinalPhysicalPlan => "physical_plan",
            PlanType::OptimizedLogicalPlan { optimizer_name } => {
                return write!(f, "logical_plan after {}", optimizer_name);
            }
            PlanType::OptimizedPhysicalPlan { optimizer_name } => {
                return write!(f, "physical_plan after {}", optimizer_name);
            }
        };
        f.write_str(label)
    }
}

/// A textual rendering of a plan, tagged with the stage it was captured at.
#[derive(Debug, Clone, PartialEq, Eq)]
// `plan` stays an `Arc<String>` even though clippy's `rc_buffer` lint prefers
// `Arc<str>`; presumably to keep the public field type stable — confirm.
#[allow(clippy::rc_buffer)]
pub struct StringifiedPlan {
    /// Stage at which the plan was rendered.
    pub plan_type: PlanType,
    /// The rendered plan text (shared so clones are cheap).
    pub plan: Arc<String>,
}

impl StringifiedPlan {
    /// Creates a new rendering of type `plan_type` from `plan`.
    pub fn new(plan_type: PlanType, plan: impl Into<String>) -> Self {
        let text: String = plan.into();
        StringifiedPlan {
            plan_type,
            plan: Arc::new(text),
        }
    }

    /// Whether this rendering should be shown: final logical/physical plans
    /// always are, every other stage only when `verbose_mode` is set.
    pub fn should_display(&self, verbose_mode: bool) -> bool {
        let is_final = matches!(
            self.plan_type,
            PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan
        );
        is_final || verbose_mode
    }
}
/// Types that can render themselves as a [`StringifiedPlan`].
pub trait ToStringifiedPlan {
    /// Renders `self` and tags the text with `plan_type`.
    fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan;
}
#[cfg(test)]
mod tests {
use super::*;
use crate::logical_plan::table_scan;
use crate::{col, in_subquery, lit};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::DFSchema;
use datafusion_common::Result;
use std::collections::HashMap;
fn employee_schema() -> Schema {
Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("first_name", DataType::Utf8, false),
Field::new("last_name", DataType::Utf8, false),
Field::new("state", DataType::Utf8, false),
Field::new("salary", DataType::Int32, false),
])
}
fn display_plan() -> Result<LogicalPlan> {
let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
.build()?;
table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
.filter(in_subquery(col("state"), Arc::new(plan1)))?
.project(vec![col("id")])?
.build()
}
#[test]
fn test_display_indent() -> Result<()> {
let plan = display_plan()?;
let expected = "Projection: employee_csv.id\
\n Filter: employee_csv.state IN (<subquery>)\
\n Subquery:\
\n TableScan: employee_csv projection=[state]\
\n TableScan: employee_csv projection=[id, state]";
assert_eq!(expected, format!("{}", plan.display_indent()));
Ok(())
}
#[test]
fn test_display_indent_schema() -> Result<()> {
let plan = display_plan()?;
let expected = "Projection: employee_csv.id [id:Int32]\
\n Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]\
\n Subquery: [state:Utf8]\
\n TableScan: employee_csv projection=[state] [state:Utf8]\
\n TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]";
assert_eq!(expected, format!("{}", plan.display_indent_schema()));
Ok(())
}
#[test]
fn test_display_graphviz() -> Result<()> {
let plan = display_plan()?;
let graphviz = format!("{}", plan.display_graphviz());
assert!(
graphviz.contains(
r#"// Begin DataFusion GraphViz Plan (see https://graphviz.org)"#
),
"\n{}",
plan.display_graphviz()
);
assert!(
graphviz.contains(
r#"[shape=box label="TableScan: employee_csv projection=[id, state]"]"#
),
"\n{}",
plan.display_graphviz()
);
assert!(graphviz.contains(r#"[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]"#),
"\n{}", plan.display_graphviz());
assert!(
graphviz.contains(r#"// End DataFusion GraphViz Plan"#),
"\n{}",
plan.display_graphviz()
);
Ok(())
}
/// Test visitor that records a `"pre_visit <Node>"` / `"post_visit <Node>"`
/// entry for every node it sees and always continues the traversal.
///
/// Only `Projection`, `Filter` and `TableScan` nodes are expected; any other
/// node kind panics via `unimplemented!`.
#[derive(Debug, Default)]
struct OkVisitor {
    strings: Vec<String>,
}
impl OkVisitor {
    /// Builds the log entry `"<phase> <Node>"` for the node kinds the tests use.
    fn label(phase: &str, plan: &LogicalPlan) -> String {
        let node = match plan {
            LogicalPlan::Projection(_) => "Projection",
            LogicalPlan::Filter(_) => "Filter",
            LogicalPlan::TableScan(_) => "TableScan",
            _ => unimplemented!("unknown plan type"),
        };
        format!("{} {}", phase, node)
    }
}
impl PlanVisitor for OkVisitor {
    type Error = String;
    fn pre_visit(
        &mut self,
        plan: &LogicalPlan,
    ) -> std::result::Result<bool, Self::Error> {
        let entry = Self::label("pre_visit", plan);
        self.strings.push(entry);
        Ok(true)
    }
    fn post_visit(
        &mut self,
        plan: &LogicalPlan,
    ) -> std::result::Result<bool, Self::Error> {
        let entry = Self::label("post_visit", plan);
        self.strings.push(entry);
        Ok(true)
    }
}
/// A parent's `pre_visit` runs before its child's, and its `post_visit` runs
/// after the child's `post_visit`.
#[test]
fn visit_order() {
    let plan = test_plan();
    let mut visitor = OkVisitor::default();
    let outcome = plan.accept(&mut visitor);
    assert!(outcome.is_ok());
    let expected = [
        "pre_visit Projection",
        "pre_visit Filter",
        "pre_visit TableScan",
        "post_visit TableScan",
        "post_visit Filter",
        "post_visit Projection",
    ];
    assert_eq!(visitor.strings, expected);
}
/// Countdown used to make a test visitor misbehave on a chosen call.
///
/// An unset counter (`None`, the `Default`) never fires. `Some(n)` lets `n`
/// calls through and then fires on every subsequent call.
#[derive(Debug, Default)]
struct OptionalCounter {
    val: Option<usize>,
}
impl OptionalCounter {
    /// Creates a counter that returns `false` from `dec` `val` times before firing.
    fn new(val: usize) -> Self {
        OptionalCounter { val: Some(val) }
    }
    /// Advances the countdown and reports whether it has reached zero.
    ///
    /// Once at zero the counter stays there and keeps returning `true`.
    /// An unset counter always returns `false` and stays unset.
    fn dec(&mut self) -> bool {
        match self.val {
            Some(0) => true,
            Some(remaining) => {
                self.val = Some(remaining - 1);
                false
            }
            None => false,
        }
    }
}
/// Wraps an [`OkVisitor`] and returns `Ok(false)` from `pre_visit` /
/// `post_visit` once the matching counter fires. Calls that return early are
/// not forwarded to (and therefore not recorded by) `inner`.
#[derive(Debug, Default)]
struct StoppingVisitor {
    inner: OkVisitor,
    return_false_from_pre_in: OptionalCounter,
    return_false_from_post_in: OptionalCounter,
}
impl PlanVisitor for StoppingVisitor {
    type Error = String;
    fn pre_visit(
        &mut self,
        plan: &LogicalPlan,
    ) -> std::result::Result<bool, Self::Error> {
        let stop_here = self.return_false_from_pre_in.dec();
        if stop_here {
            Ok(false)
        } else {
            self.inner.pre_visit(plan)
        }
    }
    fn post_visit(
        &mut self,
        plan: &LogicalPlan,
    ) -> std::result::Result<bool, Self::Error> {
        let stop_here = self.return_false_from_post_in.dec();
        if stop_here {
            Ok(false)
        } else {
            self.inner.post_visit(plan)
        }
    }
}
/// Returning `Ok(false)` from the third `pre_visit` call ends the traversal
/// without error: nothing after the first two pre-visits is recorded.
#[test]
fn early_stopping_pre_visit() {
    let plan = test_plan();
    let mut visitor = StoppingVisitor {
        return_false_from_pre_in: OptionalCounter::new(2),
        ..Default::default()
    };
    let outcome = plan.accept(&mut visitor);
    assert!(outcome.is_ok());
    let expected = ["pre_visit Projection", "pre_visit Filter"];
    assert_eq!(visitor.inner.strings, expected);
}
/// Returning `Ok(false)` from the second `post_visit` call ends the traversal
/// without error, after the leaf's post-visit has been recorded.
#[test]
fn early_stopping_post_visit() {
    let plan = test_plan();
    let mut visitor = StoppingVisitor {
        return_false_from_post_in: OptionalCounter::new(1),
        ..Default::default()
    };
    let outcome = plan.accept(&mut visitor);
    assert!(outcome.is_ok());
    let expected = [
        "pre_visit Projection",
        "pre_visit Filter",
        "pre_visit TableScan",
        "post_visit TableScan",
    ];
    assert_eq!(visitor.inner.strings, expected);
}
/// Wraps an [`OkVisitor`] and returns an `Err` from `pre_visit` /
/// `post_visit` once the matching counter fires. Calls that fail are not
/// forwarded to (and therefore not recorded by) `inner`.
#[derive(Debug, Default)]
struct ErrorVisitor {
    inner: OkVisitor,
    return_error_from_pre_in: OptionalCounter,
    return_error_from_post_in: OptionalCounter,
}
impl PlanVisitor for ErrorVisitor {
    type Error = String;
    fn pre_visit(
        &mut self,
        plan: &LogicalPlan,
    ) -> std::result::Result<bool, Self::Error> {
        let fail_here = self.return_error_from_pre_in.dec();
        if fail_here {
            Err(String::from("Error in pre_visit"))
        } else {
            self.inner.pre_visit(plan)
        }
    }
    fn post_visit(
        &mut self,
        plan: &LogicalPlan,
    ) -> std::result::Result<bool, Self::Error> {
        let fail_here = self.return_error_from_post_in.dec();
        if fail_here {
            Err(String::from("Error in post_visit"))
        } else {
            self.inner.post_visit(plan)
        }
    }
}
/// An error from the third `pre_visit` call is propagated out of `accept`,
/// and only the first two pre-visits are recorded.
#[test]
fn error_pre_visit() {
    let plan = test_plan();
    let mut visitor = ErrorVisitor {
        return_error_from_pre_in: OptionalCounter::new(2),
        ..Default::default()
    };
    match plan.accept(&mut visitor) {
        Err(e) => assert_eq!("Error in pre_visit", e),
        Ok(_) => panic!("Expected an error"),
    }
    let expected = ["pre_visit Projection", "pre_visit Filter"];
    assert_eq!(visitor.inner.strings, expected);
}
/// An error from the second `post_visit` call is propagated out of `accept`,
/// after the leaf's post-visit has been recorded.
#[test]
fn error_post_visit() {
    let plan = test_plan();
    let mut visitor = ErrorVisitor {
        return_error_from_post_in: OptionalCounter::new(1),
        ..Default::default()
    };
    match plan.accept(&mut visitor) {
        Err(e) => assert_eq!("Error in post_visit", e),
        Ok(_) => panic!("Expected an error"),
    }
    let expected = [
        "pre_visit Projection",
        "pre_visit Filter",
        "pre_visit TableScan",
        "post_visit TableScan",
    ];
    assert_eq!(visitor.inner.strings, expected);
}
/// `Projection::try_new_with_schema` must reject a projection whose number of
/// expressions differs from the number of fields in the supplied schema.
#[test]
fn projection_expr_schema_mismatch() -> Result<()> {
    let empty_schema = Arc::new(DFSchema::new_with_metadata(vec![], HashMap::new())?);
    let input = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
        produce_one_row: false,
        schema: Arc::clone(&empty_schema),
    }));
    // One expression (`a`) against a zero-field schema: construction must fail.
    // Match explicitly so an unexpected success panics with a descriptive
    // message, instead of the opaque `Option::unwrap()` on `None` panic.
    let err = match Projection::try_new_with_schema(
        vec![col("a")],
        input,
        empty_schema,
        None,
    ) {
        Ok(_) => panic!(
            "expected Projection::try_new_with_schema to reject the expression/schema mismatch"
        ),
        Err(e) => e,
    };
    assert_eq!(
        "Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)",
        err.to_string()
    );
    Ok(())
}
/// Builds `Projection(id) <- Filter(state = 'CO') <- TableScan(id, state)`,
/// the three-node plan walked by the visitor tests.
///
/// Panics if any builder step fails.
fn test_plan() -> LogicalPlan {
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("state", DataType::Utf8, false),
    ]);
    let scan = table_scan(None, &schema, Some(vec![0, 1])).unwrap();
    let filtered = scan.filter(col("state").eq(lit("CO"))).unwrap();
    let projected = filtered.project(vec![col("id")]).unwrap();
    projected.build().unwrap()
}
}