Skip to main content

nodedb_sql/planner/lateral/
plan.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! LATERAL join planning: classify correlation shape and emit the appropriate
4//! `SqlPlan::LateralTopK` or `SqlPlan::LateralLoop` variant.
5
6use sqlparser::ast;
7
8use super::correlation::analyse_lateral_where;
9use crate::coerce::expr_as_usize_literal;
10use crate::error::{Result, SqlError};
11use crate::functions::registry::FunctionRegistry;
12use crate::parser::normalize::normalize_ident;
13use crate::resolver::expr::convert_expr;
14use crate::temporal::TemporalScope;
15use crate::types::*;
16
17/// The maximum outer-row count allowed for `LateralLoop` queries.
18pub const LATERAL_LOOP_CAP: usize = 100_000;
19
20/// Plan a LATERAL subquery join.
21///
22/// Called when the right side of a JOIN (or a comma-separated FROM item) is a
23/// `TableFactor::Derived { lateral: true, .. }`.
24///
25/// `outer_plan` — plan for the driving (outer) side.
26/// `outer_alias` — alias or name of the outer table for correlation detection.
27/// `subquery` — the LATERAL inner subquery.
28/// `lateral_alias` — alias given to the LATERAL in the SQL (e.g. `x` in `LATERAL (...) x`).
29/// `left_join` — true when the enclosing join is LEFT JOIN LATERAL (outer rows
30///               preserved when inner produces no rows).
31/// `outer_projection` — SELECT list projection to apply after the lateral.
32#[allow(clippy::too_many_arguments)]
33pub fn plan_lateral_join(
34    outer_plan: SqlPlan,
35    outer_alias: Option<String>,
36    subquery: &ast::Query,
37    lateral_alias: &str,
38    left_join: bool,
39    outer_projection: Vec<Projection>,
40    catalog: &dyn SqlCatalog,
41    functions: &FunctionRegistry,
42    temporal: TemporalScope,
43) -> Result<SqlPlan> {
44    let select = match subquery.body.as_ref() {
45        sqlparser::ast::SetExpr::Select(s) => s,
46        _ => {
47            return Err(SqlError::Unsupported {
48                detail: "LATERAL subquery body must be a SELECT".into(),
49            });
50        }
51    };
52
53    let outer_alias_str = outer_alias.as_deref().unwrap_or("").to_string();
54
55    let analysis = analyse_lateral_where(subquery, &outer_alias_str);
56
57    // Determine if this is the equi-correlated + TopK shape:
58    //   - At least one equi-key correlation.
59    //   - A LIMIT k on the subquery.
60    //   - No non-equi correlations (those require LateralLoop).
61    let has_equi = !analysis.equi_keys.is_empty();
62    let inner_limit = limit_from_query(subquery);
63    let is_top_k = has_equi && inner_limit.is_some() && analysis.non_equi.is_empty();
64
65    if is_top_k {
66        plan_lateral_top_k(
67            outer_plan,
68            outer_alias,
69            select,
70            subquery,
71            analysis.equi_keys,
72            inner_limit.expect("checked above"),
73            lateral_alias,
74            left_join,
75            outer_projection,
76        )
77    } else if has_equi && analysis.non_equi.is_empty() {
78        // Equi-correlated, no LIMIT: rewrite as a regular hash join.
79        let inner_plan =
80            crate::planner::select::plan_query(subquery, catalog, functions, temporal)?;
81        let equi_on: Vec<(String, String)> = analysis
82            .equi_keys
83            .into_iter()
84            .map(|c| (c.outer_col, c.inner_col))
85            .collect();
86        Ok(SqlPlan::Join {
87            left: Box::new(outer_plan),
88            right: Box::new(inner_plan),
89            on: equi_on,
90            join_type: if left_join {
91                JoinType::Left
92            } else {
93                JoinType::Inner
94            },
95            condition: None,
96            limit: 10000,
97            projection: outer_projection,
98            filters: Vec::new(),
99        })
100    } else {
101        // General correlation — LateralLoop.
102        //
103        // Non-equi correlated predicates (e.g. `e.log_time > u.created_at`)
104        // are encoded verbatim in `inner_plan`'s filter list as `GtColumn`
105        // operators; the Data Plane executor binds the outer value at runtime
106        // via `bind_outer_values`. We do NOT duplicate them in
107        // `correlation_predicates` (which the executor applies as `Eq`),
108        // because that would add a contradictory equality filter.
109        let inner_plan =
110            crate::planner::select::plan_query(subquery, catalog, functions, temporal)?;
111        let correlation_predicates: Vec<(String, String)> = analysis
112            .equi_keys
113            .iter()
114            .map(|c| (c.inner_col.clone(), c.outer_col.clone()))
115            .collect();
116        Ok(SqlPlan::LateralLoop {
117            outer: Box::new(outer_plan),
118            outer_alias,
119            inner: Box::new(inner_plan),
120            correlation_predicates,
121            lateral_alias: lateral_alias.to_string(),
122            projection: outer_projection,
123            outer_row_cap: LATERAL_LOOP_CAP,
124            left_join,
125        })
126    }
127}
128
129/// Plan the `LateralTopK` variant: equi-correlated + ORDER BY + LIMIT k.
130#[allow(clippy::too_many_arguments)]
131fn plan_lateral_top_k(
132    outer_plan: SqlPlan,
133    outer_alias: Option<String>,
134    select: &sqlparser::ast::Select,
135    subquery: &ast::Query,
136    equi_keys: Vec<super::correlation::CorrelationEq>,
137    inner_limit: usize,
138    lateral_alias: &str,
139    left_join: bool,
140    outer_projection: Vec<Projection>,
141) -> Result<SqlPlan> {
142    // Build a bare inner Scan without correlation filters (those are injected
143    // at runtime per outer row).
144    let inner_collection = extract_inner_collection(select)?;
145    let inner_filters = inner_non_correlated_filters(select, outer_alias.as_deref().unwrap_or(""))?;
146
147    // Extract ORDER BY from the inner subquery.
148    // For LATERAL inner scans we only need simple column-expression sort keys;
149    // the full search-trigger machinery (vector/hybrid search) is not applicable
150    // here, so we convert expressions directly.
151    let inner_order_by = if let Some(order_by) = &subquery.order_by {
152        match &order_by.kind {
153            ast::OrderByKind::Expressions(exprs) => exprs
154                .iter()
155                .filter_map(|o| {
156                    convert_expr(&o.expr).ok().map(|expr| SortKey {
157                        expr,
158                        ascending: o.options.asc.unwrap_or(true),
159                        nulls_first: o.options.nulls_first.unwrap_or(false),
160                    })
161                })
162                .collect(),
163            ast::OrderByKind::All(_) => Vec::new(),
164        }
165    } else {
166        Vec::new()
167    };
168
169    let correlation_keys: Vec<(String, String)> = equi_keys
170        .into_iter()
171        .map(|c| (c.outer_col, c.inner_col))
172        .collect();
173
174    Ok(SqlPlan::LateralTopK {
175        outer: Box::new(outer_plan),
176        outer_alias,
177        inner_collection,
178        inner_filters,
179        inner_order_by,
180        inner_limit,
181        correlation_keys,
182        lateral_alias: lateral_alias.to_string(),
183        projection: outer_projection,
184        left_join,
185    })
186}
187
188/// Extract the collection name from a single-table inner SELECT.
189fn extract_inner_collection(select: &sqlparser::ast::Select) -> Result<String> {
190    let from = select.from.first().ok_or_else(|| SqlError::Unsupported {
191        detail: "LATERAL subquery must have a FROM clause".into(),
192    })?;
193    match &from.relation {
194        ast::TableFactor::Table { name, .. } => {
195            crate::parser::normalize::normalize_object_name_checked(name)
196        }
197        _ => Err(SqlError::Unsupported {
198            detail: "LATERAL LateralTopK subquery must reference a plain table".into(),
199        }),
200    }
201}
202
203/// Extract filters from the inner SELECT that do NOT reference the outer alias.
204fn inner_non_correlated_filters(
205    select: &sqlparser::ast::Select,
206    outer_alias: &str,
207) -> Result<Vec<Filter>> {
208    let Some(where_expr) = &select.selection else {
209        return Ok(Vec::new());
210    };
211    let remaining = strip_outer_refs(where_expr, outer_alias);
212    match remaining {
213        Some(expr) => crate::planner::select::convert_where_to_filters(&expr),
214        None => Ok(Vec::new()),
215    }
216}
217
218/// Remove all predicates referencing `outer_alias` from a WHERE expression.
219fn strip_outer_refs(expr: &ast::Expr, outer_alias: &str) -> Option<ast::Expr> {
220    match expr {
221        ast::Expr::BinaryOp {
222            left,
223            op: ast::BinaryOperator::And,
224            right,
225        } => {
226            let l = strip_outer_refs(left, outer_alias);
227            let r = strip_outer_refs(right, outer_alias);
228            match (l, r) {
229                (None, None) => None,
230                (Some(e), None) | (None, Some(e)) => Some(e),
231                (Some(l), Some(r)) => Some(ast::Expr::BinaryOp {
232                    left: Box::new(l),
233                    op: ast::BinaryOperator::And,
234                    right: Box::new(r),
235                }),
236            }
237        }
238        ast::Expr::BinaryOp { left, right, .. } => {
239            if refs_outer(left, outer_alias) || refs_outer(right, outer_alias) {
240                None
241            } else {
242                Some(expr.clone())
243            }
244        }
245        ast::Expr::Nested(inner) => strip_outer_refs(inner, outer_alias),
246        _ => Some(expr.clone()),
247    }
248}
249
250fn refs_outer(expr: &ast::Expr, outer_alias: &str) -> bool {
251    match expr {
252        ast::Expr::CompoundIdentifier(parts) if parts.len() == 2 => {
253            normalize_ident(&parts[0]).eq_ignore_ascii_case(outer_alias)
254        }
255        ast::Expr::BinaryOp { left, right, .. } => {
256            refs_outer(left, outer_alias) || refs_outer(right, outer_alias)
257        }
258        _ => false,
259    }
260}
261
262/// Extract the LIMIT value from a query.
263fn limit_from_query(query: &ast::Query) -> Option<usize> {
264    match &query.limit_clause {
265        Some(ast::LimitClause::LimitOffset { limit, .. }) => {
266            limit.as_ref().and_then(expr_as_usize_literal)
267        }
268        Some(ast::LimitClause::OffsetCommaLimit { limit, .. }) => {
269            Some(expr_as_usize_literal(limit).unwrap_or(0))
270        }
271        None => None,
272    }
273}
274
275/// Extract a LATERAL alias from a `TableFactor::Derived`.
276pub fn lateral_alias_from_factor(factor: &ast::TableFactor) -> Option<String> {
277    match factor {
278        ast::TableFactor::Derived { alias, .. } => alias.as_ref().map(|a| normalize_ident(&a.name)),
279        _ => None,
280    }
281}
282
283/// True when a `TableFactor` is a LATERAL derived subquery.
284pub fn is_lateral_derived(factor: &ast::TableFactor) -> bool {
285    matches!(factor, ast::TableFactor::Derived { lateral: true, .. })
286}
287
288/// Extract the subquery from a `TableFactor::Derived`.
289pub fn subquery_from_factor(factor: &ast::TableFactor) -> Option<&ast::Query> {
290    match factor {
291        ast::TableFactor::Derived { subquery, .. } => Some(subquery),
292        _ => None,
293    }
294}