// nodedb_sql/planner/select/entry.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Top-level query entry: CTE handling, UNION dispatch, and LIMIT
4//! application. ORDER BY and search-trigger detection live in `order_by.rs`.
5
6use nodedb_types::DatabaseId;
7use sqlparser::ast::{self, Query, SetExpr};
8
9use super::order_by::{apply_order_by, try_hybrid_from_projection};
10use super::select_stmt::plan_select;
11use crate::error::{Result, SqlError};
12use crate::functions::registry::FunctionRegistry;
13use crate::parser::normalize::normalize_ident;
14use crate::temporal::TemporalScope;
15use crate::types::{Projection, SqlExpr, *};
16
/// Default `ef_search` multiplier applied when LIMIT is the only signal
/// available for sizing the HNSW beam (e.g. on a fused VectorSearch that
/// inherited LIMIT after `apply_order_by`). Wider beams trade extra distance
/// computations for higher recall; `2 * top_k` is a standard heuristic.
///
/// `apply_limit` multiplies the query's LIMIT by this factor whenever the
/// plan's ANN options carry no explicit `ef_search_override`.
const DEFAULT_EF_SEARCH_MULTIPLIER: usize = 2;
22
23/// Returns `true` when every projection item is either:
24/// - a plain column reference to the surrogate/PK column (`id` or `document_id`), or
25/// - a `vector_distance(...)` function call (any alias).
26///
27/// Anything else — a payload field, `*`, or an unrecognised expression — returns `false`.
28fn is_pure_vector_projection(projection: &[Projection]) -> bool {
29    if projection.is_empty() {
30        return false;
31    }
32    for item in projection {
33        match item {
34            Projection::Column(name) => {
35                let lower = name.to_ascii_lowercase();
36                if lower != "id" && lower != "document_id" {
37                    return false;
38                }
39            }
40            Projection::Computed { expr, .. } => {
41                // Accept any of the three vector distance function names.
42                let SqlExpr::Function { name, .. } = expr else {
43                    return false;
44                };
45                if !name.eq_ignore_ascii_case("vector_distance")
46                    && !name.eq_ignore_ascii_case("vector_cosine_distance")
47                    && !name.eq_ignore_ascii_case("vector_neg_inner_product")
48                {
49                    return false;
50                }
51            }
52            Projection::Star | Projection::QualifiedStar(_) => return false,
53        }
54    }
55    true
56}
57
58/// Plan a SELECT query.
59pub fn plan_query(
60    query: &Query,
61    catalog: &dyn SqlCatalog,
62    functions: &FunctionRegistry,
63    temporal: TemporalScope,
64) -> Result<SqlPlan> {
65    // Handle CTEs (WITH clause).
66    if let Some(with) = &query.with
67        && with.recursive
68    {
69        return crate::planner::cte::plan_recursive_cte(query, catalog, functions, temporal);
70    }
71    // Non-recursive CTEs: plan each CTE subquery and the outer query.
72    if let Some(with) = &query.with
73        && !with.cte_tables.is_empty()
74    {
75        let inner_query = Query {
76            with: None,
77            body: query.body.clone(),
78            order_by: query.order_by.clone(),
79            limit_clause: query.limit_clause.clone(),
80            fetch: query.fetch.clone(),
81            locks: query.locks.clone(),
82            for_clause: query.for_clause.clone(),
83            settings: query.settings.clone(),
84            format_clause: query.format_clause.clone(),
85            pipe_operators: query.pipe_operators.clone(),
86        };
87
88        // Plan each CTE subquery.
89        let mut definitions = Vec::new();
90        let mut cte_names = Vec::new();
91        for cte in &with.cte_tables {
92            let name = normalize_ident(&cte.alias.name);
93            let cte_plan = plan_query(&cte.query, catalog, functions, temporal)?;
94            definitions.push((name.clone(), cte_plan));
95            cte_names.push(name);
96        }
97
98        // Build CTE-aware catalog so the outer query can reference CTE names.
99        let cte_catalog = CteCatalog {
100            inner: catalog,
101            cte_names,
102        };
103        let outer = plan_query(&inner_query, &cte_catalog, functions, temporal)?;
104
105        return Ok(SqlPlan::Cte {
106            definitions,
107            outer: Box::new(outer),
108        });
109    }
110
111    // Handle UNION.
112    match &*query.body {
113        SetExpr::Select(select) => {
114            let mut plan = plan_select(select, catalog, functions, temporal)?;
115            // Snapshot the projection before ORDER BY transforms the plan,
116            // in case `apply_order_by` converts a Scan into VectorSearch.
117            let pre_order_by_projection: Option<Vec<Projection>> = match &plan {
118                SqlPlan::Scan { projection, .. } => Some(projection.clone()),
119                _ => None,
120            };
121            let pre_order_by_collection: Option<String> = match &plan {
122                SqlPlan::Scan { collection, .. } => Some(collection.clone()),
123                _ => None,
124            };
125            if let Some(order_by) = &query.order_by {
126                plan = apply_order_by(&plan, order_by, functions, &select.projection)?;
127            }
128            // Fall back to a SELECT-projection scan for hybrid-search and
129            // text-search triggers. The `SELECT id, rrf_score(...) AS score
130            // FROM c WHERE ... LIMIT N` shape has no ORDER BY, so
131            // `apply_order_by` cannot fire. The same applies to
132            // `SELECT id, bm25_score(field, term) FROM c ORDER BY id` where
133            // ORDER BY does not contain a search trigger.
134            //
135            // Also fires when the plan is already `TextSearch` (set by the
136            // WHERE `text_match(...)` path) and the SELECT list additionally
137            // contains `bm25_score(...)` — in that case we attach the
138            // `score_alias` so the executor knows to inject the score column.
139            if matches!(plan, SqlPlan::Scan { .. } | SqlPlan::TextSearch { .. })
140                && let Some(upgraded_plan) =
141                    try_hybrid_from_projection(&plan, &select.projection, functions)?
142            {
143                plan = upgraded_plan;
144            }
145            // After ORDER BY: if we now have a VectorSearch, check whether
146            // the collection is vector-primary and the projection is
147            // payload-free. If so, set `skip_payload_fetch`.
148            if let SqlPlan::VectorSearch {
149                ref collection,
150                ref mut skip_payload_fetch,
151                ref mut filters,
152                ref mut payload_filters,
153                ..
154            } = plan
155            {
156                let info = catalog
157                    .get_collection(DatabaseId::DEFAULT, collection)
158                    .ok()
159                    .flatten();
160                let is_vector_primary = info
161                    .as_ref()
162                    .map(|c| c.primary == nodedb_types::PrimaryEngine::Vector)
163                    .unwrap_or(false);
164                if is_vector_primary {
165                    if let Some(ref proj) = pre_order_by_projection
166                        && pre_order_by_collection.as_deref() == Some(collection.as_str())
167                    {
168                        *skip_payload_fetch = is_pure_vector_projection(proj);
169                    }
170                    if let Some(vp) = info.as_ref().and_then(|c| c.vector_primary.as_ref()) {
171                        let mut peeled: Vec<SqlPayloadAtom> = Vec::new();
172                        let is_indexed = |name: &str| {
173                            vp.payload_indexes
174                                .iter()
175                                .any(|(p, _)| p.eq_ignore_ascii_case(name))
176                        };
177                        filters.retain(|f| match &f.expr {
178                            FilterExpr::Comparison {
179                                field,
180                                op: CompareOp::Eq,
181                                value,
182                            } if is_indexed(field) => {
183                                peeled.push(SqlPayloadAtom::Eq(field.clone(), value.clone()));
184                                false
185                            }
186                            FilterExpr::InList { field, values } if is_indexed(field) => {
187                                peeled.push(SqlPayloadAtom::In(field.clone(), values.clone()));
188                                false
189                            }
190                            FilterExpr::Between { field, low, high } if is_indexed(field) => {
191                                peeled.push(SqlPayloadAtom::Range {
192                                    field: field.clone(),
193                                    low: Some(low.clone()),
194                                    low_inclusive: true,
195                                    high: Some(high.clone()),
196                                    high_inclusive: true,
197                                });
198                                false
199                            }
200                            FilterExpr::Comparison { field, op, value }
201                                if matches!(
202                                    op,
203                                    CompareOp::Lt | CompareOp::Le | CompareOp::Gt | CompareOp::Ge
204                                ) && is_indexed(field) =>
205                            {
206                                let inclusive = matches!(op, CompareOp::Le | CompareOp::Ge);
207                                let upper = matches!(op, CompareOp::Lt | CompareOp::Le);
208                                peeled.push(SqlPayloadAtom::Range {
209                                    field: field.clone(),
210                                    low: if upper { None } else { Some(value.clone()) },
211                                    low_inclusive: !upper && inclusive,
212                                    high: if upper { Some(value.clone()) } else { None },
213                                    high_inclusive: upper && inclusive,
214                                });
215                                false
216                            }
217                            FilterExpr::Expr(SqlExpr::BinaryOp {
218                                left,
219                                op: BinaryOp::Eq,
220                                right,
221                            }) => match (&**left, &**right) {
222                                (SqlExpr::Column { name, .. }, SqlExpr::Literal(v))
223                                    if is_indexed(name) =>
224                                {
225                                    peeled.push(SqlPayloadAtom::Eq(name.clone(), v.clone()));
226                                    false
227                                }
228                                (SqlExpr::Literal(v), SqlExpr::Column { name, .. })
229                                    if is_indexed(name) =>
230                                {
231                                    peeled.push(SqlPayloadAtom::Eq(name.clone(), v.clone()));
232                                    false
233                                }
234                                _ => true,
235                            },
236                            FilterExpr::Expr(SqlExpr::InList {
237                                expr,
238                                list,
239                                negated: false,
240                            }) => match &**expr {
241                                SqlExpr::Column { name, .. } if is_indexed(name) => {
242                                    let mut lits = Vec::with_capacity(list.len());
243                                    let all_lit = list.iter().all(|e| {
244                                        if let SqlExpr::Literal(v) = e {
245                                            lits.push(v.clone());
246                                            true
247                                        } else {
248                                            false
249                                        }
250                                    });
251                                    if all_lit {
252                                        peeled.push(SqlPayloadAtom::In(name.clone(), lits));
253                                        false
254                                    } else {
255                                        true
256                                    }
257                                }
258                                _ => true,
259                            },
260                            FilterExpr::Expr(SqlExpr::Between {
261                                expr,
262                                low,
263                                high,
264                                negated: false,
265                            }) => match (&**expr, &**low, &**high) {
266                                (
267                                    SqlExpr::Column { name, .. },
268                                    SqlExpr::Literal(lo),
269                                    SqlExpr::Literal(hi),
270                                ) if is_indexed(name) => {
271                                    peeled.push(SqlPayloadAtom::Range {
272                                        field: name.clone(),
273                                        low: Some(lo.clone()),
274                                        low_inclusive: true,
275                                        high: Some(hi.clone()),
276                                        high_inclusive: true,
277                                    });
278                                    false
279                                }
280                                _ => true,
281                            },
282                            FilterExpr::Expr(SqlExpr::BinaryOp { left, op, right })
283                                if matches!(
284                                    op,
285                                    BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge
286                                ) =>
287                            {
288                                match (&**left, &**right) {
289                                    (SqlExpr::Column { name, .. }, SqlExpr::Literal(v))
290                                        if is_indexed(name) =>
291                                    {
292                                        let inclusive = matches!(op, BinaryOp::Le | BinaryOp::Ge);
293                                        let upper = matches!(op, BinaryOp::Lt | BinaryOp::Le);
294                                        peeled.push(SqlPayloadAtom::Range {
295                                            field: name.clone(),
296                                            low: if upper { None } else { Some(v.clone()) },
297                                            low_inclusive: !upper && inclusive,
298                                            high: if upper { Some(v.clone()) } else { None },
299                                            high_inclusive: upper && inclusive,
300                                        });
301                                        false
302                                    }
303                                    _ => true,
304                                }
305                            }
306                            _ => true,
307                        });
308                        *payload_filters = peeled;
309                    }
310                }
311            }
312            plan = apply_limit(plan, &query.limit_clause);
313            Ok(plan)
314        }
315        SetExpr::SetOperation {
316            op,
317            left,
318            right,
319            set_quantifier,
320        } => crate::planner::union::plan_set_operation(
321            op,
322            left,
323            right,
324            set_quantifier,
325            catalog,
326            functions,
327            temporal,
328        ),
329        _ => Err(SqlError::Unsupported {
330            detail: format!("query body type: {}", query.body),
331        }),
332    }
333}
334
335/// Apply LIMIT and OFFSET to a plan.
336fn apply_limit(mut plan: SqlPlan, limit_clause: &Option<ast::LimitClause>) -> SqlPlan {
337    let (limit_val, offset_val) = match limit_clause {
338        None => (None, 0usize),
339        Some(ast::LimitClause::LimitOffset { limit, offset, .. }) => {
340            let lv = limit
341                .as_ref()
342                .and_then(crate::coerce::expr_as_usize_literal);
343            let ov = offset
344                .as_ref()
345                .and_then(|o| crate::coerce::expr_as_usize_literal(&o.value))
346                .unwrap_or(0);
347            (lv, ov)
348        }
349        Some(ast::LimitClause::OffsetCommaLimit { offset, limit }) => {
350            let lv = crate::coerce::expr_as_usize_literal(limit);
351            let ov = crate::coerce::expr_as_usize_literal(offset).unwrap_or(0);
352            (lv, ov)
353        }
354    };
355
356    match plan {
357        SqlPlan::Scan {
358            ref mut limit,
359            ref mut offset,
360            ..
361        } => {
362            *limit = limit_val;
363            *offset = offset_val;
364        }
365        SqlPlan::Aggregate {
366            limit: ref mut l, ..
367        } => {
368            if let Some(lv) = limit_val {
369                *l = lv;
370            }
371        }
372        SqlPlan::VectorSearch {
373            top_k: ref mut k,
374            ef_search: ref mut ef,
375            ann_options: ref opts,
376            ..
377        } => {
378            // Fused VectorSearch (e.g. ORDER BY vector_distance + JOIN
379            // ARRAY_SLICE) inherits its outer LIMIT here. Without this,
380            // a join-derived VectorSearch carries the join's default
381            // 10000 limit instead of the user's `LIMIT N`.
382            if let Some(lv) = limit_val {
383                *k = lv;
384                *ef = opts
385                    .ef_search_override
386                    .unwrap_or(lv * DEFAULT_EF_SEARCH_MULTIPLIER);
387            }
388        }
389        _ => {}
390    }
391    plan
392}
393
/// Catalog wrapper that resolves CTE names as schemaless document collections.
///
/// A lookup for any name listed in `cte_names` is answered with a synthetic
/// `CollectionInfo`; every other name is forwarded to `inner`.
pub(crate) struct CteCatalog<'a> {
    /// Underlying catalog consulted for names that are not CTEs.
    pub(crate) inner: &'a dyn SqlCatalog,
    /// CTE names visible to the query being planned; matched against the
    /// looked-up name by exact string equality.
    pub(crate) cte_names: Vec<String>,
}
399
400impl SqlCatalog for CteCatalog<'_> {
401    fn get_collection(
402        &self,
403        database_id: DatabaseId,
404        name: &str,
405    ) -> std::result::Result<Option<CollectionInfo>, SqlCatalogError> {
406        // Check CTE names first.
407        if self.cte_names.iter().any(|n| n == name) {
408            return Ok(Some(CollectionInfo {
409                name: name.into(),
410                engine: EngineType::DocumentSchemaless,
411                columns: Vec::new(),
412                primary_key: Some("id".into()),
413                has_auto_tier: false,
414                indexes: Vec::new(),
415                bitemporal: false,
416                primary: nodedb_types::PrimaryEngine::Document,
417                vector_primary: None,
418            }));
419        }
420        self.inner.get_collection(database_id, name)
421    }
422}