use nodedb_types::DatabaseId;
use sqlparser::ast::{self, Select};
use super::helpers::{convert_projection, convert_where_to_filters, eval_constant_expr};
use super::where_search::try_extract_where_search;
use crate::error::{Result, SqlError};
use crate::functions::registry::FunctionRegistry;
use crate::parser::normalize::normalize_ident;
use crate::planner::lateral::plan::{
is_lateral_derived, lateral_alias_from_factor, plan_lateral_join, subquery_from_factor,
};
use crate::resolver::columns::TableScope;
use crate::temporal::TemporalScope;
use crate::types::*;
use super::entry::{CteCatalog, plan_query};
pub(super) fn plan_select(
select: &Select,
catalog: &dyn SqlCatalog,
functions: &FunctionRegistry,
temporal: TemporalScope,
) -> Result<SqlPlan> {
if let Some(plan) =
crate::planner::array_fn::try_plan_array_table_fn(&select.from, catalog, temporal)?
{
return Ok(plan);
}
if let Some(plan) = try_plan_derived_from(select, catalog, functions, temporal)? {
return Ok(plan);
}
let scope = TableScope::resolve_from(catalog, &select.from)?;
if select.from.is_empty() {
if let Some(plan) =
crate::planner::array_fn::try_plan_array_maint_fn(&select.projection, catalog)?
{
return Ok(plan);
}
let projection = convert_projection(&select.projection)?;
let mut columns = Vec::new();
let mut values = Vec::new();
for (i, proj) in projection.iter().enumerate() {
match proj {
Projection::Computed { expr, alias } => {
columns.push(alias.clone());
values.push(eval_constant_expr(expr, functions));
}
Projection::Column(name) => {
columns.push(name.clone());
values.push(SqlValue::Null);
}
_ => {
columns.push(format!("col{i}"));
values.push(SqlValue::Null);
}
}
}
return Ok(SqlPlan::ConstantResult { columns, values });
}
if let Some(plan) = try_plan_join(select, &scope, catalog, functions, temporal)? {
return Ok(plan);
}
if select.from.len() == 2 && is_lateral_derived(&select.from[1].relation) {
let outer_twj = &select.from[0];
let lateral_twj = &select.from[1];
let outer_alias = extract_table_alias_from_twj(outer_twj);
let outer_collection =
crate::parser::normalize::table_name_from_factor(&outer_twj.relation)?
.map(|(n, _)| n)
.ok_or_else(|| SqlError::Unsupported {
detail: "LATERAL: outer side must be a plain table".into(),
})?;
let outer_info = catalog
.get_collection(DatabaseId::DEFAULT, &outer_collection)?
.ok_or_else(|| SqlError::UnknownTable {
name: outer_collection.clone(),
})?;
let outer_scan = SqlPlan::Scan {
collection: outer_collection,
alias: outer_alias.clone(),
engine: outer_info.engine,
filters: Vec::new(),
projection: Vec::new(),
sort_keys: Vec::new(),
limit: None,
offset: 0,
distinct: false,
window_functions: Vec::new(),
temporal,
};
let lateral_alias = lateral_alias_from_factor(&lateral_twj.relation).ok_or_else(|| {
SqlError::Unsupported {
detail: "LATERAL subquery requires an alias (e.g. LATERAL (...) AS x)".into(),
}
})?;
let subquery = subquery_from_factor(&lateral_twj.relation)
.expect("is_lateral_derived guarantees Derived variant");
let projection = convert_projection(&select.projection)?;
return plan_lateral_join(
outer_scan,
outer_alias,
subquery,
&lateral_alias,
false, projection,
catalog,
functions,
temporal,
)
.map(Ok)?;
}
let table = scope.single_table().ok_or_else(|| SqlError::Unsupported {
detail: "multi-table FROM without JOIN".into(),
})?;
let (subquery_joins, effective_where) = if let Some(expr) = &select.selection {
let extraction =
crate::planner::subquery::extract_subqueries(expr, catalog, functions, temporal)?;
(extraction.joins, extraction.remaining_where)
} else {
(Vec::new(), None)
};
let filters = match &effective_where {
Some(expr) => {
if let Some(plan) = try_extract_where_search(expr, table, functions)? {
return Ok(plan);
}
convert_where_to_filters(expr)?
}
None => Vec::new(),
};
if has_aggregation(select, functions) {
let mut plan = crate::planner::aggregate::plan_aggregate(
select, table, &filters, &scope, functions, &temporal,
)?;
if let SqlPlan::Aggregate { input, .. } = &mut plan {
let mut base_input = std::mem::replace(
input,
Box::new(SqlPlan::ConstantResult {
columns: Vec::new(),
values: Vec::new(),
}),
);
for sq in subquery_joins
.iter()
.filter(|sq| sq.join_type != JoinType::Cross)
{
base_input = Box::new(SqlPlan::Join {
left: base_input,
right: Box::new(sq.inner_plan.clone()),
on: vec![(sq.outer_column.clone(), sq.inner_column.clone())],
join_type: sq.join_type,
condition: None,
limit: 10000,
projection: Vec::new(),
filters: Vec::new(),
});
}
*input = base_input;
}
for sq in subquery_joins
.into_iter()
.filter(|sq| sq.join_type == JoinType::Cross)
{
plan = SqlPlan::Join {
left: Box::new(plan),
right: Box::new(sq.inner_plan),
on: vec![(sq.outer_column, sq.inner_column)],
join_type: sq.join_type,
condition: None,
limit: 10000,
projection: Vec::new(),
filters: Vec::new(),
};
}
return Ok(plan);
}
let projection = convert_projection(&select.projection)?;
let window_functions = crate::planner::window::extract_window_functions(select, functions)?;
let scan_projection = if subquery_joins.is_empty() {
projection.clone()
} else {
Vec::new()
};
let rules = crate::engine_rules::resolve_engine_rules(table.info.engine);
let mut plan = rules.plan_scan(crate::engine_rules::ScanParams {
collection: table.name.clone(),
alias: table.alias.clone(),
filters,
projection: scan_projection,
sort_keys: Vec::new(),
limit: None,
offset: 0,
distinct: select.distinct.is_some(),
window_functions,
indexes: table.info.indexes.clone(),
temporal,
bitemporal: table.info.bitemporal,
})?;
for sq in subquery_joins {
let join_filters = if sq.join_type == JoinType::Cross {
if let SqlPlan::Scan {
ref mut filters, ..
} = plan
{
let mut moved = Vec::new();
filters.retain(|f| {
if has_column_ref_filter(&f.expr) {
moved.push(f.clone());
false
} else {
true
}
});
moved
} else {
Vec::new()
}
} else {
Vec::new()
};
plan = SqlPlan::Join {
left: Box::new(plan),
right: Box::new(sq.inner_plan),
on: vec![(sq.outer_column, sq.inner_column)],
join_type: sq.join_type,
condition: None,
limit: 10000,
projection: Vec::new(),
filters: join_filters,
};
}
if let SqlPlan::Join {
projection: ref mut join_projection,
..
} = plan
{
*join_projection = projection;
}
Ok(plan)
}
fn has_column_ref_filter(expr: &FilterExpr) -> bool {
match expr {
FilterExpr::Expr(sql_expr) => has_column_comparison(sql_expr),
FilterExpr::And(filters) => filters.iter().any(|f| has_column_ref_filter(&f.expr)),
FilterExpr::Or(filters) => filters.iter().any(|f| has_column_ref_filter(&f.expr)),
_ => false,
}
}
fn has_column_comparison(expr: &SqlExpr) -> bool {
match expr {
SqlExpr::BinaryOp { left, right, .. } => {
let left_is_col = matches!(left.as_ref(), SqlExpr::Column { .. });
let right_is_col = matches!(right.as_ref(), SqlExpr::Column { .. });
if left_is_col && right_is_col {
return true;
}
has_column_comparison(left) || has_column_comparison(right)
}
_ => false,
}
}
fn extract_table_alias_from_twj(twj: &sqlparser::ast::TableWithJoins) -> Option<String> {
match &twj.relation {
sqlparser::ast::TableFactor::Table { alias, name, .. } => alias
.as_ref()
.map(|a| crate::parser::normalize::normalize_ident(&a.name))
.or_else(|| crate::parser::normalize::normalize_object_name_checked(name).ok()),
_ => None,
}
}
fn has_aggregation(select: &Select, functions: &FunctionRegistry) -> bool {
let group_by_non_empty = match &select.group_by {
ast::GroupByExpr::All(_) => true,
ast::GroupByExpr::Expressions(exprs, _) => !exprs.is_empty(),
};
if group_by_non_empty {
return true;
}
for item in &select.projection {
if let ast::SelectItem::UnnamedExpr(expr) | ast::SelectItem::ExprWithAlias { expr, .. } =
item
&& crate::aggregate_walk::contains_aggregate(expr, functions)
{
return true;
}
}
false
}
fn try_plan_derived_from(
select: &Select,
catalog: &dyn SqlCatalog,
functions: &FunctionRegistry,
temporal: TemporalScope,
) -> Result<Option<SqlPlan>> {
if select.from.len() != 1 {
return Ok(None);
}
let from = &select.from[0];
if !from.joins.is_empty() {
return Ok(None);
}
let (subquery, alias_ident) = match &from.relation {
ast::TableFactor::Derived {
lateral: false,
subquery,
alias: Some(alias),
..
} => (subquery, alias),
_ => return Ok(None),
};
let alias_name = normalize_ident(&alias_ident.name);
let inner_plan = plan_query(subquery, catalog, functions, temporal)?;
let derived_catalog = CteCatalog {
inner: catalog,
cte_names: vec![alias_name.clone()],
};
let mut outer_select = select.clone();
outer_select.from[0].relation = ast::TableFactor::Table {
name: ast::ObjectName::from(vec![ast::Ident::new(alias_name.clone())]),
alias: None,
args: None,
with_hints: Vec::new(),
version: None,
with_ordinality: false,
partitions: Vec::new(),
json_path: None,
sample: None,
index_hints: Vec::new(),
};
let outer_plan = plan_select(&outer_select, &derived_catalog, functions, temporal)?;
Ok(Some(SqlPlan::Cte {
definitions: vec![(alias_name, inner_plan)],
outer: Box::new(outer_plan),
}))
}
fn try_plan_join(
select: &Select,
scope: &TableScope,
catalog: &dyn SqlCatalog,
functions: &FunctionRegistry,
temporal: TemporalScope,
) -> Result<Option<SqlPlan>> {
if select.from.len() != 1 {
return Ok(None);
}
let from = &select.from[0];
if from.joins.is_empty() {
return Ok(None);
}
crate::planner::join::plan_join_from_select(select, scope, catalog, functions, temporal)
}