use std::sync::Arc;
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{
DFSchema, Diagnostic, Result, Span, Spans, TableReference, not_impl_err, plan_err,
};
use datafusion_expr::builder::subquery_alias;
use datafusion_expr::planner::{
PlannedRelation, RelationPlannerContext, RelationPlanning,
};
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, expr::Unnest};
use datafusion_expr::{Subquery, SubqueryAlias};
use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor};
mod join;
/// Adapter that exposes a [`SqlToRel`] planner, together with the
/// [`PlannerContext`] it is currently planning with, through the
/// [`RelationPlannerContext`] trait that is passed to relation planners.
struct SqlToRelRelationContext<'a, 'b, S>
where
    S: ContextProvider,
{
    /// Planner that every request is delegated to.
    planner: &'a SqlToRel<'b, S>,
    /// Mutable planning state forwarded to the delegated calls that need it.
    planner_context: &'a mut PlannerContext,
}
/// Implements [`RelationPlannerContext`] by forwarding every request to the
/// wrapped [`SqlToRel`] planner, threading the borrowed [`PlannerContext`]
/// through the calls that take one.
impl<'a, 'b, S> RelationPlannerContext for SqlToRelRelationContext<'a, 'b, S>
where
    S: ContextProvider,
{
    /// Returns the planner's catalog/context provider.
    fn context_provider(&self) -> &dyn ContextProvider {
        let planner = self.planner;
        planner.context_provider
    }

    /// Plans a nested relation with the full relation pipeline
    /// (extension planners first, then the built-in rules).
    fn plan(&mut self, relation: TableFactor) -> Result<LogicalPlan> {
        let planner_context = &mut *self.planner_context;
        self.planner.create_relation(relation, planner_context)
    }

    /// Delegates to `SqlToRel::sql_to_expr` with the current planner context.
    fn sql_to_expr(
        &mut self,
        expr: sqlparser::ast::Expr,
        schema: &DFSchema,
    ) -> Result<Expr> {
        let planner_context = &mut *self.planner_context;
        self.planner.sql_to_expr(expr, schema, planner_context)
    }

    /// Delegates to `SqlToRel::sql_expr_to_logical_expr` with the current
    /// planner context.
    fn sql_expr_to_logical_expr(
        &mut self,
        expr: sqlparser::ast::Expr,
        schema: &DFSchema,
    ) -> Result<Expr> {
        let planner_context = &mut *self.planner_context;
        self.planner
            .sql_expr_to_logical_expr(expr, schema, planner_context)
    }

    /// Normalizes an identifier using the planner's identifier normalizer.
    fn normalize_ident(&self, ident: sqlparser::ast::Ident) -> String {
        let normalizer = &self.planner.ident_normalizer;
        normalizer.normalize(ident)
    }

    /// Converts a SQL object name into a [`TableReference`] using the
    /// planner's own conversion rules.
    fn object_name_to_table_reference(
        &self,
        name: sqlparser::ast::ObjectName,
    ) -> Result<TableReference> {
        let planner = self.planner;
        planner.object_name_to_table_reference(name)
    }
}
impl<S: ContextProvider> SqlToRel<'_, S> {
    /// Plans a single `FROM`-clause relation ([`TableFactor`]) into a
    /// [`LogicalPlan`].
    ///
    /// The registered relation planners (see `create_extension_relation`) are
    /// consulted first; if none of them plans the relation, the built-in rules
    /// in `create_default_relation` are used. The resulting plan is passed
    /// through `optimize_subquery_sort` (which may drop `Sort` nodes), and the
    /// relation's alias, if any, is applied last via `apply_table_alias`.
    fn create_relation(
        &self,
        relation: TableFactor,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        let planned_relation =
            match self.create_extension_relation(relation, planner_context)? {
                RelationPlanning::Planned(planned) => planned,
                RelationPlanning::Original(original) => {
                    Box::new(self.create_default_relation(*original, planner_context)?)
                }
            };
        let optimized_plan = optimize_subquery_sort(planned_relation.plan)?.data;
        if let Some(alias) = planned_relation.alias {
            self.apply_table_alias(optimized_plan, alias)
        } else {
            Ok(optimized_plan)
        }
    }

    /// Offers `relation` to each registered relation planner, in the order
    /// returned by `get_relation_planners()`.
    ///
    /// The first planner that returns `RelationPlanning::Planned` wins. A
    /// planner that declines returns the relation (possibly rewritten) as
    /// `RelationPlanning::Original`, and that value is what the next planner
    /// sees. If no planner claims it, the final relation is returned as
    /// `RelationPlanning::Original`.
    fn create_extension_relation(
        &self,
        relation: TableFactor,
        planner_context: &mut PlannerContext,
    ) -> Result<RelationPlanning> {
        let planners = self.context_provider.get_relation_planners();
        if planners.is_empty() {
            return Ok(RelationPlanning::Original(Box::new(relation)));
        }
        let mut current_relation = relation;
        for planner in planners.iter() {
            // A fresh adapter per planner: it holds the mutable borrow of
            // `planner_context` only for the duration of one call.
            let mut context = SqlToRelRelationContext {
                planner: self,
                planner_context,
            };
            match planner.plan_relation(current_relation, &mut context)? {
                RelationPlanning::Planned(planned) => {
                    return Ok(RelationPlanning::Planned(planned));
                }
                RelationPlanning::Original(original) => {
                    current_relation = *original;
                }
            }
        }
        Ok(RelationPlanning::Original(Box::new(current_relation)))
    }

    /// Plans `relation` with the built-in rules, without consulting any
    /// relation planner extension.
    ///
    /// Supported table factors:
    /// - `TableFactor::Table` with arguments: a table function call resolved
    ///   through `get_table_function_source`, scanned under the name
    ///   `<function name>()`.
    /// - `TableFactor::Table` without arguments: a CTE registered in
    ///   `planner_context` under the same name, otherwise a table resolved
    ///   through `get_table_source`.
    /// - `TableFactor::Derived`: a subquery in `FROM`.
    /// - `TableFactor::NestedJoin`: a parenthesized join.
    /// - `TableFactor::UNNEST` without `WITH OFFSET` and without
    ///   `WITH ORDINALITY`.
    /// - `TableFactor::Function`: a table function whose arguments are planned
    ///   against the last schema in `outer_queries_schemas()`, or an empty
    ///   schema if there is none.
    ///
    /// The relation's alias is returned in the [`PlannedRelation`]; it is not
    /// applied here.
    ///
    /// # Errors
    /// Returns an error for unsupported table factors, unsupported function
    /// names or arguments, an empty `UNNEST`, and unknown tables (the latter
    /// annotated with a diagnostic spanning the relation). Errors raised while
    /// planning nested queries, joins or expressions are propagated.
    fn create_default_relation(
        &self,
        relation: TableFactor,
        planner_context: &mut PlannerContext,
    ) -> Result<PlannedRelation> {
        let relation_span = relation.span();
        let (plan, alias) = match relation {
            TableFactor::Table {
                name, alias, args, ..
            } => {
                if let Some(func_args) = args {
                    // Only the first part of the name is used as the function
                    // name. A part that is not a plain identifier is rejected
                    // with a planning error instead of panicking.
                    let Some(tbl_func_ident) =
                        name.0.first().and_then(|part| part.as_ident())
                    else {
                        return plan_err!("Unsupported table function name: {name}");
                    };
                    let tbl_func_name = tbl_func_ident.to_string();
                    // Arguments are planned against an empty schema. Every
                    // argument must plan successfully: an unsupported argument
                    // kind or a planning error is propagated rather than
                    // silently dropped from the argument list.
                    let args = func_args
                        .args
                        .into_iter()
                        .map(|arg| {
                            if let FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) = arg
                            {
                                self.sql_expr_to_logical_expr(
                                    expr,
                                    &DFSchema::empty(),
                                    planner_context,
                                )
                            } else {
                                plan_err!("Unsupported function argument type: {}", arg)
                            }
                        })
                        .collect::<Result<Vec<_>>>()?;
                    let provider = self
                        .context_provider
                        .get_table_function_source(&tbl_func_name, args)?;
                    let plan = LogicalPlanBuilder::scan(
                        TableReference::Bare {
                            table: format!("{tbl_func_name}()").into(),
                        },
                        provider,
                        None,
                    )?
                    .build()?;
                    (plan, alias)
                } else {
                    let table_ref = self.object_name_to_table_reference(name)?;
                    let table_name = table_ref.to_string();
                    let cte = planner_context.get_cte(&table_name);
                    // A CTE with the same name takes precedence over a catalog
                    // table. The table source is still looked up, and its error
                    // is ignored, when the CTE exists.
                    (
                        match (
                            cte,
                            self.context_provider.get_table_source(table_ref.clone()),
                        ) {
                            (Some(cte_plan), _) => Ok(cte_plan.clone()),
                            (_, Ok(provider)) => LogicalPlanBuilder::scan(
                                table_ref.clone(),
                                provider,
                                None,
                            )?
                            .build(),
                            (None, Err(e)) => {
                                let e = e.with_diagnostic(Diagnostic::new_error(
                                    format!("table '{table_ref}' not found"),
                                    Span::try_from_sqlparser_span(relation_span),
                                ));
                                Err(e)
                            }
                        }?,
                        alias,
                    )
                }
            }
            TableFactor::Derived {
                subquery, alias, ..
            } => {
                let logical_plan = self.query_to_plan(*subquery, planner_context)?;
                (logical_plan, alias)
            }
            TableFactor::NestedJoin {
                table_with_joins,
                alias,
            } => (
                self.plan_table_with_joins(*table_with_joins, planner_context)?,
                alias,
            ),
            TableFactor::UNNEST {
                alias,
                array_exprs,
                with_offset: false,
                with_offset_alias: None,
                with_ordinality,
            } => {
                if with_ordinality {
                    return not_impl_err!("UNNEST with ordinality is not supported yet");
                }
                // `UNNEST` in FROM has no input relation: its arguments are
                // planned against an empty schema on top of a one-row empty
                // plan.
                let schema = DFSchema::empty();
                let input = LogicalPlanBuilder::empty(true).build()?;
                let unnest_exprs = array_exprs
                    .into_iter()
                    .map(|sql_expr| {
                        let expr = self.sql_expr_to_logical_expr(
                            sql_expr,
                            &schema,
                            planner_context,
                        )?;
                        Self::check_unnest_arg(&expr, &schema)?;
                        Ok(Expr::Unnest(Unnest::new(expr)))
                    })
                    .collect::<Result<Vec<_>>>()?;
                if unnest_exprs.is_empty() {
                    return plan_err!("UNNEST must have at least one argument");
                }
                let logical_plan = self.try_process_unnest(input, unnest_exprs)?;
                (logical_plan, alias)
            }
            TableFactor::UNNEST { .. } => {
                return not_impl_err!(
                    "UNNEST table factor with offset is not supported yet"
                );
            }
            TableFactor::Function {
                name, args, alias, ..
            } => {
                let tbl_func_ref = self.object_name_to_table_reference(name)?;
                let schema = planner_context
                    .outer_queries_schemas()
                    .last()
                    .cloned()
                    .unwrap_or_else(|| Arc::new(DFSchema::empty()));
                let func_args = args
                    .into_iter()
                    .map(|arg| match arg {
                        FunctionArg::Unnamed(FunctionArgExpr::Expr(expr))
                        | FunctionArg::Named {
                            arg: FunctionArgExpr::Expr(expr),
                            ..
                        } => {
                            self.sql_expr_to_logical_expr(expr, &schema, planner_context)
                        }
                        _ => plan_err!("Unsupported function argument: {arg:?}"),
                    })
                    .collect::<Result<Vec<Expr>>>()?;
                let provider = self
                    .context_provider
                    .get_table_function_source(tbl_func_ref.table(), func_args)?;
                let plan =
                    LogicalPlanBuilder::scan(tbl_func_ref.table(), provider, None)?
                        .build()?;
                (plan, alias)
            }
            _ => {
                return not_impl_err!(
                    "Unsupported ast node {relation:?} in create_relation"
                );
            }
        };
        Ok(PlannedRelation::new(plan, alias))
    }

    /// Plans `subquery` as a relation that may reference columns of the
    /// enclosing query, wrapping it in a [`LogicalPlan::Subquery`] when it
    /// does.
    ///
    /// While `subquery` is planned:
    /// - the outer `FROM` schema is cleared;
    /// - the most recently pushed outer query schema is replaced by the
    ///   previous `FROM` schema merged with it (or by the `FROM` schema alone
    ///   when there was none).
    ///
    /// Both are restored afterwards. NOTE: restoration only happens on
    /// success; an error from `create_relation` returns early.
    ///
    /// If the planned relation contains outer reference expressions, it is
    /// wrapped in a [`Subquery`] carrying them. A top-level [`SubqueryAlias`]
    /// is kept above that wrapper so the alias is preserved.
    pub(crate) fn create_relation_subquery(
        &self,
        subquery: TableFactor,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        let old_from_schema = planner_context
            .set_outer_from_schema(None)
            .unwrap_or_else(|| Arc::new(DFSchema::empty()));
        let outer_query_schema = planner_context.pop_outer_query_schema();
        let new_query_schema = match outer_query_schema {
            Some(ref old_query_schema) => {
                let mut new_query_schema = old_from_schema.as_ref().clone();
                new_query_schema.merge(old_query_schema.as_ref());
                Arc::new(new_query_schema)
            }
            None => Arc::clone(&old_from_schema),
        };
        planner_context.append_outer_query_schema(new_query_schema);
        let plan = self.create_relation(subquery, planner_context)?;
        let outer_ref_columns = plan.all_out_ref_exprs();
        planner_context.pop_outer_query_schema();
        if let Some(schema) = outer_query_schema {
            planner_context.append_outer_query_schema(schema);
        }
        planner_context.set_outer_from_schema(Some(old_from_schema));
        if outer_ref_columns.is_empty() {
            return Ok(plan);
        }
        match plan {
            LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
                subquery_alias(
                    LogicalPlan::Subquery(Subquery {
                        subquery: input,
                        outer_ref_columns,
                        spans: Spans::new(),
                    }),
                    alias,
                )
            }
            plan => Ok(LogicalPlan::Subquery(Subquery {
                subquery: Arc::new(plan),
                outer_ref_columns,
                spans: Spans::new(),
            })),
        }
    }
}
fn optimize_subquery_sort(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
let mut has_limit = false;
plan.transform_down(|c| {
if let LogicalPlan::Limit(_) = c {
has_limit = true;
return Ok(Transformed::no(c));
}
match c {
LogicalPlan::Sort(s) => {
if !has_limit {
has_limit = false;
return Ok(Transformed::yes(s.input.as_ref().clone()));
}
Ok(Transformed::no(LogicalPlan::Sort(s)))
}
_ => Ok(Transformed::no(c)),
}
})
}