Skip to main content

nodedb_sql/engine_rules/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2
3pub mod array;
4pub mod columnar;
5pub mod document_schemaless;
6pub mod document_strict;
7pub mod kv;
8pub mod spatial;
9pub mod timeseries;
10
11use crate::error::Result;
12use crate::types::*;
13
/// Parameters for planning an INSERT operation.
///
/// Passed by value to [`EngineRules::plan_insert`].
pub struct InsertParams {
    /// Name of the collection the rows are inserted into.
    pub collection: String,
    /// Column names for the statement.
    /// NOTE(review): presumably the INSERT column list in statement order — confirm against the caller.
    pub columns: Vec<String>,
    /// One inner `Vec` per row; each row is a list of `(name, value)` pairs.
    /// NOTE(review): the `String` is presumably the column name — confirm.
    pub rows: Vec<Vec<(String, SqlValue)>>,
    /// Pairs of strings describing column defaults.
    /// NOTE(review): presumably `(column, default expression text)` — confirm against the catalog.
    pub column_defaults: Vec<(String, String)>,
    /// `ON CONFLICT DO NOTHING` semantics: duplicate-PK rows are skipped
    /// silently. `false` for plain `INSERT` (raises `unique_violation`).
    pub if_absent: bool,
}
24
/// Parameters for planning a SCAN operation.
///
/// Passed by value to [`EngineRules::plan_scan`]; also read by reference
/// by `try_document_index_lookup` when attempting the index rewrite.
pub struct ScanParams {
    /// Name of the collection being scanned.
    pub collection: String,
    /// Optional alias for the collection (forwarded unchanged into the
    /// index-lookup plan when the rewrite applies).
    pub alias: Option<String>,
    /// WHERE filters. The index rewrite searches these for an equality
    /// predicate and carries the rest along as residual filters.
    pub filters: Vec<Filter>,
    /// Projection list for the scan output.
    pub projection: Vec<Projection>,
    /// Sort keys. A non-empty list disables the document index rewrite.
    pub sort_keys: Vec<SortKey>,
    /// Optional row limit. NOTE(review): `None` presumably means unlimited — confirm.
    pub limit: Option<usize>,
    /// Row offset. NOTE(review): presumably rows skipped before `limit` applies — confirm.
    pub offset: usize,
    /// Whether the query requested DISTINCT.
    pub distinct: bool,
    /// Window function specs. A non-empty list disables the document
    /// index rewrite.
    pub window_functions: Vec<WindowSpec>,
    /// Secondary indexes available on the scan's collection. Document
    /// engines consult this to rewrite equality-on-indexed-field into
    /// [`SqlPlan::DocumentIndexLookup`]. Other engines ignore it today.
    pub indexes: Vec<IndexSpec>,
    /// Bitemporal qualifier propagated from `plan_sql`. Engines without
    /// bitemporal storage support reject a non-default scope via
    /// `SqlError::Unsupported` — silently ignoring it would return
    /// current-state data when the user asked for history.
    pub temporal: crate::temporal::TemporalScope,
    /// Whether this collection was created with bitemporal storage. When
    /// `true`, engines that support bitemporal reads route the scan
    /// through versioned storage; when `false`, a non-default
    /// [`Self::temporal`] is rejected.
    pub bitemporal: bool,
}
51
/// Parameters for planning a POINT GET operation.
///
/// Passed by value to [`EngineRules::plan_point_get`].
pub struct PointGetParams {
    /// Name of the collection being read.
    pub collection: String,
    /// Optional alias for the collection.
    pub alias: Option<String>,
    /// Name of the column the lookup is keyed on.
    /// NOTE(review): presumably the primary-key column — confirm.
    pub key_column: String,
    /// Value that `key_column` must equal.
    pub key_value: SqlValue,
}
59
/// Parameters for planning an UPDATE operation.
///
/// Passed by value to [`EngineRules::plan_update`].
pub struct UpdateParams {
    /// Name of the collection being updated.
    pub collection: String,
    /// SET assignments as `(name, expression)` pairs.
    /// NOTE(review): the `String` is presumably the target column — confirm.
    pub assignments: Vec<(String, SqlExpr)>,
    /// WHERE filters selecting the rows to update.
    pub filters: Vec<Filter>,
    /// Key values identifying specific target rows.
    /// NOTE(review): presumably primary keys extracted from the WHERE
    /// clause for point updates, empty for bulk updates — confirm.
    pub target_keys: Vec<SqlValue>,
    /// Whether the statement carries a RETURNING clause.
    /// NOTE(review): inferred from the field name — confirm.
    pub returning: bool,
}
68
/// Parameters for planning an `UPDATE target SET ... FROM src WHERE ...` operation.
///
/// Passed by value to [`EngineRules::plan_update_from`].
pub struct UpdateFromParams {
    /// Name of the target collection being updated.
    pub collection: String,
    /// The FROM source plan (Scan, Join, …).
    pub source: Box<SqlPlan>,
    /// Column in target used as the equi-join key.
    pub target_join_col: String,
    /// Column in source used as the equi-join key.
    pub source_join_col: String,
    /// SET assignments — RHS may reference source columns.
    pub assignments: Vec<(String, SqlExpr)>,
    /// Filters that apply only to the target.
    pub target_filters: Vec<Filter>,
    /// Whether the statement carries a RETURNING clause.
    /// NOTE(review): inferred from the field name — confirm.
    pub returning: bool,
}
84
/// Parameters for planning a DELETE operation.
///
/// Passed by value to [`EngineRules::plan_delete`], which plans both
/// point and bulk deletes.
pub struct DeleteParams {
    /// Name of the collection rows are deleted from.
    pub collection: String,
    /// WHERE filters selecting the rows to delete.
    pub filters: Vec<Filter>,
    /// Key values identifying specific target rows.
    /// NOTE(review): presumably non-empty for point deletes and empty
    /// for bulk deletes — confirm against the caller.
    pub target_keys: Vec<SqlValue>,
}
91
/// Parameters for planning a MERGE operation.
///
/// Passed by value to [`EngineRules::plan_merge`].
pub struct MergeParams {
    /// Name of the target collection being merged into.
    pub collection: String,
    /// Plan producing the source rows.
    pub source: Box<SqlPlan>,
    /// Join column on the target side.
    /// NOTE(review): presumably an equi-join key, mirroring
    /// `UpdateFromParams` — confirm.
    pub target_join_col: String,
    /// Join column on the source side.
    pub source_join_col: String,
    /// Alias under which the source is referenced.
    /// NOTE(review): presumably used to resolve source columns inside
    /// the clause expressions — confirm.
    pub source_alias: String,
    /// The planned MERGE clauses.
    /// NOTE(review): presumably the `WHEN [NOT] MATCHED` arms in
    /// statement order — confirm.
    pub clauses: Vec<crate::types::MergePlanClause>,
    /// Whether the statement carries a RETURNING clause.
    /// NOTE(review): inferred from the field name — confirm.
    pub returning: bool,
}
102
/// Parameters for planning an UPSERT operation.
///
/// Passed by value to [`EngineRules::plan_upsert`]. The first four
/// fields have the same types as their `InsertParams` counterparts.
pub struct UpsertParams {
    /// Name of the collection being upserted into.
    pub collection: String,
    /// Column names for the statement.
    /// NOTE(review): presumably the column list in statement order — confirm.
    pub columns: Vec<String>,
    /// One inner `Vec` per row; each row is a list of `(name, value)` pairs.
    /// NOTE(review): the `String` is presumably the column name — confirm.
    pub rows: Vec<Vec<(String, SqlValue)>>,
    /// Pairs of strings describing column defaults.
    /// NOTE(review): presumably `(column, default expression text)` — confirm.
    pub column_defaults: Vec<(String, String)>,
    /// `ON CONFLICT (...) DO UPDATE SET` assignments. Empty for plain
    /// `UPSERT INTO ...`; populated when the caller is
    /// `INSERT ... ON CONFLICT ... DO UPDATE SET`.
    pub on_conflict_updates: Vec<(String, SqlExpr)>,
}
114
/// Parameters for planning an AGGREGATE operation.
///
/// Passed by value to [`EngineRules::plan_aggregate`].
pub struct AggregateParams {
    /// Name of the collection being aggregated.
    pub collection: String,
    /// Optional alias for the collection.
    pub alias: Option<String>,
    /// WHERE filters.
    /// NOTE(review): presumably applied to the base scan before grouping — confirm.
    pub filters: Vec<Filter>,
    /// GROUP BY expressions.
    pub group_by: Vec<SqlExpr>,
    /// Aggregate expressions to compute.
    pub aggregates: Vec<AggregateExpr>,
    /// HAVING filters.
    /// NOTE(review): presumably evaluated after aggregation — confirm.
    pub having: Vec<Filter>,
    /// Row limit. Unlike `ScanParams::limit` this is not optional.
    /// NOTE(review): how "no limit" is encoded is not visible here — confirm.
    pub limit: usize,
    /// Timeseries-specific: bucket interval from time_bucket() call.
    pub bucket_interval_ms: Option<i64>,
    /// Timeseries-specific: non-time GROUP BY columns.
    pub group_columns: Vec<String>,
    /// Whether the collection has auto-tiering enabled.
    pub has_auto_tier: bool,
    /// Whether this collection was created with bitemporal storage.
    /// When `true`, the base scan inside the aggregate is allowed to
    /// carry a non-default temporal scope.
    pub bitemporal: bool,
    /// System-time / valid-time scope to propagate into the underlying
    /// scan so bitemporal aggregate queries project an as-of snapshot
    /// before grouping.
    pub temporal: crate::temporal::TemporalScope,
}
139
/// Engine-specific planning rules.
///
/// Each engine type implements this trait to produce the correct `SqlPlan`
/// variant for each operation, or return an error if the operation is not
/// supported. This is the single source of truth for operation routing —
/// no downstream code should ever check engine type to decide routing.
///
/// Every method consumes its parameter struct by value. Write-side
/// planners (`plan_insert`, `plan_upsert`, `plan_update`,
/// `plan_update_from`, `plan_delete`, `plan_merge`) return a list of
/// plans; read-side planners (`plan_scan`, `plan_point_get`,
/// `plan_aggregate`) return exactly one plan.
pub trait EngineRules {
    /// Plan an INSERT. Returns `Err` if the engine does not support inserts
    /// (e.g. timeseries routes to `TimeseriesIngest` instead).
    fn plan_insert(&self, params: InsertParams) -> Result<Vec<SqlPlan>>;
    /// Plan an UPSERT (insert-or-merge). Returns `Err` for append-only or
    /// columnar engines that don't support merge semantics.
    fn plan_upsert(&self, params: UpsertParams) -> Result<Vec<SqlPlan>>;
    /// Plan a table scan (SELECT without point-get optimization).
    ///
    /// Per the `ScanParams::temporal` docs, engines without bitemporal
    /// storage support reject a non-default temporal scope.
    fn plan_scan(&self, params: ScanParams) -> Result<SqlPlan>;
    /// Plan a point lookup by primary key. Returns `Err` for engines
    /// that don't support O(1) key lookups (e.g. timeseries).
    fn plan_point_get(&self, params: PointGetParams) -> Result<SqlPlan>;
    /// Plan an UPDATE. Returns `Err` for append-only engines.
    fn plan_update(&self, params: UpdateParams) -> Result<Vec<SqlPlan>>;
    /// Plan an `UPDATE target SET ... FROM src WHERE ...`.
    ///
    /// Returns `Err(SqlError::Unsupported)` for engines that cannot participate
    /// as an update target in a cross-table update (timeseries, kv-with-opaque-keys).
    fn plan_update_from(&self, params: UpdateFromParams) -> Result<Vec<SqlPlan>>;
    /// Plan a DELETE (point or bulk).
    fn plan_delete(&self, params: DeleteParams) -> Result<Vec<SqlPlan>>;
    /// Plan a GROUP BY / aggregate query.
    fn plan_aggregate(&self, params: AggregateParams) -> Result<SqlPlan>;
    /// Plan a MERGE statement.
    ///
    /// Returns `Err(SqlError::Unsupported)` for engines that do not support
    /// MERGE semantics (everything except `document_schemaless` and
    /// `document_strict`).
    fn plan_merge(&self, params: MergeParams) -> Result<Vec<SqlPlan>>;
}
176
177/// Attempt to rewrite `ScanParams` into a [`SqlPlan::DocumentIndexLookup`]
178/// when exactly one of the filters is an equality predicate on a `Ready`
179/// indexed field. Returns `None` to fall through to a generic `Scan`.
180///
181/// Shared by the schemaless and strict document engines so the
182/// index-rewrite rule has one source of truth. Normalizes strict column
183/// names to `$.column` before matching against index fields because the
184/// catalog stores every document index in JSON-path canonical form.
185pub(crate) fn try_document_index_lookup(
186    params: &ScanParams,
187    engine: EngineType,
188) -> Option<SqlPlan> {
189    // Sort / window functions still force a full scan because the
190    // IndexedFetch handler does not yet order rows or evaluate window
191    // aggregates. DISTINCT is safe on the index path: the handler
192    // emits each matched doc exactly once and any further dedup is a
193    // cheap Control-Plane pass on the scan-shaped response.
194    if !params.sort_keys.is_empty() || !params.window_functions.is_empty() {
195        return None;
196    }
197
198    // Iterate filters to find the first equality candidate that lines up
199    // with a Ready index. Keep the remaining filters as post-filters.
200    // Predicates appear in three shapes:
201    //   - `FilterExpr::Comparison { field, Eq, value }` — resolver path.
202    //   - `FilterExpr::Expr(BinaryOp { Column, Eq, Literal })` — generic.
203    //   - `FilterExpr::Expr(BinaryOp { left AND right })` — the
204    //     top-level AND the `convert_where_to_filters` path produces
205    //     for compound WHERE clauses like `a = 1 AND b > 2`. Descend
206    //     into AND conjuncts so an equality nested in a conjunction
207    //     still picks up the index, and the non-equality siblings
208    //     survive as a residual conjunction carried on the
209    //     IndexedFetch node (not as a wrapping Filter plan node —
210    //     that wrapping is the Control-Plane post-filter anti-pattern
211    //     the handler's `filters` payload exists to eliminate).
212    // Pre-collect the query's top-level conjuncts once. Used for
213    // partial-index entailment: every conjunct of the index predicate
214    // must appear as a conjunct of the query WHERE for the rewrite to
215    // be sound, otherwise the index would silently omit rows the
216    // query expects to see.
217    let query_conjuncts: Vec<SqlExpr> = params
218        .filters
219        .iter()
220        .flat_map(|f| match &f.expr {
221            FilterExpr::Expr(e) => {
222                let mut v = Vec::new();
223                flatten_and(e, &mut v);
224                v
225            }
226            _ => Vec::new(),
227        })
228        .collect();
229
230    for (i, f) in params.filters.iter().enumerate() {
231        let Some((field, value, residual)) = extract_equality_with_residual(&f.expr) else {
232            continue;
233        };
234        let canonical = canonical_index_field(&field);
235        let Some(idx) = params.indexes.iter().find(|i| {
236            i.state == IndexState::Ready
237                && i.field == canonical
238                && partial_index_entailed(i.predicate.as_deref(), &query_conjuncts)
239        }) else {
240            continue;
241        };
242
243        let mut remaining = params.filters.clone();
244        match residual {
245            Some(expr) => {
246                remaining[i] = Filter {
247                    expr: FilterExpr::Expr(expr),
248                };
249            }
250            None => {
251                remaining.remove(i);
252            }
253        }
254
255        let lookup_value = if idx.case_insensitive {
256            lowercase_string_value(&value)
257        } else {
258            value
259        };
260
261        return Some(SqlPlan::DocumentIndexLookup {
262            collection: params.collection.clone(),
263            alias: params.alias.clone(),
264            engine,
265            field: idx.field.clone(),
266            value: lookup_value,
267            filters: remaining,
268            projection: params.projection.clone(),
269            sort_keys: params.sort_keys.clone(),
270            limit: params.limit,
271            offset: params.offset,
272            distinct: params.distinct,
273            window_functions: params.window_functions.clone(),
274            case_insensitive: idx.case_insensitive,
275            temporal: params.temporal,
276        });
277    }
278    None
279}
280
281/// Pull `(column_name, equality_value, residual_expr)` out of a filter
282/// expression if it contains a column-equals-literal predicate that can
283/// drive an index lookup. Returns `None` if no usable equality is
284/// present. The returned residual is the rest of the conjunction with
285/// the equality removed — `None` when the filter was a bare equality.
286fn extract_equality_with_residual(
287    expr: &FilterExpr,
288) -> Option<(String, SqlValue, Option<SqlExpr>)> {
289    match expr {
290        FilterExpr::Comparison {
291            field,
292            op: CompareOp::Eq,
293            value,
294        } => Some((field.clone(), value.clone(), None)),
295        FilterExpr::Expr(sql_expr) => {
296            let (col, lit, residual) = split_equality_from_expr(sql_expr)?;
297            Some((col, lit, residual))
298        }
299        _ => None,
300    }
301}
302
303/// Return `(column, literal, residual_expr)` for an equality found
304/// anywhere in a right-leaning AND conjunction tree, or `None` if no
305/// bare column-equals-literal predicate exists. The residual preserves
306/// every sibling conjunct in their original order; `None` means the
307/// expression was a bare equality with nothing left behind.
308fn split_equality_from_expr(expr: &SqlExpr) -> Option<(String, SqlValue, Option<SqlExpr>)> {
309    // Gather the conjuncts of a top-level AND chain left-to-right.
310    let mut conjuncts: Vec<SqlExpr> = Vec::new();
311    flatten_and(expr, &mut conjuncts);
312
313    // Find the first conjunct that is a bare column-equals-literal.
314    let eq_idx = conjuncts.iter().position(is_column_eq_literal)?;
315    let eq = conjuncts.remove(eq_idx);
316    let (col, lit) = match eq {
317        SqlExpr::BinaryOp { left, op, right } => match (*left, op, *right) {
318            (SqlExpr::Column { name, .. }, BinaryOp::Eq, SqlExpr::Literal(v)) => (name, v),
319            (SqlExpr::Literal(v), BinaryOp::Eq, SqlExpr::Column { name, .. }) => (name, v),
320            _ => return None,
321        },
322        _ => return None,
323    };
324
325    let residual = rebuild_and(conjuncts);
326    Some((col, lit, residual))
327}
328
329/// Append every leaf of a right-leaning `AND` tree to `out`. Non-AND
330/// expressions are a single leaf.
331fn flatten_and(expr: &SqlExpr, out: &mut Vec<SqlExpr>) {
332    match expr {
333        SqlExpr::BinaryOp {
334            left,
335            op: BinaryOp::And,
336            right,
337        } => {
338            flatten_and(left, out);
339            flatten_and(right, out);
340        }
341        other => out.push(other.clone()),
342    }
343}
344
345/// Reassemble a list of conjuncts into a right-leaning AND tree.
346/// Empty input returns `None`; single input returns itself.
347fn rebuild_and(mut conjuncts: Vec<SqlExpr>) -> Option<SqlExpr> {
348    let last = conjuncts.pop()?;
349    Some(
350        conjuncts
351            .into_iter()
352            .rfold(last, |acc, next| SqlExpr::BinaryOp {
353                left: Box::new(next),
354                op: BinaryOp::And,
355                right: Box::new(acc),
356            }),
357    )
358}
359
360/// Decide whether the query's WHERE conjuncts entail the partial-index
361/// predicate. `None` predicate means a full index — trivially entailed.
362/// Initial version uses conjunct-level structural equality: every
363/// conjunct of the index predicate must appear (by `PartialEq`) as a
364/// conjunct of the query. This is conservative and deliberately so —
365/// a false positive here would silently omit rows from query results.
366fn partial_index_entailed(predicate: Option<&str>, query_conjuncts: &[SqlExpr]) -> bool {
367    let Some(text) = predicate else {
368        return true;
369    };
370    let Ok(parsed) = crate::parse_expr_string(text) else {
371        // A catalog predicate we can't parse is not entailed — refuse
372        // to use the index rather than assume anything about its
373        // contents. The DDL path validates at CREATE INDEX time, so
374        // reaching this branch indicates drift.
375        return false;
376    };
377    let mut index_conjuncts: Vec<SqlExpr> = Vec::new();
378    flatten_and(&parsed, &mut index_conjuncts);
379    // Structural equality via the stable `Debug` representation:
380    // `SqlExpr` doesn't derive `PartialEq` (several nested variants
381    // carry types that can't derive it cheaply), but the Debug form is
382    // canonical for equivalent trees produced by the same parser. This
383    // is conservative — it matches only identical AST shapes and not,
384    // e.g., `a = 1` vs `1 = a`. Callers should write index predicates
385    // in the same normal form they use in query WHERE clauses, which
386    // is the convention everywhere else in the codebase.
387    index_conjuncts.iter().all(|ic| {
388        let ic_dbg = format!("{ic:?}");
389        query_conjuncts.iter().any(|qc| format!("{qc:?}") == ic_dbg)
390    })
391}
392
393fn is_column_eq_literal(expr: &SqlExpr) -> bool {
394    matches!(
395        expr,
396        SqlExpr::BinaryOp { left, op: BinaryOp::Eq, right }
397            if matches!(
398                (left.as_ref(), right.as_ref()),
399                (SqlExpr::Column { .. }, SqlExpr::Literal(_))
400                | (SqlExpr::Literal(_), SqlExpr::Column { .. }),
401            )
402    )
403}
404
/// Normalize a field name to the `$`-rooted path form used when matching
/// against index fields.
///
/// Names that already start with `$` are returned unchanged; every other
/// name gets a `$.` prefix.
fn canonical_index_field(field: &str) -> String {
    // `"$."` is itself `$`-prefixed, so one check covers both spellings.
    if field.starts_with('$') {
        return field.to_owned();
    }
    let mut path = String::with_capacity(field.len() + 2);
    path.push_str("$.");
    path.push_str(field);
    path
}
412
413fn lowercase_string_value(v: &SqlValue) -> SqlValue {
414    if let SqlValue::String(s) = v {
415        SqlValue::String(s.to_lowercase())
416    } else {
417        v.clone()
418    }
419}
420
/// Resolve the engine rules for a given engine type.
///
/// Maps each `EngineType` variant to the rules value exported by the
/// matching submodule and returns it as a `'static` trait object, so
/// callers never need to name a concrete rules type.
///
/// No catch-all — compiler enforces exhaustiveness.
pub fn resolve_engine_rules(engine: EngineType) -> &'static dyn EngineRules {
    // One arm per variant; adding an `EngineType` variant without adding
    // an arm here is a compile error.
    match engine {
        EngineType::DocumentSchemaless => &document_schemaless::SchemalessRules,
        EngineType::DocumentStrict => &document_strict::StrictRules,
        EngineType::KeyValue => &kv::KvRules,
        EngineType::Columnar => &columnar::ColumnarRules,
        EngineType::Timeseries => &timeseries::TimeseriesRules,
        EngineType::Spatial => &spatial::SpatialRules,
        EngineType::Array => &array::ArrayRules,
    }
}