Skip to main content

nodedb_sql/engine_rules/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2
3pub mod array;
4pub mod columnar;
5pub mod document_schemaless;
6pub mod document_strict;
7pub mod kv;
8pub mod spatial;
9pub mod timeseries;
10
11use crate::error::Result;
12use crate::types::*;
13
14/// Parameters for planning an INSERT operation.
15pub struct InsertParams {
16    pub collection: String,
17    pub columns: Vec<String>,
18    pub rows: Vec<Vec<(String, SqlValue)>>,
19    pub column_defaults: Vec<(String, String)>,
20    /// `ON CONFLICT DO NOTHING` semantics: duplicate-PK rows are skipped
21    /// silently. `false` for plain `INSERT` (raises `unique_violation`).
22    pub if_absent: bool,
23    /// Raw column type strings from the catalog: `(column_name, type_str)`.
24    /// Used by columnar converters to reconstruct the exact `ColumnType` for
25    /// columns whose `SqlDataType` is ambiguous (e.g. both JSON and Bytes map
26    /// to `SqlDataType::Bytes`). Empty for engines that don't need it.
27    pub column_schema: Vec<(String, String)>,
28}
29
30/// Parameters for planning a SCAN operation.
31pub struct ScanParams {
32    pub collection: String,
33    pub alias: Option<String>,
34    pub filters: Vec<Filter>,
35    pub projection: Vec<Projection>,
36    pub sort_keys: Vec<SortKey>,
37    pub limit: Option<usize>,
38    pub offset: usize,
39    pub distinct: bool,
40    pub window_functions: Vec<WindowSpec>,
41    /// Secondary indexes available on the scan's collection. Document
42    /// engines consult this to rewrite equality-on-indexed-field into
43    /// [`SqlPlan::DocumentIndexLookup`]. Other engines ignore it today.
44    pub indexes: Vec<IndexSpec>,
45    /// Bitemporal qualifier propagated from `plan_sql`. Engines without
46    /// bitemporal storage support reject a non-default scope via
47    /// `SqlError::Unsupported` — silently ignoring it would return
48    /// current-state data when the user asked for history.
49    pub temporal: crate::temporal::TemporalScope,
50    /// Whether this collection was created with bitemporal storage. When
51    /// `true`, engines that support bitemporal reads route the scan
52    /// through versioned storage; when `false`, a non-default
53    /// [`Self::temporal`] is rejected.
54    pub bitemporal: bool,
55}
56
57/// Parameters for planning a POINT GET operation.
58pub struct PointGetParams {
59    pub collection: String,
60    pub alias: Option<String>,
61    pub key_column: String,
62    pub key_value: SqlValue,
63}
64
65/// Parameters for planning an UPDATE operation.
66pub struct UpdateParams {
67    pub collection: String,
68    pub assignments: Vec<(String, SqlExpr)>,
69    pub filters: Vec<Filter>,
70    pub target_keys: Vec<SqlValue>,
71    pub returning: bool,
72}
73
74/// Parameters for planning an `UPDATE target SET ... FROM src WHERE ...` operation.
75pub struct UpdateFromParams {
76    pub collection: String,
77    /// The FROM source plan (Scan, Join, …).
78    pub source: Box<SqlPlan>,
79    /// Column in target used as the equi-join key.
80    pub target_join_col: String,
81    /// Column in source used as the equi-join key.
82    pub source_join_col: String,
83    /// SET assignments — RHS may reference source columns.
84    pub assignments: Vec<(String, SqlExpr)>,
85    /// Filters that apply only to the target.
86    pub target_filters: Vec<Filter>,
87    pub returning: bool,
88}
89
90/// Parameters for planning a DELETE operation.
91pub struct DeleteParams {
92    pub collection: String,
93    pub filters: Vec<Filter>,
94    pub target_keys: Vec<SqlValue>,
95}
96
97/// Parameters for planning a MERGE operation.
98pub struct MergeParams {
99    pub collection: String,
100    pub source: Box<SqlPlan>,
101    pub target_join_col: String,
102    pub source_join_col: String,
103    pub source_alias: String,
104    pub clauses: Vec<crate::types::MergePlanClause>,
105    pub returning: bool,
106}
107
108/// Parameters for planning an UPSERT operation.
109pub struct UpsertParams {
110    pub collection: String,
111    pub columns: Vec<String>,
112    pub rows: Vec<Vec<(String, SqlValue)>>,
113    pub column_defaults: Vec<(String, String)>,
114    /// `ON CONFLICT (...) DO UPDATE SET` assignments. Empty for plain
115    /// `UPSERT INTO ...`; populated when the caller is
116    /// `INSERT ... ON CONFLICT ... DO UPDATE SET`.
117    pub on_conflict_updates: Vec<(String, SqlExpr)>,
118    /// Raw column type strings from the catalog: `(column_name, type_str)`.
119    /// Mirrors `InsertParams::column_schema` — see that field for rationale.
120    pub column_schema: Vec<(String, String)>,
121}
122
123/// Parameters for planning an AGGREGATE operation.
124pub struct AggregateParams {
125    pub collection: String,
126    pub alias: Option<String>,
127    pub filters: Vec<Filter>,
128    pub group_by: Vec<SqlExpr>,
129    pub aggregates: Vec<AggregateExpr>,
130    pub having: Vec<Filter>,
131    pub limit: usize,
132    /// Timeseries-specific: bucket interval from time_bucket() call.
133    pub bucket_interval_ms: Option<i64>,
134    /// Timeseries-specific: non-time GROUP BY columns.
135    pub group_columns: Vec<String>,
136    /// Whether the collection has auto-tiering enabled.
137    pub has_auto_tier: bool,
138    /// Whether this collection was created with bitemporal storage.
139    /// When `true`, the base scan inside the aggregate is allowed to
140    /// carry a non-default temporal scope.
141    pub bitemporal: bool,
142    /// System-time / valid-time scope to propagate into the underlying
143    /// scan so bitemporal aggregate queries project an as-of snapshot
144    /// before grouping.
145    pub temporal: crate::temporal::TemporalScope,
146}
147
148/// Engine-specific planning rules.
149///
150/// Each engine type implements this trait to produce the correct `SqlPlan`
151/// variant for each operation, or return an error if the operation is not
152/// supported. This is the single source of truth for operation routing —
153/// no downstream code should ever check engine type to decide routing.
154pub trait EngineRules {
155    /// Plan an INSERT. Returns `Err` if the engine does not support inserts
156    /// (e.g. timeseries routes to `TimeseriesIngest` instead).
157    fn plan_insert(&self, params: InsertParams) -> Result<Vec<SqlPlan>>;
158    /// Plan an UPSERT (insert-or-merge). Returns `Err` for append-only or
159    /// columnar engines that don't support merge semantics.
160    fn plan_upsert(&self, params: UpsertParams) -> Result<Vec<SqlPlan>>;
161    /// Plan a table scan (SELECT without point-get optimization).
162    fn plan_scan(&self, params: ScanParams) -> Result<SqlPlan>;
163    /// Plan a point lookup by primary key. Returns `Err` for engines
164    /// that don't support O(1) key lookups (e.g. timeseries).
165    fn plan_point_get(&self, params: PointGetParams) -> Result<SqlPlan>;
166    /// Plan an UPDATE. Returns `Err` for append-only engines.
167    fn plan_update(&self, params: UpdateParams) -> Result<Vec<SqlPlan>>;
168    /// Plan an `UPDATE target SET ... FROM src WHERE ...`.
169    ///
170    /// Returns `Err(SqlError::Unsupported)` for engines that cannot participate
171    /// as an update target in a cross-table update (timeseries, kv-with-opaque-keys).
172    fn plan_update_from(&self, params: UpdateFromParams) -> Result<Vec<SqlPlan>>;
173    /// Plan a DELETE (point or bulk).
174    fn plan_delete(&self, params: DeleteParams) -> Result<Vec<SqlPlan>>;
175    /// Plan a GROUP BY / aggregate query.
176    fn plan_aggregate(&self, params: AggregateParams) -> Result<SqlPlan>;
177    /// Plan a MERGE statement.
178    ///
179    /// Returns `Err(SqlError::Unsupported)` for engines that do not support
180    /// MERGE semantics (everything except `document_schemaless` and
181    /// `document_strict`).
182    fn plan_merge(&self, params: MergeParams) -> Result<Vec<SqlPlan>>;
183}
184
185/// Attempt to rewrite `ScanParams` into a [`SqlPlan::DocumentIndexLookup`]
186/// when exactly one of the filters is an equality predicate on a `Ready`
187/// indexed field. Returns `None` to fall through to a generic `Scan`.
188///
189/// Shared by the schemaless and strict document engines so the
190/// index-rewrite rule has one source of truth. Normalizes strict column
191/// names to `$.column` before matching against index fields because the
192/// catalog stores every document index in JSON-path canonical form.
193pub(crate) fn try_document_index_lookup(
194    params: &ScanParams,
195    engine: EngineType,
196) -> Option<SqlPlan> {
197    // Sort / window functions still force a full scan because the
198    // IndexedFetch handler does not yet order rows or evaluate window
199    // aggregates. DISTINCT is safe on the index path: the handler
200    // emits each matched doc exactly once and any further dedup is a
201    // cheap Control-Plane pass on the scan-shaped response.
202    if !params.sort_keys.is_empty() || !params.window_functions.is_empty() {
203        return None;
204    }
205
206    // Iterate filters to find the first equality candidate that lines up
207    // with a Ready index. Keep the remaining filters as post-filters.
208    // Predicates appear in three shapes:
209    //   - `FilterExpr::Comparison { field, Eq, value }` — resolver path.
210    //   - `FilterExpr::Expr(BinaryOp { Column, Eq, Literal })` — generic.
211    //   - `FilterExpr::Expr(BinaryOp { left AND right })` — the
212    //     top-level AND the `convert_where_to_filters` path produces
213    //     for compound WHERE clauses like `a = 1 AND b > 2`. Descend
214    //     into AND conjuncts so an equality nested in a conjunction
215    //     still picks up the index, and the non-equality siblings
216    //     survive as a residual conjunction carried on the
217    //     IndexedFetch node (not as a wrapping Filter plan node —
218    //     that wrapping is the Control-Plane post-filter anti-pattern
219    //     the handler's `filters` payload exists to eliminate).
220    // Pre-collect the query's top-level conjuncts once. Used for
221    // partial-index entailment: every conjunct of the index predicate
222    // must appear as a conjunct of the query WHERE for the rewrite to
223    // be sound, otherwise the index would silently omit rows the
224    // query expects to see.
225    let query_conjuncts: Vec<SqlExpr> = params
226        .filters
227        .iter()
228        .flat_map(|f| match &f.expr {
229            FilterExpr::Expr(e) => {
230                let mut v = Vec::new();
231                flatten_and(e, &mut v);
232                v
233            }
234            _ => Vec::new(),
235        })
236        .collect();
237
238    for (i, f) in params.filters.iter().enumerate() {
239        let Some((field, value, residual)) = extract_equality_with_residual(&f.expr) else {
240            continue;
241        };
242        let canonical = canonical_index_field(&field);
243        let Some(idx) = params.indexes.iter().find(|i| {
244            i.state == IndexState::Ready
245                && i.field == canonical
246                && partial_index_entailed(i.predicate.as_deref(), &query_conjuncts)
247        }) else {
248            continue;
249        };
250
251        let mut remaining = params.filters.clone();
252        match residual {
253            Some(expr) => {
254                remaining[i] = Filter {
255                    expr: FilterExpr::Expr(expr),
256                };
257            }
258            None => {
259                remaining.remove(i);
260            }
261        }
262
263        let lookup_value = if idx.case_insensitive {
264            lowercase_string_value(&value)
265        } else {
266            value
267        };
268
269        return Some(SqlPlan::DocumentIndexLookup {
270            collection: params.collection.clone(),
271            alias: params.alias.clone(),
272            engine,
273            field: idx.field.clone(),
274            value: lookup_value,
275            filters: remaining,
276            projection: params.projection.clone(),
277            sort_keys: params.sort_keys.clone(),
278            limit: params.limit,
279            offset: params.offset,
280            distinct: params.distinct,
281            window_functions: params.window_functions.clone(),
282            case_insensitive: idx.case_insensitive,
283            temporal: params.temporal,
284        });
285    }
286    None
287}
288
289/// Pull `(column_name, equality_value, residual_expr)` out of a filter
290/// expression if it contains a column-equals-literal predicate that can
291/// drive an index lookup. Returns `None` if no usable equality is
292/// present. The returned residual is the rest of the conjunction with
293/// the equality removed — `None` when the filter was a bare equality.
294fn extract_equality_with_residual(
295    expr: &FilterExpr,
296) -> Option<(String, SqlValue, Option<SqlExpr>)> {
297    match expr {
298        FilterExpr::Comparison {
299            field,
300            op: CompareOp::Eq,
301            value,
302        } => Some((field.clone(), value.clone(), None)),
303        FilterExpr::Expr(sql_expr) => {
304            let (col, lit, residual) = split_equality_from_expr(sql_expr)?;
305            Some((col, lit, residual))
306        }
307        _ => None,
308    }
309}
310
311/// Return `(column, literal, residual_expr)` for an equality found
312/// anywhere in a right-leaning AND conjunction tree, or `None` if no
313/// bare column-equals-literal predicate exists. The residual preserves
314/// every sibling conjunct in their original order; `None` means the
315/// expression was a bare equality with nothing left behind.
316fn split_equality_from_expr(expr: &SqlExpr) -> Option<(String, SqlValue, Option<SqlExpr>)> {
317    // Gather the conjuncts of a top-level AND chain left-to-right.
318    let mut conjuncts: Vec<SqlExpr> = Vec::new();
319    flatten_and(expr, &mut conjuncts);
320
321    // Find the first conjunct that is a bare column-equals-literal.
322    let eq_idx = conjuncts.iter().position(is_column_eq_literal)?;
323    let eq = conjuncts.remove(eq_idx);
324    let (col, lit) = match eq {
325        SqlExpr::BinaryOp { left, op, right } => match (*left, op, *right) {
326            (SqlExpr::Column { name, .. }, BinaryOp::Eq, SqlExpr::Literal(v)) => (name, v),
327            (SqlExpr::Literal(v), BinaryOp::Eq, SqlExpr::Column { name, .. }) => (name, v),
328            _ => return None,
329        },
330        _ => return None,
331    };
332
333    let residual = rebuild_and(conjuncts);
334    Some((col, lit, residual))
335}
336
337/// Append every leaf of a right-leaning `AND` tree to `out`. Non-AND
338/// expressions are a single leaf.
339fn flatten_and(expr: &SqlExpr, out: &mut Vec<SqlExpr>) {
340    match expr {
341        SqlExpr::BinaryOp {
342            left,
343            op: BinaryOp::And,
344            right,
345        } => {
346            flatten_and(left, out);
347            flatten_and(right, out);
348        }
349        other => out.push(other.clone()),
350    }
351}
352
353/// Reassemble a list of conjuncts into a right-leaning AND tree.
354/// Empty input returns `None`; single input returns itself.
355fn rebuild_and(mut conjuncts: Vec<SqlExpr>) -> Option<SqlExpr> {
356    let last = conjuncts.pop()?;
357    Some(
358        conjuncts
359            .into_iter()
360            .rfold(last, |acc, next| SqlExpr::BinaryOp {
361                left: Box::new(next),
362                op: BinaryOp::And,
363                right: Box::new(acc),
364            }),
365    )
366}
367
368/// Decide whether the query's WHERE conjuncts entail the partial-index
369/// predicate. `None` predicate means a full index — trivially entailed.
370/// Initial version uses conjunct-level structural equality: every
371/// conjunct of the index predicate must appear (by `PartialEq`) as a
372/// conjunct of the query. This is conservative and deliberately so —
373/// a false positive here would silently omit rows from query results.
374fn partial_index_entailed(predicate: Option<&str>, query_conjuncts: &[SqlExpr]) -> bool {
375    let Some(text) = predicate else {
376        return true;
377    };
378    let Ok(parsed) = crate::parse_expr_string(text) else {
379        // A catalog predicate we can't parse is not entailed — refuse
380        // to use the index rather than assume anything about its
381        // contents. The DDL path validates at CREATE INDEX time, so
382        // reaching this branch indicates drift.
383        return false;
384    };
385    let mut index_conjuncts: Vec<SqlExpr> = Vec::new();
386    flatten_and(&parsed, &mut index_conjuncts);
387    // Structural equality via the stable `Debug` representation:
388    // `SqlExpr` doesn't derive `PartialEq` (several nested variants
389    // carry types that can't derive it cheaply), but the Debug form is
390    // canonical for equivalent trees produced by the same parser. This
391    // is conservative — it matches only identical AST shapes and not,
392    // e.g., `a = 1` vs `1 = a`. Callers should write index predicates
393    // in the same normal form they use in query WHERE clauses, which
394    // is the convention everywhere else in the codebase.
395    index_conjuncts.iter().all(|ic| {
396        let ic_dbg = format!("{ic:?}");
397        query_conjuncts.iter().any(|qc| format!("{qc:?}") == ic_dbg)
398    })
399}
400
401fn is_column_eq_literal(expr: &SqlExpr) -> bool {
402    matches!(
403        expr,
404        SqlExpr::BinaryOp { left, op: BinaryOp::Eq, right }
405            if matches!(
406                (left.as_ref(), right.as_ref()),
407                (SqlExpr::Column { .. }, SqlExpr::Literal(_))
408                | (SqlExpr::Literal(_), SqlExpr::Column { .. }),
409            )
410    )
411}
412
/// Return `field` in `$`-prefixed form.
///
/// A name that already begins with `$` is returned unchanged; any other
/// name gets `$.` put in front of it.
fn canonical_index_field(field: &str) -> String {
    // A `$.` prefix is also a `$` prefix, so one check covers both.
    match field.strip_prefix('$') {
        Some(_) => field.to_string(),
        None => format!("$.{field}"),
    }
}
420
421fn lowercase_string_value(v: &SqlValue) -> SqlValue {
422    if let SqlValue::String(s) = v {
423        SqlValue::String(s.to_lowercase())
424    } else {
425        v.clone()
426    }
427}
428
429/// Resolve the engine rules for a given engine type.
430///
431/// No catch-all — compiler enforces exhaustiveness.
432pub fn resolve_engine_rules(engine: EngineType) -> &'static dyn EngineRules {
433    match engine {
434        EngineType::DocumentSchemaless => &document_schemaless::SchemalessRules,
435        EngineType::DocumentStrict => &document_strict::StrictRules,
436        EngineType::KeyValue => &kv::KvRules,
437        EngineType::Columnar => &columnar::ColumnarRules,
438        EngineType::Timeseries => &timeseries::TimeseriesRules,
439        EngineType::Spatial => &spatial::SpatialRules,
440        EngineType::Array => &array::ArrayRules,
441    }
442}