use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::{DFSchema, Diagnostic, Result, Span, Spans, plan_err};
use datafusion_expr::expr::{Exists, InSubquery, SetComparison, SetQuantifier};
use datafusion_expr::{Expr, LogicalPlan, Subquery};
use sqlparser::ast::Expr as SQLExpr;
use sqlparser::ast::{BinaryOperator, Query, SelectItem, SetExpr};
use std::sync::Arc;
impl<S: ContextProvider> SqlToRel<'_, S> {
pub(super) fn parse_exists_subquery(
&self,
subquery: Query,
negated: bool,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
planner_context.append_outer_query_schema(input_schema.clone().into());
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.pop_outer_query_schema();
Ok(Expr::Exists(Exists {
subquery: Subquery {
subquery: Arc::new(sub_plan),
outer_ref_columns,
spans: Spans::new(),
},
negated,
}))
}
pub(super) fn parse_in_subquery(
&self,
expr: SQLExpr,
subquery: Query,
negated: bool,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
planner_context.append_outer_query_schema(Arc::new(input_schema.clone()));
let mut spans = Spans::new();
if let SetExpr::Select(select) = &subquery.body.as_ref() {
for item in &select.projection {
if let SelectItem::UnnamedExpr(SQLExpr::Identifier(ident)) = item
&& let Some(span) = Span::try_from_sqlparser_span(ident.span)
{
spans.add_span(span);
}
}
}
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.pop_outer_query_schema();
self.validate_single_column(
&sub_plan,
&spans,
"Too many columns! The subquery should only return one column",
"Select only one column in the subquery",
)?;
let expr_obj = self.sql_to_expr(expr, input_schema, planner_context)?;
Ok(Expr::InSubquery(InSubquery::new(
Box::new(expr_obj),
Subquery {
subquery: Arc::new(sub_plan),
outer_ref_columns,
spans,
},
negated,
)))
}
pub(super) fn parse_scalar_subquery(
&self,
subquery: Query,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
planner_context.append_outer_query_schema(Arc::new(input_schema.clone()));
let mut spans = Spans::new();
if let SetExpr::Select(select) = subquery.body.as_ref() {
for item in &select.projection {
if let SelectItem::ExprWithAlias { alias, .. } = item
&& let Some(span) = Span::try_from_sqlparser_span(alias.span)
{
spans.add_span(span);
}
}
}
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.pop_outer_query_schema();
self.validate_single_column(
&sub_plan,
&spans,
"Too many columns! The subquery should only return one column",
"Select only one column in the subquery",
)?;
Ok(Expr::ScalarSubquery(Subquery {
subquery: Arc::new(sub_plan),
outer_ref_columns,
spans,
}))
}
fn validate_single_column(
&self,
sub_plan: &LogicalPlan,
spans: &Spans,
error_message: &str,
help_message: &str,
) -> Result<()> {
if sub_plan.schema().fields().len() > 1 {
let sub_schema = sub_plan.schema();
let field_names = sub_schema.field_names();
let diagnostic =
self.build_multi_column_diagnostic(spans, error_message, help_message);
plan_err!("{}: {}", error_message, field_names.join(", "); diagnostic=diagnostic)
} else {
Ok(())
}
}
fn build_multi_column_diagnostic(
&self,
spans: &Spans,
error_message: &str,
help_message: &str,
) -> Diagnostic {
let full_span = Span::union_iter(spans.0.iter().cloned());
let mut diagnostic = Diagnostic::new_error(error_message, full_span);
for (i, span) in spans.iter().skip(1).enumerate() {
diagnostic.add_note(format!("Extra column {}", i + 1), Some(*span));
}
diagnostic.add_help(help_message, None);
diagnostic
}
pub(super) fn parse_set_comparison_subquery(
&self,
left_expr: SQLExpr,
subquery: Query,
compare_op: &BinaryOperator,
quantifier: SetQuantifier,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
planner_context.append_outer_query_schema(Arc::new(input_schema.clone()));
let mut spans = Spans::new();
if let SetExpr::Select(select) = subquery.body.as_ref() {
for item in &select.projection {
if let SelectItem::ExprWithAlias { alias, .. } = item
&& let Some(span) = Span::try_from_sqlparser_span(alias.span)
{
spans.add_span(span);
}
}
}
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.pop_outer_query_schema();
self.validate_single_column(
&sub_plan,
&spans,
"Too many columns! The subquery should only return one column",
"Select only one column in the subquery",
)?;
let expr_obj = self.sql_to_expr(left_expr, input_schema, planner_context)?;
Ok(Expr::SetComparison(SetComparison::new(
Box::new(expr_obj),
Subquery {
subquery: Arc::new(sub_plan),
outer_ref_columns,
spans,
},
self.parse_sql_binary_op(compare_op)?,
quantifier,
)))
}
}