// nodedb_sql/engine_rules/mod.rs

1pub mod columnar;
2pub mod document_schemaless;
3pub mod document_strict;
4pub mod kv;
5pub mod spatial;
6pub mod timeseries;
7
8use crate::error::Result;
9use crate::types::*;
10
11/// Parameters for planning an INSERT operation.
12pub struct InsertParams {
13    pub collection: String,
14    pub columns: Vec<String>,
15    pub rows: Vec<Vec<(String, SqlValue)>>,
16    pub column_defaults: Vec<(String, String)>,
17}
18
19/// Parameters for planning a SCAN operation.
20pub struct ScanParams {
21    pub collection: String,
22    pub alias: Option<String>,
23    pub filters: Vec<Filter>,
24    pub projection: Vec<Projection>,
25    pub sort_keys: Vec<SortKey>,
26    pub limit: Option<usize>,
27    pub offset: usize,
28    pub distinct: bool,
29    pub window_functions: Vec<WindowSpec>,
30    /// Secondary indexes available on the scan's collection. Document
31    /// engines consult this to rewrite equality-on-indexed-field into
32    /// [`SqlPlan::DocumentIndexLookup`]. Other engines ignore it today.
33    pub indexes: Vec<IndexSpec>,
34}
35
36/// Parameters for planning a POINT GET operation.
37pub struct PointGetParams {
38    pub collection: String,
39    pub alias: Option<String>,
40    pub key_column: String,
41    pub key_value: SqlValue,
42}
43
44/// Parameters for planning an UPDATE operation.
45pub struct UpdateParams {
46    pub collection: String,
47    pub assignments: Vec<(String, SqlExpr)>,
48    pub filters: Vec<Filter>,
49    pub target_keys: Vec<SqlValue>,
50    pub returning: bool,
51}
52
53/// Parameters for planning a DELETE operation.
54pub struct DeleteParams {
55    pub collection: String,
56    pub filters: Vec<Filter>,
57    pub target_keys: Vec<SqlValue>,
58}
59
60/// Parameters for planning an UPSERT operation.
61pub struct UpsertParams {
62    pub collection: String,
63    pub columns: Vec<String>,
64    pub rows: Vec<Vec<(String, SqlValue)>>,
65    pub column_defaults: Vec<(String, String)>,
66    /// `ON CONFLICT (...) DO UPDATE SET` assignments. Empty for plain
67    /// `UPSERT INTO ...`; populated when the caller is
68    /// `INSERT ... ON CONFLICT ... DO UPDATE SET`.
69    pub on_conflict_updates: Vec<(String, SqlExpr)>,
70}
71
72/// Parameters for planning an AGGREGATE operation.
73pub struct AggregateParams {
74    pub collection: String,
75    pub alias: Option<String>,
76    pub filters: Vec<Filter>,
77    pub group_by: Vec<SqlExpr>,
78    pub aggregates: Vec<AggregateExpr>,
79    pub having: Vec<Filter>,
80    pub limit: usize,
81    /// Timeseries-specific: bucket interval from time_bucket() call.
82    pub bucket_interval_ms: Option<i64>,
83    /// Timeseries-specific: non-time GROUP BY columns.
84    pub group_columns: Vec<String>,
85    /// Whether the collection has auto-tiering enabled.
86    pub has_auto_tier: bool,
87}
88
89/// Engine-specific planning rules.
90///
91/// Each engine type implements this trait to produce the correct `SqlPlan`
92/// variant for each operation, or return an error if the operation is not
93/// supported. This is the single source of truth for operation routing —
94/// no downstream code should ever check engine type to decide routing.
95pub trait EngineRules {
96    /// Plan an INSERT. Returns `Err` if the engine does not support inserts
97    /// (e.g. timeseries routes to `TimeseriesIngest` instead).
98    fn plan_insert(&self, params: InsertParams) -> Result<Vec<SqlPlan>>;
99    /// Plan an UPSERT (insert-or-merge). Returns `Err` for append-only or
100    /// columnar engines that don't support merge semantics.
101    fn plan_upsert(&self, params: UpsertParams) -> Result<Vec<SqlPlan>>;
102    /// Plan a table scan (SELECT without point-get optimization).
103    fn plan_scan(&self, params: ScanParams) -> Result<SqlPlan>;
104    /// Plan a point lookup by primary key. Returns `Err` for engines
105    /// that don't support O(1) key lookups (e.g. timeseries).
106    fn plan_point_get(&self, params: PointGetParams) -> Result<SqlPlan>;
107    /// Plan an UPDATE. Returns `Err` for append-only engines.
108    fn plan_update(&self, params: UpdateParams) -> Result<Vec<SqlPlan>>;
109    /// Plan a DELETE (point or bulk).
110    fn plan_delete(&self, params: DeleteParams) -> Result<Vec<SqlPlan>>;
111    /// Plan a GROUP BY / aggregate query.
112    fn plan_aggregate(&self, params: AggregateParams) -> Result<SqlPlan>;
113}
114
115/// Attempt to rewrite `ScanParams` into a [`SqlPlan::DocumentIndexLookup`]
116/// when exactly one of the filters is an equality predicate on a `Ready`
117/// indexed field. Returns `None` to fall through to a generic `Scan`.
118///
119/// Shared by the schemaless and strict document engines so the
120/// index-rewrite rule has one source of truth. Normalizes strict column
121/// names to `$.column` before matching against index fields because the
122/// catalog stores every document index in JSON-path canonical form.
123pub(crate) fn try_document_index_lookup(
124    params: &ScanParams,
125    engine: EngineType,
126) -> Option<SqlPlan> {
127    // Sort / window functions still force a full scan because the
128    // IndexedFetch handler does not yet order rows or evaluate window
129    // aggregates. DISTINCT is safe on the index path: the handler
130    // emits each matched doc exactly once and any further dedup is a
131    // cheap Control-Plane pass on the scan-shaped response.
132    if !params.sort_keys.is_empty() || !params.window_functions.is_empty() {
133        return None;
134    }
135
136    // Iterate filters to find the first equality candidate that lines up
137    // with a Ready index. Keep the remaining filters as post-filters.
138    // Predicates appear in three shapes:
139    //   - `FilterExpr::Comparison { field, Eq, value }` — resolver path.
140    //   - `FilterExpr::Expr(BinaryOp { Column, Eq, Literal })` — generic.
141    //   - `FilterExpr::Expr(BinaryOp { left AND right })` — the
142    //     top-level AND the `convert_where_to_filters` path produces
143    //     for compound WHERE clauses like `a = 1 AND b > 2`. Descend
144    //     into AND conjuncts so an equality nested in a conjunction
145    //     still picks up the index, and the non-equality siblings
146    //     survive as a residual conjunction carried on the
147    //     IndexedFetch node (not as a wrapping Filter plan node —
148    //     that wrapping is the Control-Plane post-filter anti-pattern
149    //     the handler's `filters` payload exists to eliminate).
150    // Pre-collect the query's top-level conjuncts once. Used for
151    // partial-index entailment: every conjunct of the index predicate
152    // must appear as a conjunct of the query WHERE for the rewrite to
153    // be sound, otherwise the index would silently omit rows the
154    // query expects to see.
155    let query_conjuncts: Vec<SqlExpr> = params
156        .filters
157        .iter()
158        .flat_map(|f| match &f.expr {
159            FilterExpr::Expr(e) => {
160                let mut v = Vec::new();
161                flatten_and(e, &mut v);
162                v
163            }
164            _ => Vec::new(),
165        })
166        .collect();
167
168    for (i, f) in params.filters.iter().enumerate() {
169        let Some((field, value, residual)) = extract_equality_with_residual(&f.expr) else {
170            continue;
171        };
172        let canonical = canonical_index_field(&field);
173        let Some(idx) = params.indexes.iter().find(|i| {
174            i.state == IndexState::Ready
175                && i.field == canonical
176                && partial_index_entailed(i.predicate.as_deref(), &query_conjuncts)
177        }) else {
178            continue;
179        };
180
181        let mut remaining = params.filters.clone();
182        match residual {
183            Some(expr) => {
184                remaining[i] = Filter {
185                    expr: FilterExpr::Expr(expr),
186                };
187            }
188            None => {
189                remaining.remove(i);
190            }
191        }
192
193        let lookup_value = if idx.case_insensitive {
194            lowercase_string_value(&value)
195        } else {
196            value
197        };
198
199        return Some(SqlPlan::DocumentIndexLookup {
200            collection: params.collection.clone(),
201            alias: params.alias.clone(),
202            engine,
203            field: idx.field.clone(),
204            value: lookup_value,
205            filters: remaining,
206            projection: params.projection.clone(),
207            sort_keys: params.sort_keys.clone(),
208            limit: params.limit,
209            offset: params.offset,
210            distinct: params.distinct,
211            window_functions: params.window_functions.clone(),
212            case_insensitive: idx.case_insensitive,
213        });
214    }
215    None
216}
217
218/// Pull `(column_name, equality_value, residual_expr)` out of a filter
219/// expression if it contains a column-equals-literal predicate that can
220/// drive an index lookup. Returns `None` if no usable equality is
221/// present. The returned residual is the rest of the conjunction with
222/// the equality removed — `None` when the filter was a bare equality.
223fn extract_equality_with_residual(
224    expr: &FilterExpr,
225) -> Option<(String, SqlValue, Option<SqlExpr>)> {
226    match expr {
227        FilterExpr::Comparison {
228            field,
229            op: CompareOp::Eq,
230            value,
231        } => Some((field.clone(), value.clone(), None)),
232        FilterExpr::Expr(sql_expr) => {
233            let (col, lit, residual) = split_equality_from_expr(sql_expr)?;
234            Some((col, lit, residual))
235        }
236        _ => None,
237    }
238}
239
240/// Return `(column, literal, residual_expr)` for an equality found
241/// anywhere in a right-leaning AND conjunction tree, or `None` if no
242/// bare column-equals-literal predicate exists. The residual preserves
243/// every sibling conjunct in their original order; `None` means the
244/// expression was a bare equality with nothing left behind.
245fn split_equality_from_expr(expr: &SqlExpr) -> Option<(String, SqlValue, Option<SqlExpr>)> {
246    // Gather the conjuncts of a top-level AND chain left-to-right.
247    let mut conjuncts: Vec<SqlExpr> = Vec::new();
248    flatten_and(expr, &mut conjuncts);
249
250    // Find the first conjunct that is a bare column-equals-literal.
251    let eq_idx = conjuncts.iter().position(is_column_eq_literal)?;
252    let eq = conjuncts.remove(eq_idx);
253    let (col, lit) = match eq {
254        SqlExpr::BinaryOp { left, op, right } => match (*left, op, *right) {
255            (SqlExpr::Column { name, .. }, BinaryOp::Eq, SqlExpr::Literal(v)) => (name, v),
256            (SqlExpr::Literal(v), BinaryOp::Eq, SqlExpr::Column { name, .. }) => (name, v),
257            _ => return None,
258        },
259        _ => return None,
260    };
261
262    let residual = rebuild_and(conjuncts);
263    Some((col, lit, residual))
264}
265
266/// Append every leaf of a right-leaning `AND` tree to `out`. Non-AND
267/// expressions are a single leaf.
268fn flatten_and(expr: &SqlExpr, out: &mut Vec<SqlExpr>) {
269    match expr {
270        SqlExpr::BinaryOp {
271            left,
272            op: BinaryOp::And,
273            right,
274        } => {
275            flatten_and(left, out);
276            flatten_and(right, out);
277        }
278        other => out.push(other.clone()),
279    }
280}
281
282/// Reassemble a list of conjuncts into a right-leaning AND tree.
283/// Empty input returns `None`; single input returns itself.
284fn rebuild_and(mut conjuncts: Vec<SqlExpr>) -> Option<SqlExpr> {
285    let last = conjuncts.pop()?;
286    Some(
287        conjuncts
288            .into_iter()
289            .rfold(last, |acc, next| SqlExpr::BinaryOp {
290                left: Box::new(next),
291                op: BinaryOp::And,
292                right: Box::new(acc),
293            }),
294    )
295}
296
297/// Decide whether the query's WHERE conjuncts entail the partial-index
298/// predicate. `None` predicate means a full index — trivially entailed.
299/// Initial version uses conjunct-level structural equality: every
300/// conjunct of the index predicate must appear (by `PartialEq`) as a
301/// conjunct of the query. This is conservative and deliberately so —
302/// a false positive here would silently omit rows from query results,
303/// which is the exact bug #73 reports.
304fn partial_index_entailed(predicate: Option<&str>, query_conjuncts: &[SqlExpr]) -> bool {
305    let Some(text) = predicate else {
306        return true;
307    };
308    let Ok(parsed) = crate::parse_expr_string(text) else {
309        // A catalog predicate we can't parse is not entailed — refuse
310        // to use the index rather than assume anything about its
311        // contents. The DDL path validates at CREATE INDEX time, so
312        // reaching this branch indicates drift.
313        return false;
314    };
315    let mut index_conjuncts: Vec<SqlExpr> = Vec::new();
316    flatten_and(&parsed, &mut index_conjuncts);
317    // Structural equality via the stable `Debug` representation:
318    // `SqlExpr` doesn't derive `PartialEq` (several nested variants
319    // carry types that can't derive it cheaply), but the Debug form is
320    // canonical for equivalent trees produced by the same parser. This
321    // is conservative — it matches only identical AST shapes and not,
322    // e.g., `a = 1` vs `1 = a`. Callers should write index predicates
323    // in the same normal form they use in query WHERE clauses, which
324    // is the convention everywhere else in the codebase.
325    index_conjuncts.iter().all(|ic| {
326        let ic_dbg = format!("{ic:?}");
327        query_conjuncts.iter().any(|qc| format!("{qc:?}") == ic_dbg)
328    })
329}
330
331fn is_column_eq_literal(expr: &SqlExpr) -> bool {
332    matches!(
333        expr,
334        SqlExpr::BinaryOp { left, op: BinaryOp::Eq, right }
335            if matches!(
336                (left.as_ref(), right.as_ref()),
337                (SqlExpr::Column { .. }, SqlExpr::Literal(_))
338                | (SqlExpr::Literal(_), SqlExpr::Column { .. }),
339            )
340    )
341}
342
/// Put a field name into the `$.`-prefixed form used when matching
/// against index fields.
///
/// A name that already begins with `$` is returned unchanged; any other
/// name gets `$.` prepended.
fn canonical_index_field(field: &str) -> String {
    // A leading `$` covers the `$.` case as well.
    if !field.starts_with('$') {
        return format!("$.{field}");
    }
    field.to_string()
}
350
351fn lowercase_string_value(v: &SqlValue) -> SqlValue {
352    if let SqlValue::String(s) = v {
353        SqlValue::String(s.to_lowercase())
354    } else {
355        v.clone()
356    }
357}
358
359/// Resolve the engine rules for a given engine type.
360///
361/// No catch-all — compiler enforces exhaustiveness.
362pub fn resolve_engine_rules(engine: EngineType) -> &'static dyn EngineRules {
363    match engine {
364        EngineType::DocumentSchemaless => &document_schemaless::SchemalessRules,
365        EngineType::DocumentStrict => &document_strict::StrictRules,
366        EngineType::KeyValue => &kv::KvRules,
367        EngineType::Columnar => &columnar::ColumnarRules,
368        EngineType::Timeseries => &timeseries::TimeseriesRules,
369        EngineType::Spatial => &spatial::SpatialRules,
370    }
371}