Skip to main content

nodedb_sql/planner/
join.rs

1//! JOIN planning: extract left/right tables, equi-join keys, join type.
2
3use sqlparser::ast::{self, Select};
4
5use crate::error::{Result, SqlError};
6use crate::functions::registry::FunctionRegistry;
7use crate::parser::normalize::normalize_ident;
8use crate::resolver::columns::TableScope;
9use crate::resolver::expr::convert_expr;
10use crate::types::*;
11
12/// Plan a JOIN from a SELECT statement with JOINs in FROM clause.
13pub fn plan_join_from_select(
14    select: &Select,
15    scope: &TableScope,
16    catalog: &dyn SqlCatalog,
17    functions: &FunctionRegistry,
18) -> Result<Option<SqlPlan>> {
19    let from = &select.from[0];
20    let left_table = scope
21        .tables
22        .values()
23        .find(|t| {
24            let rel_name =
25                crate::parser::normalize::table_name_from_factor(&from.relation).map(|(n, _)| n);
26            rel_name.as_deref() == Some(&t.name) || rel_name.as_deref() == t.alias.as_deref()
27        })
28        .ok_or_else(|| SqlError::Unsupported {
29            detail: "cannot resolve left table in JOIN".into(),
30        })?;
31
32    // Build left scan.
33    let left_plan = SqlPlan::Scan {
34        collection: left_table.name.clone(),
35        alias: left_table.alias.clone(),
36        engine: left_table.info.engine,
37        filters: Vec::new(),
38        projection: Vec::new(),
39        sort_keys: Vec::new(),
40        limit: None,
41        offset: 0,
42        distinct: false,
43        window_functions: Vec::new(),
44    };
45
46    let mut current_plan = left_plan;
47
48    for join_item in &from.joins {
49        let (right_name, right_alias) = crate::parser::normalize::table_name_from_factor(
50            &join_item.relation,
51        )
52        .ok_or_else(|| SqlError::Unsupported {
53            detail: "non-table JOIN target".into(),
54        })?;
55        let right_table = scope
56            .tables
57            .values()
58            .find(|t| t.name == right_name)
59            .ok_or_else(|| SqlError::UnknownTable {
60                name: right_name.clone(),
61            })?;
62
63        let right_plan = SqlPlan::Scan {
64            collection: right_table.name.clone(),
65            alias: right_alias,
66            engine: right_table.info.engine,
67            filters: Vec::new(),
68            projection: Vec::new(),
69            sort_keys: Vec::new(),
70            limit: None,
71            offset: 0,
72            distinct: false,
73            window_functions: Vec::new(),
74        };
75
76        let (join_type, on_keys, condition) = extract_join_spec(&join_item.join_operator)?;
77
78        current_plan = SqlPlan::Join {
79            left: Box::new(current_plan),
80            right: Box::new(right_plan),
81            on: on_keys,
82            join_type,
83            condition,
84            limit: 10000,
85            projection: Vec::new(),
86            filters: Vec::new(),
87        };
88    }
89
90    // Extract subqueries from WHERE and wrap as additional joins.
91    let (subquery_joins, effective_where) = if let Some(expr) = &select.selection {
92        let extraction = super::subquery::extract_subqueries(expr, catalog, functions)?;
93        (extraction.joins, extraction.remaining_where)
94    } else {
95        (Vec::new(), None)
96    };
97
98    // Apply remaining WHERE as filters and SELECT projection on the join plan.
99    let projection = super::select::convert_projection(&select.projection)?;
100    let filters = match &effective_where {
101        Some(expr) => super::select::convert_where_to_filters(expr)?,
102        None => Vec::new(),
103    };
104
105    // Wrap with subquery joins (semi/anti/cross) if any.
106    for sq in subquery_joins {
107        current_plan = SqlPlan::Join {
108            left: Box::new(current_plan),
109            right: Box::new(sq.inner_plan),
110            on: vec![(sq.outer_column, sq.inner_column)],
111            join_type: sq.join_type,
112            condition: None,
113            limit: 10000,
114            projection: Vec::new(),
115            filters: Vec::new(),
116        };
117    }
118
119    // Check if there's aggregation on the join.
120    let group_by_non_empty = match &select.group_by {
121        ast::GroupByExpr::All(_) => true,
122        ast::GroupByExpr::Expressions(exprs, _) => !exprs.is_empty(),
123    };
124    if super::select::convert_projection(&select.projection).is_ok() && group_by_non_empty {
125        let aggregates =
126            super::aggregate::extract_aggregates_from_projection(&select.projection, functions)?;
127        let group_by = super::aggregate::convert_group_by(&select.group_by)?;
128        let having = match &select.having {
129            Some(expr) => super::select::convert_where_to_filters(expr)?,
130            None => Vec::new(),
131        };
132        return Ok(Some(SqlPlan::Aggregate {
133            input: Box::new(current_plan),
134            group_by,
135            aggregates,
136            having,
137            limit: 10000,
138        }));
139    }
140
141    // Attach SELECT projection and WHERE filters to the outermost join.
142    if let SqlPlan::Join {
143        projection: ref mut proj,
144        filters: ref mut filt,
145        ..
146    } = current_plan
147    {
148        *proj = projection;
149        *filt = filters;
150    }
151    Ok(Some(current_plan))
152}
153
154/// (join_type, equi_keys, non-equi condition)
155type JoinSpec = (JoinType, Vec<(String, String)>, Option<SqlExpr>);
156
157/// Extract join type, equi-join keys, and non-equi condition.
158fn extract_join_spec(op: &ast::JoinOperator) -> Result<JoinSpec> {
159    match op {
160        ast::JoinOperator::Inner(constraint) | ast::JoinOperator::Join(constraint) => {
161            let (keys, cond) = extract_join_constraint(constraint)?;
162            Ok((JoinType::Inner, keys, cond))
163        }
164        ast::JoinOperator::Left(constraint) | ast::JoinOperator::LeftOuter(constraint) => {
165            let (keys, cond) = extract_join_constraint(constraint)?;
166            Ok((JoinType::Left, keys, cond))
167        }
168        ast::JoinOperator::Right(constraint) | ast::JoinOperator::RightOuter(constraint) => {
169            let (keys, cond) = extract_join_constraint(constraint)?;
170            Ok((JoinType::Right, keys, cond))
171        }
172        ast::JoinOperator::FullOuter(constraint) => {
173            let (keys, cond) = extract_join_constraint(constraint)?;
174            Ok((JoinType::Full, keys, cond))
175        }
176        ast::JoinOperator::CrossJoin(constraint) => {
177            let (keys, cond) = extract_join_constraint(constraint)?;
178            Ok((JoinType::Cross, keys, cond))
179        }
180        ast::JoinOperator::Semi(constraint) | ast::JoinOperator::LeftSemi(constraint) => {
181            let (keys, cond) = extract_join_constraint(constraint)?;
182            Ok((JoinType::Semi, keys, cond))
183        }
184        ast::JoinOperator::Anti(constraint) | ast::JoinOperator::LeftAnti(constraint) => {
185            let (keys, cond) = extract_join_constraint(constraint)?;
186            Ok((JoinType::Anti, keys, cond))
187        }
188        _ => Err(SqlError::Unsupported {
189            detail: format!("join type: {op:?}"),
190        }),
191    }
192}
193
194/// (equi_keys, non-equi condition)
195type JoinConstraintResult = (Vec<(String, String)>, Option<SqlExpr>);
196
197/// Extract equi-join keys from ON clause.
198fn extract_join_constraint(constraint: &ast::JoinConstraint) -> Result<JoinConstraintResult> {
199    match constraint {
200        ast::JoinConstraint::On(expr) => {
201            let mut keys = Vec::new();
202            let mut non_equi = Vec::new();
203            extract_equi_keys(expr, &mut keys, &mut non_equi)?;
204            let cond = if non_equi.is_empty() {
205                None
206            } else {
207                // Fold ALL non-equi predicates with AND — not just the first.
208                let mut combined = convert_expr(&non_equi[0])?;
209                for pred in &non_equi[1..] {
210                    combined = SqlExpr::BinaryOp {
211                        left: Box::new(combined),
212                        op: crate::types::BinaryOp::And,
213                        right: Box::new(convert_expr(pred)?),
214                    };
215                }
216                Some(combined)
217            };
218            Ok((keys, cond))
219        }
220        ast::JoinConstraint::Using(columns) => {
221            let keys = columns
222                .iter()
223                .map(|c| {
224                    let name = crate::parser::normalize::normalize_object_name(c);
225                    (name.clone(), name)
226                })
227                .collect();
228            Ok((keys, None))
229        }
230        ast::JoinConstraint::Natural => Err(crate::error::SqlError::Unsupported {
231            detail: "NATURAL JOIN is not supported; use explicit ON or USING clause".into(),
232        }),
233        ast::JoinConstraint::None => Err(crate::error::SqlError::Unsupported {
234            detail: "implicit cross join (no ON/USING clause) is not supported".into(),
235        }),
236    }
237}
238
239/// Recursively extract equality join keys from an ON expression.
240fn extract_equi_keys(
241    expr: &ast::Expr,
242    keys: &mut Vec<(String, String)>,
243    non_equi: &mut Vec<ast::Expr>,
244) -> Result<()> {
245    match expr {
246        ast::Expr::BinaryOp {
247            left,
248            op: ast::BinaryOperator::And,
249            right,
250        } => {
251            extract_equi_keys(left, keys, non_equi)?;
252            extract_equi_keys(right, keys, non_equi)?;
253        }
254        ast::Expr::BinaryOp {
255            left,
256            op: ast::BinaryOperator::Eq,
257            right,
258        } => {
259            if let (Some(l), Some(r)) = (extract_col_ref(left), extract_col_ref(right)) {
260                keys.push((l, r));
261            } else {
262                non_equi.push(expr.clone());
263            }
264        }
265        _ => {
266            non_equi.push(expr.clone());
267        }
268    }
269    Ok(())
270}
271
272/// Extract a column reference (possibly qualified) from an expression.
273fn extract_col_ref(expr: &ast::Expr) -> Option<String> {
274    match expr {
275        ast::Expr::Identifier(ident) => Some(normalize_ident(ident)),
276        ast::Expr::CompoundIdentifier(parts) if !parts.is_empty() => Some(
277            parts
278                .iter()
279                .map(normalize_ident)
280                .collect::<Vec<_>>()
281                .join("."),
282        ),
283        _ => None,
284    }
285}