use sqlparser::ast::{self, Select};
use crate::error::{Result, SqlError};
use crate::functions::registry::FunctionRegistry;
use crate::parser::normalize::normalize_ident;
use crate::resolver::columns::TableScope;
use crate::resolver::expr::convert_expr;
use crate::types::*;
/// Builds the plan for a `SELECT` whose first `FROM` item carries `JOIN` clauses.
///
/// Only the first entry of `select.from` is inspected. The plan is assembled as:
///
/// 1. a bare scan of the left-most relation,
/// 2. one `SqlPlan::Join` per explicit `JOIN`, each wrapping the plan built so far
///    as its left input,
/// 3. one further `SqlPlan::Join` per join extracted from subqueries in `WHERE`,
/// 4. an `SqlPlan::Aggregate` wrapper when a `GROUP BY` clause is present.
///
/// The `WHERE` predicates that remain after subquery extraction are attached to the
/// outermost join. The converted projection is attached to it as well, except when
/// the result is wrapped in an aggregate.
///
/// # Errors
///
/// * `SqlError::Unsupported` when `select.from` is empty, when the left relation
///   cannot be matched to a table in `scope`, when a join target is not a plain
///   table, or when the join operator is not handled by `extract_join_spec`.
/// * `SqlError::UnknownTable` when a join target is not present in `scope`.
/// * Any error returned by the projection, filter, subquery or aggregate helpers.
pub fn plan_join_from_select(
    select: &Select,
    scope: &TableScope,
    catalog: &dyn SqlCatalog,
    functions: &FunctionRegistry,
) -> Result<Option<SqlPlan>> {
    // An empty FROM list used to panic on `select.from[0]`; report it as an error.
    let from = select.from.first().ok_or_else(|| SqlError::Unsupported {
        detail: "JOIN planning requires a FROM clause".into(),
    })?;

    // Every scan starts out without filters, projection, ordering or paging.
    let bare_scan = |collection, alias, engine| SqlPlan::Scan {
        collection,
        alias,
        engine,
        filters: Vec::new(),
        projection: Vec::new(),
        sort_keys: Vec::new(),
        limit: None,
        offset: 0,
        distinct: false,
        window_functions: Vec::new(),
    };

    // The left relation name does not depend on the candidate table, so resolve it
    // once instead of once per table inside the search closure.
    let rel_name = crate::parser::normalize::table_name_from_factor(&from.relation).map(|(n, _)| n);
    let left_table = scope
        .tables
        .values()
        .find(|t| rel_name.as_deref() == Some(&t.name) || rel_name.as_deref() == t.alias.as_deref())
        .ok_or_else(|| SqlError::Unsupported {
            detail: "cannot resolve left table in JOIN".into(),
        })?;

    let mut current_plan = bare_scan(
        left_table.name.clone(),
        left_table.alias.clone(),
        left_table.info.engine,
    );

    // Explicit JOIN clauses: each one takes the plan built so far as its left side.
    for join_item in &from.joins {
        let (right_name, right_alias) =
            crate::parser::normalize::table_name_from_factor(&join_item.relation).ok_or_else(
                || SqlError::Unsupported {
                    detail: "non-table JOIN target".into(),
                },
            )?;
        let right_table = scope
            .tables
            .values()
            .find(|t| t.name == right_name)
            .ok_or_else(|| SqlError::UnknownTable {
                name: right_name.clone(),
            })?;
        let right_plan = bare_scan(
            right_table.name.clone(),
            right_alias,
            right_table.info.engine,
        );
        let (join_type, on_keys, condition) = extract_join_spec(&join_item.join_operator)?;
        current_plan = SqlPlan::Join {
            left: Box::new(current_plan),
            right: Box::new(right_plan),
            on: on_keys,
            join_type,
            condition,
            limit: 10000,
            projection: Vec::new(),
            filters: Vec::new(),
        };
    }

    // Subqueries in WHERE are turned into joins; whatever is left stays a filter.
    let (subquery_joins, effective_where) = if let Some(expr) = &select.selection {
        let extraction = super::subquery::extract_subqueries(expr, catalog, functions)?;
        (extraction.joins, extraction.remaining_where)
    } else {
        (Vec::new(), None)
    };

    let projection = super::select::convert_projection(&select.projection)?;
    let filters = match &effective_where {
        Some(expr) => super::select::convert_where_to_filters(expr)?,
        None => Vec::new(),
    };

    for sq in subquery_joins {
        current_plan = SqlPlan::Join {
            left: Box::new(current_plan),
            right: Box::new(sq.inner_plan),
            on: vec![(sq.outer_column, sq.inner_column)],
            join_type: sq.join_type,
            condition: None,
            limit: 10000,
            projection: Vec::new(),
            filters: Vec::new(),
        };
    }

    // The projection was already converted successfully above, so the presence of
    // GROUP BY alone decides whether the result is aggregated.
    // NOTE(review): a projection with aggregate functions but no GROUP BY is not
    // routed through the aggregate branch here — confirm that is intended.
    let is_aggregate = match &select.group_by {
        ast::GroupByExpr::All(_) => true,
        ast::GroupByExpr::Expressions(exprs, _) => !exprs.is_empty(),
    };

    // WHERE applies to the joined rows whether or not they are aggregated
    // afterwards, so the filters go onto the outermost join in both cases.
    // The projection is only attached when no aggregate wraps the join.
    // NOTE(review): when no join was produced at all (`current_plan` is still the
    // bare scan), projection and filters are not attached — verify callers never
    // reach this function without a join.
    if let SqlPlan::Join {
        projection: proj,
        filters: filt,
        ..
    } = &mut current_plan
    {
        *filt = filters;
        if !is_aggregate {
            *proj = projection;
        }
    }

    if is_aggregate {
        let aggregates =
            super::aggregate::extract_aggregates_from_projection(&select.projection, functions)?;
        let group_by = super::aggregate::convert_group_by(&select.group_by)?;
        let having = match &select.having {
            Some(expr) => super::select::convert_where_to_filters(expr)?,
            None => Vec::new(),
        };
        return Ok(Some(SqlPlan::Aggregate {
            input: Box::new(current_plan),
            group_by,
            aggregates,
            having,
            limit: 10000,
        }));
    }

    Ok(Some(current_plan))
}
type JoinSpec = (JoinType, Vec<(String, String)>, Option<SqlExpr>);
fn extract_join_spec(op: &ast::JoinOperator) -> Result<JoinSpec> {
match op {
ast::JoinOperator::Inner(constraint) | ast::JoinOperator::Join(constraint) => {
let (keys, cond) = extract_join_constraint(constraint)?;
Ok((JoinType::Inner, keys, cond))
}
ast::JoinOperator::Left(constraint) | ast::JoinOperator::LeftOuter(constraint) => {
let (keys, cond) = extract_join_constraint(constraint)?;
Ok((JoinType::Left, keys, cond))
}
ast::JoinOperator::Right(constraint) | ast::JoinOperator::RightOuter(constraint) => {
let (keys, cond) = extract_join_constraint(constraint)?;
Ok((JoinType::Right, keys, cond))
}
ast::JoinOperator::FullOuter(constraint) => {
let (keys, cond) = extract_join_constraint(constraint)?;
Ok((JoinType::Full, keys, cond))
}
ast::JoinOperator::CrossJoin(constraint) => {
let (keys, cond) = extract_join_constraint(constraint)?;
Ok((JoinType::Cross, keys, cond))
}
ast::JoinOperator::Semi(constraint) | ast::JoinOperator::LeftSemi(constraint) => {
let (keys, cond) = extract_join_constraint(constraint)?;
Ok((JoinType::Semi, keys, cond))
}
ast::JoinOperator::Anti(constraint) | ast::JoinOperator::LeftAnti(constraint) => {
let (keys, cond) = extract_join_constraint(constraint)?;
Ok((JoinType::Anti, keys, cond))
}
_ => Err(SqlError::Unsupported {
detail: format!("join type: {op:?}"),
}),
}
}
/// Equi-join key pairs plus the optional residual (non-equi) join condition.
type JoinConstraintResult = (Vec<(String, String)>, Option<SqlExpr>);

/// Splits a join constraint into equi-join keys and a residual condition.
///
/// * `ON expr` — `expr` is broken at top-level `AND`s. Every `column = column`
///   conjunct becomes a key pair; all remaining conjuncts are AND-ed back
///   together and converted into a single residual condition.
/// * `USING (cols)` — each column yields a key pair with the same name on both
///   sides; there is no residual condition.
/// * `NATURAL` and no constraint — no keys and no residual condition are
///   produced here.
///
/// # Errors
///
/// Propagates errors from `extract_equi_keys` and `convert_expr`.
fn extract_join_constraint(constraint: &ast::JoinConstraint) -> Result<JoinConstraintResult> {
    match constraint {
        ast::JoinConstraint::On(expr) => {
            let mut keys = Vec::new();
            let mut non_equi = Vec::new();
            extract_equi_keys(expr, &mut keys, &mut non_equi)?;

            // Keep every residual predicate: fold them into one conjunction rather
            // than converting only the first and dropping the others.
            let cond = non_equi
                .into_iter()
                .reduce(|lhs, rhs| ast::Expr::BinaryOp {
                    left: Box::new(lhs),
                    op: ast::BinaryOperator::And,
                    right: Box::new(rhs),
                })
                .map(|combined| convert_expr(&combined))
                .transpose()?;

            Ok((keys, cond))
        }
        ast::JoinConstraint::Using(columns) => {
            let keys = columns
                .iter()
                .map(|c| {
                    let name = crate::parser::normalize::normalize_object_name(c);
                    (name.clone(), name)
                })
                .collect();
            Ok((keys, None))
        }
        ast::JoinConstraint::Natural => Ok((Vec::new(), None)),
        ast::JoinConstraint::None => Ok((Vec::new(), None)),
    }
}
/// Walks an `ON` expression and sorts its `AND`-connected conjuncts.
///
/// Conjuncts of the form `column = column` are appended to `keys` as
/// `(left, right)` name pairs. Every other conjunct — including an equality
/// where either side is not a column reference — is cloned into `non_equi`.
/// Conjuncts are visited left to right, so both output vectors keep source order.
///
/// Always returns `Ok(())`; the `Result` return type is kept for callers that
/// use `?` on it.
fn extract_equi_keys(
    expr: &ast::Expr,
    keys: &mut Vec<(String, String)>,
    non_equi: &mut Vec<ast::Expr>,
) -> Result<()> {
    // Explicit work stack; the right operand is pushed first so that the left
    // operand is popped and handled before it.
    let mut pending = vec![expr];

    while let Some(node) = pending.pop() {
        match node {
            ast::Expr::BinaryOp {
                left,
                op: ast::BinaryOperator::And,
                right,
            } => {
                pending.push(&**right);
                pending.push(&**left);
            }
            ast::Expr::BinaryOp {
                left,
                op: ast::BinaryOperator::Eq,
                right,
            } => match (extract_col_ref(left), extract_col_ref(right)) {
                (Some(lhs), Some(rhs)) => keys.push((lhs, rhs)),
                _ => non_equi.push(node.clone()),
            },
            other => non_equi.push(other.clone()),
        }
    }

    Ok(())
}
/// Returns the normalized column name when `expr` is a plain column reference.
///
/// A single identifier yields its normalized form. A non-empty compound
/// identifier yields its normalized parts joined with `.`. Anything else
/// (including an empty compound identifier) yields `None`.
fn extract_col_ref(expr: &ast::Expr) -> Option<String> {
    let parts = match expr {
        ast::Expr::Identifier(ident) => return Some(normalize_ident(ident)),
        ast::Expr::CompoundIdentifier(parts) => parts,
        _ => return None,
    };

    if parts.is_empty() {
        return None;
    }

    let segments: Vec<String> = parts.iter().map(normalize_ident).collect();
    Some(segments.join("."))
}