1use nodedb_types::DatabaseId;
7use sqlparser::ast::{self, Query, SetExpr};
8
9use super::order_by::{apply_order_by, try_hybrid_from_projection};
10use super::select_stmt::plan_select;
11use crate::error::{Result, SqlError};
12use crate::functions::registry::FunctionRegistry;
13use crate::parser::normalize::normalize_ident;
14use crate::temporal::TemporalScope;
15use crate::types::{Projection, SqlExpr, *};
16
/// Factor applied to a query's `LIMIT` to derive the vector-search `ef_search`
/// value when the ANN options carry no explicit `ef_search_override`.
const DEFAULT_EF_SEARCH_MULTIPLIER: usize = 2;
22
23fn is_pure_vector_projection(projection: &[Projection]) -> bool {
29 if projection.is_empty() {
30 return false;
31 }
32 for item in projection {
33 match item {
34 Projection::Column(name) => {
35 let lower = name.to_ascii_lowercase();
36 if lower != "id" && lower != "document_id" {
37 return false;
38 }
39 }
40 Projection::Computed { expr, .. } => {
41 let SqlExpr::Function { name, .. } = expr else {
43 return false;
44 };
45 if !name.eq_ignore_ascii_case("vector_distance")
46 && !name.eq_ignore_ascii_case("vector_cosine_distance")
47 && !name.eq_ignore_ascii_case("vector_neg_inner_product")
48 {
49 return false;
50 }
51 }
52 Projection::Star | Projection::QualifiedStar(_) => return false,
53 }
54 }
55 true
56}
57
58pub fn plan_query(
60 query: &Query,
61 catalog: &dyn SqlCatalog,
62 functions: &FunctionRegistry,
63 temporal: TemporalScope,
64) -> Result<SqlPlan> {
65 if let Some(with) = &query.with
67 && with.recursive
68 {
69 return crate::planner::cte::plan_recursive_cte(query, catalog, functions, temporal);
70 }
71 if let Some(with) = &query.with
73 && !with.cte_tables.is_empty()
74 {
75 let inner_query = Query {
76 with: None,
77 body: query.body.clone(),
78 order_by: query.order_by.clone(),
79 limit_clause: query.limit_clause.clone(),
80 fetch: query.fetch.clone(),
81 locks: query.locks.clone(),
82 for_clause: query.for_clause.clone(),
83 settings: query.settings.clone(),
84 format_clause: query.format_clause.clone(),
85 pipe_operators: query.pipe_operators.clone(),
86 };
87
88 let mut definitions = Vec::new();
90 let mut cte_names = Vec::new();
91 for cte in &with.cte_tables {
92 let name = normalize_ident(&cte.alias.name);
93 let cte_plan = plan_query(&cte.query, catalog, functions, temporal)?;
94 definitions.push((name.clone(), cte_plan));
95 cte_names.push(name);
96 }
97
98 let cte_catalog = CteCatalog {
100 inner: catalog,
101 cte_names,
102 };
103 let outer = plan_query(&inner_query, &cte_catalog, functions, temporal)?;
104
105 return Ok(SqlPlan::Cte {
106 definitions,
107 outer: Box::new(outer),
108 });
109 }
110
111 match &*query.body {
113 SetExpr::Select(select) => {
114 let mut plan = plan_select(select, catalog, functions, temporal)?;
115 let pre_order_by_projection: Option<Vec<Projection>> = match &plan {
118 SqlPlan::Scan { projection, .. } => Some(projection.clone()),
119 _ => None,
120 };
121 let pre_order_by_collection: Option<String> = match &plan {
122 SqlPlan::Scan { collection, .. } => Some(collection.clone()),
123 _ => None,
124 };
125 if let Some(order_by) = &query.order_by {
126 plan = apply_order_by(&plan, order_by, functions, &select.projection)?;
127 }
128 if matches!(plan, SqlPlan::Scan { .. } | SqlPlan::TextSearch { .. })
140 && let Some(upgraded_plan) =
141 try_hybrid_from_projection(&plan, &select.projection, functions)?
142 {
143 plan = upgraded_plan;
144 }
145 if let SqlPlan::VectorSearch {
149 ref collection,
150 ref mut skip_payload_fetch,
151 ref mut filters,
152 ref mut payload_filters,
153 ..
154 } = plan
155 {
156 let info = catalog
157 .get_collection(DatabaseId::DEFAULT, collection)
158 .ok()
159 .flatten();
160 let is_vector_primary = info
161 .as_ref()
162 .map(|c| c.primary == nodedb_types::PrimaryEngine::Vector)
163 .unwrap_or(false);
164 if is_vector_primary {
165 if let Some(ref proj) = pre_order_by_projection
166 && pre_order_by_collection.as_deref() == Some(collection.as_str())
167 {
168 *skip_payload_fetch = is_pure_vector_projection(proj);
169 }
170 if let Some(vp) = info.as_ref().and_then(|c| c.vector_primary.as_ref()) {
171 let mut peeled: Vec<SqlPayloadAtom> = Vec::new();
172 let is_indexed = |name: &str| {
173 vp.payload_indexes
174 .iter()
175 .any(|(p, _)| p.eq_ignore_ascii_case(name))
176 };
177 filters.retain(|f| match &f.expr {
178 FilterExpr::Comparison {
179 field,
180 op: CompareOp::Eq,
181 value,
182 } if is_indexed(field) => {
183 peeled.push(SqlPayloadAtom::Eq(field.clone(), value.clone()));
184 false
185 }
186 FilterExpr::InList { field, values } if is_indexed(field) => {
187 peeled.push(SqlPayloadAtom::In(field.clone(), values.clone()));
188 false
189 }
190 FilterExpr::Between { field, low, high } if is_indexed(field) => {
191 peeled.push(SqlPayloadAtom::Range {
192 field: field.clone(),
193 low: Some(low.clone()),
194 low_inclusive: true,
195 high: Some(high.clone()),
196 high_inclusive: true,
197 });
198 false
199 }
200 FilterExpr::Comparison { field, op, value }
201 if matches!(
202 op,
203 CompareOp::Lt | CompareOp::Le | CompareOp::Gt | CompareOp::Ge
204 ) && is_indexed(field) =>
205 {
206 let inclusive = matches!(op, CompareOp::Le | CompareOp::Ge);
207 let upper = matches!(op, CompareOp::Lt | CompareOp::Le);
208 peeled.push(SqlPayloadAtom::Range {
209 field: field.clone(),
210 low: if upper { None } else { Some(value.clone()) },
211 low_inclusive: !upper && inclusive,
212 high: if upper { Some(value.clone()) } else { None },
213 high_inclusive: upper && inclusive,
214 });
215 false
216 }
217 FilterExpr::Expr(SqlExpr::BinaryOp {
218 left,
219 op: BinaryOp::Eq,
220 right,
221 }) => match (&**left, &**right) {
222 (SqlExpr::Column { name, .. }, SqlExpr::Literal(v))
223 if is_indexed(name) =>
224 {
225 peeled.push(SqlPayloadAtom::Eq(name.clone(), v.clone()));
226 false
227 }
228 (SqlExpr::Literal(v), SqlExpr::Column { name, .. })
229 if is_indexed(name) =>
230 {
231 peeled.push(SqlPayloadAtom::Eq(name.clone(), v.clone()));
232 false
233 }
234 _ => true,
235 },
236 FilterExpr::Expr(SqlExpr::InList {
237 expr,
238 list,
239 negated: false,
240 }) => match &**expr {
241 SqlExpr::Column { name, .. } if is_indexed(name) => {
242 let mut lits = Vec::with_capacity(list.len());
243 let all_lit = list.iter().all(|e| {
244 if let SqlExpr::Literal(v) = e {
245 lits.push(v.clone());
246 true
247 } else {
248 false
249 }
250 });
251 if all_lit {
252 peeled.push(SqlPayloadAtom::In(name.clone(), lits));
253 false
254 } else {
255 true
256 }
257 }
258 _ => true,
259 },
260 FilterExpr::Expr(SqlExpr::Between {
261 expr,
262 low,
263 high,
264 negated: false,
265 }) => match (&**expr, &**low, &**high) {
266 (
267 SqlExpr::Column { name, .. },
268 SqlExpr::Literal(lo),
269 SqlExpr::Literal(hi),
270 ) if is_indexed(name) => {
271 peeled.push(SqlPayloadAtom::Range {
272 field: name.clone(),
273 low: Some(lo.clone()),
274 low_inclusive: true,
275 high: Some(hi.clone()),
276 high_inclusive: true,
277 });
278 false
279 }
280 _ => true,
281 },
282 FilterExpr::Expr(SqlExpr::BinaryOp { left, op, right })
283 if matches!(
284 op,
285 BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge
286 ) =>
287 {
288 match (&**left, &**right) {
289 (SqlExpr::Column { name, .. }, SqlExpr::Literal(v))
290 if is_indexed(name) =>
291 {
292 let inclusive = matches!(op, BinaryOp::Le | BinaryOp::Ge);
293 let upper = matches!(op, BinaryOp::Lt | BinaryOp::Le);
294 peeled.push(SqlPayloadAtom::Range {
295 field: name.clone(),
296 low: if upper { None } else { Some(v.clone()) },
297 low_inclusive: !upper && inclusive,
298 high: if upper { Some(v.clone()) } else { None },
299 high_inclusive: upper && inclusive,
300 });
301 false
302 }
303 _ => true,
304 }
305 }
306 _ => true,
307 });
308 *payload_filters = peeled;
309 }
310 }
311 }
312 plan = apply_limit(plan, &query.limit_clause);
313 Ok(plan)
314 }
315 SetExpr::SetOperation {
316 op,
317 left,
318 right,
319 set_quantifier,
320 } => crate::planner::union::plan_set_operation(
321 op,
322 left,
323 right,
324 set_quantifier,
325 catalog,
326 functions,
327 temporal,
328 ),
329 _ => Err(SqlError::Unsupported {
330 detail: format!("query body type: {}", query.body),
331 }),
332 }
333}
334
335fn apply_limit(mut plan: SqlPlan, limit_clause: &Option<ast::LimitClause>) -> SqlPlan {
337 let (limit_val, offset_val) = match limit_clause {
338 None => (None, 0usize),
339 Some(ast::LimitClause::LimitOffset { limit, offset, .. }) => {
340 let lv = limit
341 .as_ref()
342 .and_then(crate::coerce::expr_as_usize_literal);
343 let ov = offset
344 .as_ref()
345 .and_then(|o| crate::coerce::expr_as_usize_literal(&o.value))
346 .unwrap_or(0);
347 (lv, ov)
348 }
349 Some(ast::LimitClause::OffsetCommaLimit { offset, limit }) => {
350 let lv = crate::coerce::expr_as_usize_literal(limit);
351 let ov = crate::coerce::expr_as_usize_literal(offset).unwrap_or(0);
352 (lv, ov)
353 }
354 };
355
356 match plan {
357 SqlPlan::Scan {
358 ref mut limit,
359 ref mut offset,
360 ..
361 } => {
362 *limit = limit_val;
363 *offset = offset_val;
364 }
365 SqlPlan::Aggregate {
366 limit: ref mut l, ..
367 } => {
368 if let Some(lv) = limit_val {
369 *l = lv;
370 }
371 }
372 SqlPlan::VectorSearch {
373 top_k: ref mut k,
374 ef_search: ref mut ef,
375 ann_options: ref opts,
376 ..
377 } => {
378 if let Some(lv) = limit_val {
383 *k = lv;
384 *ef = opts
385 .ef_search_override
386 .unwrap_or(lv * DEFAULT_EF_SEARCH_MULTIPLIER);
387 }
388 }
389 _ => {}
390 }
391 plan
392}
393
394pub(crate) struct CteCatalog<'a> {
396 pub(crate) inner: &'a dyn SqlCatalog,
397 pub(crate) cte_names: Vec<String>,
398}
399
400impl SqlCatalog for CteCatalog<'_> {
401 fn get_collection(
402 &self,
403 database_id: DatabaseId,
404 name: &str,
405 ) -> std::result::Result<Option<CollectionInfo>, SqlCatalogError> {
406 if self.cte_names.iter().any(|n| n == name) {
408 return Ok(Some(CollectionInfo {
409 name: name.into(),
410 engine: EngineType::DocumentSchemaless,
411 columns: Vec::new(),
412 primary_key: Some("id".into()),
413 has_auto_tier: false,
414 indexes: Vec::new(),
415 bitemporal: false,
416 primary: nodedb_types::PrimaryEngine::Document,
417 vector_primary: None,
418 }));
419 }
420 self.inner.get_collection(database_id, name)
421 }
422}