nodedb_sql/planner/lateral/
plan.rs1use sqlparser::ast;
7
8use super::correlation::analyse_lateral_where;
9use crate::coerce::expr_as_usize_literal;
10use crate::error::{Result, SqlError};
11use crate::functions::registry::FunctionRegistry;
12use crate::parser::normalize::normalize_ident;
13use crate::resolver::expr::convert_expr;
14use crate::temporal::TemporalScope;
15use crate::types::*;
16
17pub const LATERAL_LOOP_CAP: usize = 100_000;
19
20#[allow(clippy::too_many_arguments)]
33pub fn plan_lateral_join(
34 outer_plan: SqlPlan,
35 outer_alias: Option<String>,
36 subquery: &ast::Query,
37 lateral_alias: &str,
38 left_join: bool,
39 outer_projection: Vec<Projection>,
40 catalog: &dyn SqlCatalog,
41 functions: &FunctionRegistry,
42 temporal: TemporalScope,
43) -> Result<SqlPlan> {
44 let select = match subquery.body.as_ref() {
45 sqlparser::ast::SetExpr::Select(s) => s,
46 _ => {
47 return Err(SqlError::Unsupported {
48 detail: "LATERAL subquery body must be a SELECT".into(),
49 });
50 }
51 };
52
53 let outer_alias_str = outer_alias.as_deref().unwrap_or("").to_string();
54
55 let analysis = analyse_lateral_where(subquery, &outer_alias_str);
56
57 let has_equi = !analysis.equi_keys.is_empty();
62 let inner_limit = limit_from_query(subquery);
63 let is_top_k = has_equi && inner_limit.is_some() && analysis.non_equi.is_empty();
64
65 if is_top_k {
66 plan_lateral_top_k(
67 outer_plan,
68 outer_alias,
69 select,
70 subquery,
71 analysis.equi_keys,
72 inner_limit.expect("checked above"),
73 lateral_alias,
74 left_join,
75 outer_projection,
76 )
77 } else if has_equi && analysis.non_equi.is_empty() {
78 let inner_plan =
80 crate::planner::select::plan_query(subquery, catalog, functions, temporal)?;
81 let equi_on: Vec<(String, String)> = analysis
82 .equi_keys
83 .into_iter()
84 .map(|c| (c.outer_col, c.inner_col))
85 .collect();
86 Ok(SqlPlan::Join {
87 left: Box::new(outer_plan),
88 right: Box::new(inner_plan),
89 on: equi_on,
90 join_type: if left_join {
91 JoinType::Left
92 } else {
93 JoinType::Inner
94 },
95 condition: None,
96 limit: 10000,
97 projection: outer_projection,
98 filters: Vec::new(),
99 })
100 } else {
101 let inner_plan =
110 crate::planner::select::plan_query(subquery, catalog, functions, temporal)?;
111 let correlation_predicates: Vec<(String, String)> = analysis
112 .equi_keys
113 .iter()
114 .map(|c| (c.inner_col.clone(), c.outer_col.clone()))
115 .collect();
116 Ok(SqlPlan::LateralLoop {
117 outer: Box::new(outer_plan),
118 outer_alias,
119 inner: Box::new(inner_plan),
120 correlation_predicates,
121 lateral_alias: lateral_alias.to_string(),
122 projection: outer_projection,
123 outer_row_cap: LATERAL_LOOP_CAP,
124 left_join,
125 })
126 }
127}
128
129#[allow(clippy::too_many_arguments)]
131fn plan_lateral_top_k(
132 outer_plan: SqlPlan,
133 outer_alias: Option<String>,
134 select: &sqlparser::ast::Select,
135 subquery: &ast::Query,
136 equi_keys: Vec<super::correlation::CorrelationEq>,
137 inner_limit: usize,
138 lateral_alias: &str,
139 left_join: bool,
140 outer_projection: Vec<Projection>,
141) -> Result<SqlPlan> {
142 let inner_collection = extract_inner_collection(select)?;
145 let inner_filters = inner_non_correlated_filters(select, outer_alias.as_deref().unwrap_or(""))?;
146
147 let inner_order_by = if let Some(order_by) = &subquery.order_by {
152 match &order_by.kind {
153 ast::OrderByKind::Expressions(exprs) => exprs
154 .iter()
155 .filter_map(|o| {
156 convert_expr(&o.expr).ok().map(|expr| SortKey {
157 expr,
158 ascending: o.options.asc.unwrap_or(true),
159 nulls_first: o.options.nulls_first.unwrap_or(false),
160 })
161 })
162 .collect(),
163 ast::OrderByKind::All(_) => Vec::new(),
164 }
165 } else {
166 Vec::new()
167 };
168
169 let correlation_keys: Vec<(String, String)> = equi_keys
170 .into_iter()
171 .map(|c| (c.outer_col, c.inner_col))
172 .collect();
173
174 Ok(SqlPlan::LateralTopK {
175 outer: Box::new(outer_plan),
176 outer_alias,
177 inner_collection,
178 inner_filters,
179 inner_order_by,
180 inner_limit,
181 correlation_keys,
182 lateral_alias: lateral_alias.to_string(),
183 projection: outer_projection,
184 left_join,
185 })
186}
187
188fn extract_inner_collection(select: &sqlparser::ast::Select) -> Result<String> {
190 let from = select.from.first().ok_or_else(|| SqlError::Unsupported {
191 detail: "LATERAL subquery must have a FROM clause".into(),
192 })?;
193 match &from.relation {
194 ast::TableFactor::Table { name, .. } => {
195 crate::parser::normalize::normalize_object_name_checked(name)
196 }
197 _ => Err(SqlError::Unsupported {
198 detail: "LATERAL LateralTopK subquery must reference a plain table".into(),
199 }),
200 }
201}
202
203fn inner_non_correlated_filters(
205 select: &sqlparser::ast::Select,
206 outer_alias: &str,
207) -> Result<Vec<Filter>> {
208 let Some(where_expr) = &select.selection else {
209 return Ok(Vec::new());
210 };
211 let remaining = strip_outer_refs(where_expr, outer_alias);
212 match remaining {
213 Some(expr) => crate::planner::select::convert_where_to_filters(&expr),
214 None => Ok(Vec::new()),
215 }
216}
217
218fn strip_outer_refs(expr: &ast::Expr, outer_alias: &str) -> Option<ast::Expr> {
220 match expr {
221 ast::Expr::BinaryOp {
222 left,
223 op: ast::BinaryOperator::And,
224 right,
225 } => {
226 let l = strip_outer_refs(left, outer_alias);
227 let r = strip_outer_refs(right, outer_alias);
228 match (l, r) {
229 (None, None) => None,
230 (Some(e), None) | (None, Some(e)) => Some(e),
231 (Some(l), Some(r)) => Some(ast::Expr::BinaryOp {
232 left: Box::new(l),
233 op: ast::BinaryOperator::And,
234 right: Box::new(r),
235 }),
236 }
237 }
238 ast::Expr::BinaryOp { left, right, .. } => {
239 if refs_outer(left, outer_alias) || refs_outer(right, outer_alias) {
240 None
241 } else {
242 Some(expr.clone())
243 }
244 }
245 ast::Expr::Nested(inner) => strip_outer_refs(inner, outer_alias),
246 _ => Some(expr.clone()),
247 }
248}
249
250fn refs_outer(expr: &ast::Expr, outer_alias: &str) -> bool {
251 match expr {
252 ast::Expr::CompoundIdentifier(parts) if parts.len() == 2 => {
253 normalize_ident(&parts[0]).eq_ignore_ascii_case(outer_alias)
254 }
255 ast::Expr::BinaryOp { left, right, .. } => {
256 refs_outer(left, outer_alias) || refs_outer(right, outer_alias)
257 }
258 _ => false,
259 }
260}
261
262fn limit_from_query(query: &ast::Query) -> Option<usize> {
264 match &query.limit_clause {
265 Some(ast::LimitClause::LimitOffset { limit, .. }) => {
266 limit.as_ref().and_then(expr_as_usize_literal)
267 }
268 Some(ast::LimitClause::OffsetCommaLimit { limit, .. }) => {
269 Some(expr_as_usize_literal(limit).unwrap_or(0))
270 }
271 None => None,
272 }
273}
274
275pub fn lateral_alias_from_factor(factor: &ast::TableFactor) -> Option<String> {
277 match factor {
278 ast::TableFactor::Derived { alias, .. } => alias.as_ref().map(|a| normalize_ident(&a.name)),
279 _ => None,
280 }
281}
282
283pub fn is_lateral_derived(factor: &ast::TableFactor) -> bool {
285 matches!(factor, ast::TableFactor::Derived { lateral: true, .. })
286}
287
288pub fn subquery_from_factor(factor: &ast::TableFactor) -> Option<&ast::Query> {
290 match factor {
291 ast::TableFactor::Derived { subquery, .. } => Some(subquery),
292 _ => None,
293 }
294}