Skip to main content

nodedb_sql/engine_rules/
mod.rs

1pub mod columnar;
2pub mod document_schemaless;
3pub mod document_strict;
4pub mod kv;
5pub mod spatial;
6pub mod timeseries;
7
8use crate::error::Result;
9use crate::types::*;
10
11/// Parameters for planning an INSERT operation.
12pub struct InsertParams {
13    pub collection: String,
14    pub columns: Vec<String>,
15    pub rows: Vec<Vec<(String, SqlValue)>>,
16    pub column_defaults: Vec<(String, String)>,
17    /// `ON CONFLICT DO NOTHING` semantics: duplicate-PK rows are skipped
18    /// silently. `false` for plain `INSERT` (raises `unique_violation`).
19    pub if_absent: bool,
20}
21
22/// Parameters for planning a SCAN operation.
23pub struct ScanParams {
24    pub collection: String,
25    pub alias: Option<String>,
26    pub filters: Vec<Filter>,
27    pub projection: Vec<Projection>,
28    pub sort_keys: Vec<SortKey>,
29    pub limit: Option<usize>,
30    pub offset: usize,
31    pub distinct: bool,
32    pub window_functions: Vec<WindowSpec>,
33    /// Secondary indexes available on the scan's collection. Document
34    /// engines consult this to rewrite equality-on-indexed-field into
35    /// [`SqlPlan::DocumentIndexLookup`]. Other engines ignore it today.
36    pub indexes: Vec<IndexSpec>,
37}
38
39/// Parameters for planning a POINT GET operation.
40pub struct PointGetParams {
41    pub collection: String,
42    pub alias: Option<String>,
43    pub key_column: String,
44    pub key_value: SqlValue,
45}
46
47/// Parameters for planning an UPDATE operation.
48pub struct UpdateParams {
49    pub collection: String,
50    pub assignments: Vec<(String, SqlExpr)>,
51    pub filters: Vec<Filter>,
52    pub target_keys: Vec<SqlValue>,
53    pub returning: bool,
54}
55
56/// Parameters for planning a DELETE operation.
57pub struct DeleteParams {
58    pub collection: String,
59    pub filters: Vec<Filter>,
60    pub target_keys: Vec<SqlValue>,
61}
62
63/// Parameters for planning an UPSERT operation.
64pub struct UpsertParams {
65    pub collection: String,
66    pub columns: Vec<String>,
67    pub rows: Vec<Vec<(String, SqlValue)>>,
68    pub column_defaults: Vec<(String, String)>,
69    /// `ON CONFLICT (...) DO UPDATE SET` assignments. Empty for plain
70    /// `UPSERT INTO ...`; populated when the caller is
71    /// `INSERT ... ON CONFLICT ... DO UPDATE SET`.
72    pub on_conflict_updates: Vec<(String, SqlExpr)>,
73}
74
75/// Parameters for planning an AGGREGATE operation.
76pub struct AggregateParams {
77    pub collection: String,
78    pub alias: Option<String>,
79    pub filters: Vec<Filter>,
80    pub group_by: Vec<SqlExpr>,
81    pub aggregates: Vec<AggregateExpr>,
82    pub having: Vec<Filter>,
83    pub limit: usize,
84    /// Timeseries-specific: bucket interval from time_bucket() call.
85    pub bucket_interval_ms: Option<i64>,
86    /// Timeseries-specific: non-time GROUP BY columns.
87    pub group_columns: Vec<String>,
88    /// Whether the collection has auto-tiering enabled.
89    pub has_auto_tier: bool,
90}
91
92/// Engine-specific planning rules.
93///
94/// Each engine type implements this trait to produce the correct `SqlPlan`
95/// variant for each operation, or return an error if the operation is not
96/// supported. This is the single source of truth for operation routing —
97/// no downstream code should ever check engine type to decide routing.
98pub trait EngineRules {
99    /// Plan an INSERT. Returns `Err` if the engine does not support inserts
100    /// (e.g. timeseries routes to `TimeseriesIngest` instead).
101    fn plan_insert(&self, params: InsertParams) -> Result<Vec<SqlPlan>>;
102    /// Plan an UPSERT (insert-or-merge). Returns `Err` for append-only or
103    /// columnar engines that don't support merge semantics.
104    fn plan_upsert(&self, params: UpsertParams) -> Result<Vec<SqlPlan>>;
105    /// Plan a table scan (SELECT without point-get optimization).
106    fn plan_scan(&self, params: ScanParams) -> Result<SqlPlan>;
107    /// Plan a point lookup by primary key. Returns `Err` for engines
108    /// that don't support O(1) key lookups (e.g. timeseries).
109    fn plan_point_get(&self, params: PointGetParams) -> Result<SqlPlan>;
110    /// Plan an UPDATE. Returns `Err` for append-only engines.
111    fn plan_update(&self, params: UpdateParams) -> Result<Vec<SqlPlan>>;
112    /// Plan a DELETE (point or bulk).
113    fn plan_delete(&self, params: DeleteParams) -> Result<Vec<SqlPlan>>;
114    /// Plan a GROUP BY / aggregate query.
115    fn plan_aggregate(&self, params: AggregateParams) -> Result<SqlPlan>;
116}
117
118/// Attempt to rewrite `ScanParams` into a [`SqlPlan::DocumentIndexLookup`]
119/// when exactly one of the filters is an equality predicate on a `Ready`
120/// indexed field. Returns `None` to fall through to a generic `Scan`.
121///
122/// Shared by the schemaless and strict document engines so the
123/// index-rewrite rule has one source of truth. Normalizes strict column
124/// names to `$.column` before matching against index fields because the
125/// catalog stores every document index in JSON-path canonical form.
126pub(crate) fn try_document_index_lookup(
127    params: &ScanParams,
128    engine: EngineType,
129) -> Option<SqlPlan> {
130    // Sort / window functions still force a full scan because the
131    // IndexedFetch handler does not yet order rows or evaluate window
132    // aggregates. DISTINCT is safe on the index path: the handler
133    // emits each matched doc exactly once and any further dedup is a
134    // cheap Control-Plane pass on the scan-shaped response.
135    if !params.sort_keys.is_empty() || !params.window_functions.is_empty() {
136        return None;
137    }
138
139    // Iterate filters to find the first equality candidate that lines up
140    // with a Ready index. Keep the remaining filters as post-filters.
141    // Predicates appear in three shapes:
142    //   - `FilterExpr::Comparison { field, Eq, value }` — resolver path.
143    //   - `FilterExpr::Expr(BinaryOp { Column, Eq, Literal })` — generic.
144    //   - `FilterExpr::Expr(BinaryOp { left AND right })` — the
145    //     top-level AND the `convert_where_to_filters` path produces
146    //     for compound WHERE clauses like `a = 1 AND b > 2`. Descend
147    //     into AND conjuncts so an equality nested in a conjunction
148    //     still picks up the index, and the non-equality siblings
149    //     survive as a residual conjunction carried on the
150    //     IndexedFetch node (not as a wrapping Filter plan node —
151    //     that wrapping is the Control-Plane post-filter anti-pattern
152    //     the handler's `filters` payload exists to eliminate).
153    // Pre-collect the query's top-level conjuncts once. Used for
154    // partial-index entailment: every conjunct of the index predicate
155    // must appear as a conjunct of the query WHERE for the rewrite to
156    // be sound, otherwise the index would silently omit rows the
157    // query expects to see.
158    let query_conjuncts: Vec<SqlExpr> = params
159        .filters
160        .iter()
161        .flat_map(|f| match &f.expr {
162            FilterExpr::Expr(e) => {
163                let mut v = Vec::new();
164                flatten_and(e, &mut v);
165                v
166            }
167            _ => Vec::new(),
168        })
169        .collect();
170
171    for (i, f) in params.filters.iter().enumerate() {
172        let Some((field, value, residual)) = extract_equality_with_residual(&f.expr) else {
173            continue;
174        };
175        let canonical = canonical_index_field(&field);
176        let Some(idx) = params.indexes.iter().find(|i| {
177            i.state == IndexState::Ready
178                && i.field == canonical
179                && partial_index_entailed(i.predicate.as_deref(), &query_conjuncts)
180        }) else {
181            continue;
182        };
183
184        let mut remaining = params.filters.clone();
185        match residual {
186            Some(expr) => {
187                remaining[i] = Filter {
188                    expr: FilterExpr::Expr(expr),
189                };
190            }
191            None => {
192                remaining.remove(i);
193            }
194        }
195
196        let lookup_value = if idx.case_insensitive {
197            lowercase_string_value(&value)
198        } else {
199            value
200        };
201
202        return Some(SqlPlan::DocumentIndexLookup {
203            collection: params.collection.clone(),
204            alias: params.alias.clone(),
205            engine,
206            field: idx.field.clone(),
207            value: lookup_value,
208            filters: remaining,
209            projection: params.projection.clone(),
210            sort_keys: params.sort_keys.clone(),
211            limit: params.limit,
212            offset: params.offset,
213            distinct: params.distinct,
214            window_functions: params.window_functions.clone(),
215            case_insensitive: idx.case_insensitive,
216        });
217    }
218    None
219}
220
221/// Pull `(column_name, equality_value, residual_expr)` out of a filter
222/// expression if it contains a column-equals-literal predicate that can
223/// drive an index lookup. Returns `None` if no usable equality is
224/// present. The returned residual is the rest of the conjunction with
225/// the equality removed — `None` when the filter was a bare equality.
226fn extract_equality_with_residual(
227    expr: &FilterExpr,
228) -> Option<(String, SqlValue, Option<SqlExpr>)> {
229    match expr {
230        FilterExpr::Comparison {
231            field,
232            op: CompareOp::Eq,
233            value,
234        } => Some((field.clone(), value.clone(), None)),
235        FilterExpr::Expr(sql_expr) => {
236            let (col, lit, residual) = split_equality_from_expr(sql_expr)?;
237            Some((col, lit, residual))
238        }
239        _ => None,
240    }
241}
242
243/// Return `(column, literal, residual_expr)` for an equality found
244/// anywhere in a right-leaning AND conjunction tree, or `None` if no
245/// bare column-equals-literal predicate exists. The residual preserves
246/// every sibling conjunct in their original order; `None` means the
247/// expression was a bare equality with nothing left behind.
248fn split_equality_from_expr(expr: &SqlExpr) -> Option<(String, SqlValue, Option<SqlExpr>)> {
249    // Gather the conjuncts of a top-level AND chain left-to-right.
250    let mut conjuncts: Vec<SqlExpr> = Vec::new();
251    flatten_and(expr, &mut conjuncts);
252
253    // Find the first conjunct that is a bare column-equals-literal.
254    let eq_idx = conjuncts.iter().position(is_column_eq_literal)?;
255    let eq = conjuncts.remove(eq_idx);
256    let (col, lit) = match eq {
257        SqlExpr::BinaryOp { left, op, right } => match (*left, op, *right) {
258            (SqlExpr::Column { name, .. }, BinaryOp::Eq, SqlExpr::Literal(v)) => (name, v),
259            (SqlExpr::Literal(v), BinaryOp::Eq, SqlExpr::Column { name, .. }) => (name, v),
260            _ => return None,
261        },
262        _ => return None,
263    };
264
265    let residual = rebuild_and(conjuncts);
266    Some((col, lit, residual))
267}
268
269/// Append every leaf of a right-leaning `AND` tree to `out`. Non-AND
270/// expressions are a single leaf.
271fn flatten_and(expr: &SqlExpr, out: &mut Vec<SqlExpr>) {
272    match expr {
273        SqlExpr::BinaryOp {
274            left,
275            op: BinaryOp::And,
276            right,
277        } => {
278            flatten_and(left, out);
279            flatten_and(right, out);
280        }
281        other => out.push(other.clone()),
282    }
283}
284
285/// Reassemble a list of conjuncts into a right-leaning AND tree.
286/// Empty input returns `None`; single input returns itself.
287fn rebuild_and(mut conjuncts: Vec<SqlExpr>) -> Option<SqlExpr> {
288    let last = conjuncts.pop()?;
289    Some(
290        conjuncts
291            .into_iter()
292            .rfold(last, |acc, next| SqlExpr::BinaryOp {
293                left: Box::new(next),
294                op: BinaryOp::And,
295                right: Box::new(acc),
296            }),
297    )
298}
299
300/// Decide whether the query's WHERE conjuncts entail the partial-index
301/// predicate. `None` predicate means a full index — trivially entailed.
302/// Initial version uses conjunct-level structural equality: every
303/// conjunct of the index predicate must appear (by `PartialEq`) as a
304/// conjunct of the query. This is conservative and deliberately so —
305/// a false positive here would silently omit rows from query results.
306fn partial_index_entailed(predicate: Option<&str>, query_conjuncts: &[SqlExpr]) -> bool {
307    let Some(text) = predicate else {
308        return true;
309    };
310    let Ok(parsed) = crate::parse_expr_string(text) else {
311        // A catalog predicate we can't parse is not entailed — refuse
312        // to use the index rather than assume anything about its
313        // contents. The DDL path validates at CREATE INDEX time, so
314        // reaching this branch indicates drift.
315        return false;
316    };
317    let mut index_conjuncts: Vec<SqlExpr> = Vec::new();
318    flatten_and(&parsed, &mut index_conjuncts);
319    // Structural equality via the stable `Debug` representation:
320    // `SqlExpr` doesn't derive `PartialEq` (several nested variants
321    // carry types that can't derive it cheaply), but the Debug form is
322    // canonical for equivalent trees produced by the same parser. This
323    // is conservative — it matches only identical AST shapes and not,
324    // e.g., `a = 1` vs `1 = a`. Callers should write index predicates
325    // in the same normal form they use in query WHERE clauses, which
326    // is the convention everywhere else in the codebase.
327    index_conjuncts.iter().all(|ic| {
328        let ic_dbg = format!("{ic:?}");
329        query_conjuncts.iter().any(|qc| format!("{qc:?}") == ic_dbg)
330    })
331}
332
333fn is_column_eq_literal(expr: &SqlExpr) -> bool {
334    matches!(
335        expr,
336        SqlExpr::BinaryOp { left, op: BinaryOp::Eq, right }
337            if matches!(
338                (left.as_ref(), right.as_ref()),
339                (SqlExpr::Column { .. }, SqlExpr::Literal(_))
340                | (SqlExpr::Literal(_), SqlExpr::Column { .. }),
341            )
342    )
343}
344
/// Return `field` in `$`-prefixed path form: a name that already starts
/// with `$` is returned unchanged, anything else gets `$.` prepended.
fn canonical_index_field(field: &str) -> String {
    // A `$.` prefix is covered by the `$` check, so one test is enough.
    if field.starts_with('$') {
        return field.to_owned();
    }
    format!("$.{field}")
}
352
353fn lowercase_string_value(v: &SqlValue) -> SqlValue {
354    if let SqlValue::String(s) = v {
355        SqlValue::String(s.to_lowercase())
356    } else {
357        v.clone()
358    }
359}
360
361/// Resolve the engine rules for a given engine type.
362///
363/// No catch-all — compiler enforces exhaustiveness.
364pub fn resolve_engine_rules(engine: EngineType) -> &'static dyn EngineRules {
365    match engine {
366        EngineType::DocumentSchemaless => &document_schemaless::SchemalessRules,
367        EngineType::DocumentStrict => &document_strict::StrictRules,
368        EngineType::KeyValue => &kv::KvRules,
369        EngineType::Columnar => &columnar::ColumnarRules,
370        EngineType::Timeseries => &timeseries::TimeseriesRules,
371        EngineType::Spatial => &spatial::SpatialRules,
372    }
373}