nodedb_sql/engine_rules/mod.rs
1// SPDX-License-Identifier: Apache-2.0
2
3pub mod array;
4pub mod columnar;
5pub mod document_schemaless;
6pub mod document_strict;
7pub mod kv;
8pub mod spatial;
9pub mod timeseries;
10
11use crate::error::Result;
12use crate::types::*;
13
/// Parameters for planning an INSERT operation.
///
/// Consumed by [`EngineRules::plan_insert`].
pub struct InsertParams {
    /// Name of the target collection.
    pub collection: String,
    /// Column names supplied with the statement.
    pub columns: Vec<String>,
    /// Rows to insert; each row is a list of `(column, value)` pairs.
    pub rows: Vec<Vec<(String, SqlValue)>>,
    /// `(column, default)` pairs. NOTE(review): the default is carried as a
    /// `String` — presumably unevaluated default-expression text; confirm
    /// against the producer of these params.
    pub column_defaults: Vec<(String, String)>,
    /// `ON CONFLICT DO NOTHING` semantics: duplicate-PK rows are skipped
    /// silently. `false` for plain `INSERT` (raises `unique_violation`).
    pub if_absent: bool,
}
24
/// Parameters for planning a SCAN operation.
///
/// Consumed by [`EngineRules::plan_scan`]; the document engines also feed
/// it to the shared index-lookup rewrite defined in this module.
pub struct ScanParams {
    /// Name of the collection being scanned.
    pub collection: String,
    /// Alias the query uses for the collection, if any.
    pub alias: Option<String>,
    /// WHERE-clause predicates. For index matching, top-level `AND`
    /// conjuncts inside `FilterExpr::Expr` entries are flattened.
    pub filters: Vec<Filter>,
    /// Output projection list.
    pub projection: Vec<Projection>,
    /// ORDER BY keys. A non-empty list disables the document
    /// index-lookup rewrite (the indexed path does not order rows).
    pub sort_keys: Vec<SortKey>,
    /// Row limit. NOTE(review): `None` presumably means "no limit";
    /// confirm with callers.
    pub limit: Option<usize>,
    /// OFFSET value (rows to skip).
    pub offset: usize,
    /// Whether `SELECT DISTINCT` was requested.
    pub distinct: bool,
    /// Window-function specs. A non-empty list disables the document
    /// index-lookup rewrite.
    pub window_functions: Vec<WindowSpec>,
    /// Secondary indexes available on the scan's collection. Document
    /// engines consult this to rewrite equality-on-indexed-field into
    /// [`SqlPlan::DocumentIndexLookup`]. Other engines ignore it today.
    pub indexes: Vec<IndexSpec>,
    /// Bitemporal qualifier propagated from `plan_sql`. Engines without
    /// bitemporal storage support reject a non-default scope via
    /// `SqlError::Unsupported` — silently ignoring it would return
    /// current-state data when the user asked for history.
    pub temporal: crate::temporal::TemporalScope,
    /// Whether this collection was created with bitemporal storage. When
    /// `true`, engines that support bitemporal reads route the scan
    /// through versioned storage; when `false`, a non-default
    /// [`Self::temporal`] is rejected.
    pub bitemporal: bool,
}
51
/// Parameters for planning a POINT GET operation.
///
/// Consumed by [`EngineRules::plan_point_get`].
pub struct PointGetParams {
    /// Name of the collection being read.
    pub collection: String,
    /// Alias the query uses for the collection, if any.
    pub alias: Option<String>,
    /// Name of the key column the lookup is performed on.
    pub key_column: String,
    /// Key value to look up.
    pub key_value: SqlValue,
}
59
/// Parameters for planning an UPDATE operation.
///
/// Consumed by [`EngineRules::plan_update`].
pub struct UpdateParams {
    /// Name of the collection being updated.
    pub collection: String,
    /// `SET column = expr` assignments.
    pub assignments: Vec<(String, SqlExpr)>,
    /// WHERE-clause predicates.
    pub filters: Vec<Filter>,
    /// NOTE(review): presumably the primary-key values of the target rows
    /// when they are known up front (point update); confirm with callers.
    pub target_keys: Vec<SqlValue>,
    /// Presumably set when the statement carries a `RETURNING` clause.
    pub returning: bool,
}
68
/// Parameters for planning an `UPDATE target SET ... FROM src WHERE ...` operation.
///
/// Consumed by [`EngineRules::plan_update_from`].
pub struct UpdateFromParams {
    /// Name of the target collection being updated.
    pub collection: String,
    /// The FROM source plan (Scan, Join, …).
    pub source: Box<SqlPlan>,
    /// Column in target used as the equi-join key.
    pub target_join_col: String,
    /// Column in source used as the equi-join key.
    pub source_join_col: String,
    /// SET assignments — RHS may reference source columns.
    pub assignments: Vec<(String, SqlExpr)>,
    /// Filters that apply only to the target.
    pub target_filters: Vec<Filter>,
    /// Presumably set when the statement carries a `RETURNING` clause.
    pub returning: bool,
}
84
/// Parameters for planning a DELETE operation.
///
/// Consumed by [`EngineRules::plan_delete`], which covers both point and
/// bulk deletes.
pub struct DeleteParams {
    /// Name of the collection rows are deleted from.
    pub collection: String,
    /// WHERE-clause predicates.
    pub filters: Vec<Filter>,
    /// NOTE(review): presumably the primary-key values of the rows to delete
    /// when they are known up front (point delete); confirm with callers.
    pub target_keys: Vec<SqlValue>,
}
91
/// Parameters for planning a MERGE operation.
///
/// Consumed by [`EngineRules::plan_merge`].
pub struct MergeParams {
    /// Name of the MERGE target collection.
    pub collection: String,
    /// Plan producing the MERGE source rows.
    pub source: Box<SqlPlan>,
    /// Column in target used as the join key.
    pub target_join_col: String,
    /// Column in source used as the join key.
    pub source_join_col: String,
    /// Alias of the source relation. NOTE(review): presumably used to resolve
    /// source-qualified column references inside the clauses; confirm.
    pub source_alias: String,
    /// The MERGE `WHEN ...` clauses, in statement order.
    pub clauses: Vec<crate::types::MergePlanClause>,
    /// Presumably set when the statement carries a `RETURNING` clause.
    pub returning: bool,
}
102
/// Parameters for planning an UPSERT operation.
///
/// Consumed by [`EngineRules::plan_upsert`].
pub struct UpsertParams {
    /// Name of the target collection.
    pub collection: String,
    /// Column names supplied with the statement.
    pub columns: Vec<String>,
    /// Rows to upsert; each row is a list of `(column, value)` pairs.
    pub rows: Vec<Vec<(String, SqlValue)>>,
    /// `(column, default)` pairs. NOTE(review): the default is carried as a
    /// `String` — presumably unevaluated default-expression text; confirm.
    pub column_defaults: Vec<(String, String)>,
    /// `ON CONFLICT (...) DO UPDATE SET` assignments. Empty for plain
    /// `UPSERT INTO ...`; populated when the caller is
    /// `INSERT ... ON CONFLICT ... DO UPDATE SET`.
    pub on_conflict_updates: Vec<(String, SqlExpr)>,
}
114
/// Parameters for planning an AGGREGATE operation.
///
/// Consumed by [`EngineRules::plan_aggregate`].
pub struct AggregateParams {
    /// Name of the collection being aggregated.
    pub collection: String,
    /// Alias the query uses for the collection, if any.
    pub alias: Option<String>,
    /// WHERE-clause predicates applied before grouping.
    pub filters: Vec<Filter>,
    /// GROUP BY expressions.
    pub group_by: Vec<SqlExpr>,
    /// Aggregate expressions to compute.
    pub aggregates: Vec<AggregateExpr>,
    /// HAVING predicates.
    pub having: Vec<Filter>,
    /// Row limit. NOTE(review): unlike `ScanParams::limit` this is a plain
    /// `usize`, not `Option<usize>` — confirm which value callers use to
    /// mean "no limit".
    pub limit: usize,
    /// Timeseries-specific: bucket interval from time_bucket() call.
    pub bucket_interval_ms: Option<i64>,
    /// Timeseries-specific: non-time GROUP BY columns.
    pub group_columns: Vec<String>,
    /// Whether the collection has auto-tiering enabled.
    pub has_auto_tier: bool,
    /// Whether this collection was created with bitemporal storage.
    /// When `true`, the base scan inside the aggregate is allowed to
    /// carry a non-default temporal scope.
    pub bitemporal: bool,
    /// System-time / valid-time scope to propagate into the underlying
    /// scan so bitemporal aggregate queries project an as-of snapshot
    /// before grouping.
    pub temporal: crate::temporal::TemporalScope,
}
139
/// Engine-specific planning rules.
///
/// Each engine type implements this trait to produce the correct `SqlPlan`
/// variant for each operation, or return an error if the operation is not
/// supported. This is the single source of truth for operation routing —
/// no downstream code should ever check engine type to decide routing.
///
/// Implementations are handed out by [`resolve_engine_rules`] as
/// `&'static dyn EngineRules`, so the trait must remain object-safe.
pub trait EngineRules {
    /// Plan an INSERT. Returns `Err` if the engine does not support inserts
    /// (e.g. timeseries routes to `TimeseriesIngest` instead).
    /// The result is a list because one statement may plan to several nodes.
    fn plan_insert(&self, params: InsertParams) -> Result<Vec<SqlPlan>>;
    /// Plan an UPSERT (insert-or-merge). Returns `Err` for append-only or
    /// columnar engines that don't support merge semantics.
    fn plan_upsert(&self, params: UpsertParams) -> Result<Vec<SqlPlan>>;
    /// Plan a table scan (SELECT without point-get optimization).
    fn plan_scan(&self, params: ScanParams) -> Result<SqlPlan>;
    /// Plan a point lookup by primary key. Returns `Err` for engines
    /// that don't support O(1) key lookups (e.g. timeseries).
    fn plan_point_get(&self, params: PointGetParams) -> Result<SqlPlan>;
    /// Plan an UPDATE. Returns `Err` for append-only engines.
    fn plan_update(&self, params: UpdateParams) -> Result<Vec<SqlPlan>>;
    /// Plan an `UPDATE target SET ... FROM src WHERE ...`.
    ///
    /// Returns `Err(SqlError::Unsupported)` for engines that cannot participate
    /// as an update target in a cross-table update (timeseries, kv-with-opaque-keys).
    fn plan_update_from(&self, params: UpdateFromParams) -> Result<Vec<SqlPlan>>;
    /// Plan a DELETE (point or bulk).
    fn plan_delete(&self, params: DeleteParams) -> Result<Vec<SqlPlan>>;
    /// Plan a GROUP BY / aggregate query.
    fn plan_aggregate(&self, params: AggregateParams) -> Result<SqlPlan>;
    /// Plan a MERGE statement.
    ///
    /// Returns `Err(SqlError::Unsupported)` for engines that do not support
    /// MERGE semantics (everything except `document_schemaless` and
    /// `document_strict`).
    fn plan_merge(&self, params: MergeParams) -> Result<Vec<SqlPlan>>;
}
176
177/// Attempt to rewrite `ScanParams` into a [`SqlPlan::DocumentIndexLookup`]
178/// when exactly one of the filters is an equality predicate on a `Ready`
179/// indexed field. Returns `None` to fall through to a generic `Scan`.
180///
181/// Shared by the schemaless and strict document engines so the
182/// index-rewrite rule has one source of truth. Normalizes strict column
183/// names to `$.column` before matching against index fields because the
184/// catalog stores every document index in JSON-path canonical form.
185pub(crate) fn try_document_index_lookup(
186 params: &ScanParams,
187 engine: EngineType,
188) -> Option<SqlPlan> {
189 // Sort / window functions still force a full scan because the
190 // IndexedFetch handler does not yet order rows or evaluate window
191 // aggregates. DISTINCT is safe on the index path: the handler
192 // emits each matched doc exactly once and any further dedup is a
193 // cheap Control-Plane pass on the scan-shaped response.
194 if !params.sort_keys.is_empty() || !params.window_functions.is_empty() {
195 return None;
196 }
197
198 // Iterate filters to find the first equality candidate that lines up
199 // with a Ready index. Keep the remaining filters as post-filters.
200 // Predicates appear in three shapes:
201 // - `FilterExpr::Comparison { field, Eq, value }` — resolver path.
202 // - `FilterExpr::Expr(BinaryOp { Column, Eq, Literal })` — generic.
203 // - `FilterExpr::Expr(BinaryOp { left AND right })` — the
204 // top-level AND the `convert_where_to_filters` path produces
205 // for compound WHERE clauses like `a = 1 AND b > 2`. Descend
206 // into AND conjuncts so an equality nested in a conjunction
207 // still picks up the index, and the non-equality siblings
208 // survive as a residual conjunction carried on the
209 // IndexedFetch node (not as a wrapping Filter plan node —
210 // that wrapping is the Control-Plane post-filter anti-pattern
211 // the handler's `filters` payload exists to eliminate).
212 // Pre-collect the query's top-level conjuncts once. Used for
213 // partial-index entailment: every conjunct of the index predicate
214 // must appear as a conjunct of the query WHERE for the rewrite to
215 // be sound, otherwise the index would silently omit rows the
216 // query expects to see.
217 let query_conjuncts: Vec<SqlExpr> = params
218 .filters
219 .iter()
220 .flat_map(|f| match &f.expr {
221 FilterExpr::Expr(e) => {
222 let mut v = Vec::new();
223 flatten_and(e, &mut v);
224 v
225 }
226 _ => Vec::new(),
227 })
228 .collect();
229
230 for (i, f) in params.filters.iter().enumerate() {
231 let Some((field, value, residual)) = extract_equality_with_residual(&f.expr) else {
232 continue;
233 };
234 let canonical = canonical_index_field(&field);
235 let Some(idx) = params.indexes.iter().find(|i| {
236 i.state == IndexState::Ready
237 && i.field == canonical
238 && partial_index_entailed(i.predicate.as_deref(), &query_conjuncts)
239 }) else {
240 continue;
241 };
242
243 let mut remaining = params.filters.clone();
244 match residual {
245 Some(expr) => {
246 remaining[i] = Filter {
247 expr: FilterExpr::Expr(expr),
248 };
249 }
250 None => {
251 remaining.remove(i);
252 }
253 }
254
255 let lookup_value = if idx.case_insensitive {
256 lowercase_string_value(&value)
257 } else {
258 value
259 };
260
261 return Some(SqlPlan::DocumentIndexLookup {
262 collection: params.collection.clone(),
263 alias: params.alias.clone(),
264 engine,
265 field: idx.field.clone(),
266 value: lookup_value,
267 filters: remaining,
268 projection: params.projection.clone(),
269 sort_keys: params.sort_keys.clone(),
270 limit: params.limit,
271 offset: params.offset,
272 distinct: params.distinct,
273 window_functions: params.window_functions.clone(),
274 case_insensitive: idx.case_insensitive,
275 temporal: params.temporal,
276 });
277 }
278 None
279}
280
281/// Pull `(column_name, equality_value, residual_expr)` out of a filter
282/// expression if it contains a column-equals-literal predicate that can
283/// drive an index lookup. Returns `None` if no usable equality is
284/// present. The returned residual is the rest of the conjunction with
285/// the equality removed — `None` when the filter was a bare equality.
286fn extract_equality_with_residual(
287 expr: &FilterExpr,
288) -> Option<(String, SqlValue, Option<SqlExpr>)> {
289 match expr {
290 FilterExpr::Comparison {
291 field,
292 op: CompareOp::Eq,
293 value,
294 } => Some((field.clone(), value.clone(), None)),
295 FilterExpr::Expr(sql_expr) => {
296 let (col, lit, residual) = split_equality_from_expr(sql_expr)?;
297 Some((col, lit, residual))
298 }
299 _ => None,
300 }
301}
302
303/// Return `(column, literal, residual_expr)` for an equality found
304/// anywhere in a right-leaning AND conjunction tree, or `None` if no
305/// bare column-equals-literal predicate exists. The residual preserves
306/// every sibling conjunct in their original order; `None` means the
307/// expression was a bare equality with nothing left behind.
308fn split_equality_from_expr(expr: &SqlExpr) -> Option<(String, SqlValue, Option<SqlExpr>)> {
309 // Gather the conjuncts of a top-level AND chain left-to-right.
310 let mut conjuncts: Vec<SqlExpr> = Vec::new();
311 flatten_and(expr, &mut conjuncts);
312
313 // Find the first conjunct that is a bare column-equals-literal.
314 let eq_idx = conjuncts.iter().position(is_column_eq_literal)?;
315 let eq = conjuncts.remove(eq_idx);
316 let (col, lit) = match eq {
317 SqlExpr::BinaryOp { left, op, right } => match (*left, op, *right) {
318 (SqlExpr::Column { name, .. }, BinaryOp::Eq, SqlExpr::Literal(v)) => (name, v),
319 (SqlExpr::Literal(v), BinaryOp::Eq, SqlExpr::Column { name, .. }) => (name, v),
320 _ => return None,
321 },
322 _ => return None,
323 };
324
325 let residual = rebuild_and(conjuncts);
326 Some((col, lit, residual))
327}
328
329/// Append every leaf of a right-leaning `AND` tree to `out`. Non-AND
330/// expressions are a single leaf.
331fn flatten_and(expr: &SqlExpr, out: &mut Vec<SqlExpr>) {
332 match expr {
333 SqlExpr::BinaryOp {
334 left,
335 op: BinaryOp::And,
336 right,
337 } => {
338 flatten_and(left, out);
339 flatten_and(right, out);
340 }
341 other => out.push(other.clone()),
342 }
343}
344
345/// Reassemble a list of conjuncts into a right-leaning AND tree.
346/// Empty input returns `None`; single input returns itself.
347fn rebuild_and(mut conjuncts: Vec<SqlExpr>) -> Option<SqlExpr> {
348 let last = conjuncts.pop()?;
349 Some(
350 conjuncts
351 .into_iter()
352 .rfold(last, |acc, next| SqlExpr::BinaryOp {
353 left: Box::new(next),
354 op: BinaryOp::And,
355 right: Box::new(acc),
356 }),
357 )
358}
359
360/// Decide whether the query's WHERE conjuncts entail the partial-index
361/// predicate. `None` predicate means a full index — trivially entailed.
362/// Initial version uses conjunct-level structural equality: every
363/// conjunct of the index predicate must appear (by `PartialEq`) as a
364/// conjunct of the query. This is conservative and deliberately so —
365/// a false positive here would silently omit rows from query results.
366fn partial_index_entailed(predicate: Option<&str>, query_conjuncts: &[SqlExpr]) -> bool {
367 let Some(text) = predicate else {
368 return true;
369 };
370 let Ok(parsed) = crate::parse_expr_string(text) else {
371 // A catalog predicate we can't parse is not entailed — refuse
372 // to use the index rather than assume anything about its
373 // contents. The DDL path validates at CREATE INDEX time, so
374 // reaching this branch indicates drift.
375 return false;
376 };
377 let mut index_conjuncts: Vec<SqlExpr> = Vec::new();
378 flatten_and(&parsed, &mut index_conjuncts);
379 // Structural equality via the stable `Debug` representation:
380 // `SqlExpr` doesn't derive `PartialEq` (several nested variants
381 // carry types that can't derive it cheaply), but the Debug form is
382 // canonical for equivalent trees produced by the same parser. This
383 // is conservative — it matches only identical AST shapes and not,
384 // e.g., `a = 1` vs `1 = a`. Callers should write index predicates
385 // in the same normal form they use in query WHERE clauses, which
386 // is the convention everywhere else in the codebase.
387 index_conjuncts.iter().all(|ic| {
388 let ic_dbg = format!("{ic:?}");
389 query_conjuncts.iter().any(|qc| format!("{qc:?}") == ic_dbg)
390 })
391}
392
393fn is_column_eq_literal(expr: &SqlExpr) -> bool {
394 matches!(
395 expr,
396 SqlExpr::BinaryOp { left, op: BinaryOp::Eq, right }
397 if matches!(
398 (left.as_ref(), right.as_ref()),
399 (SqlExpr::Column { .. }, SqlExpr::Literal(_))
400 | (SqlExpr::Literal(_), SqlExpr::Column { .. }),
401 )
402 )
403}
404
/// Normalize a field name to the JSON-path form document indexes are
/// stored under.
///
/// Names that already start with `$` (e.g. `$.email`, `$`) are returned
/// unchanged; bare column names are prefixed with `$.` (`email` → `$.email`).
fn canonical_index_field(field: &str) -> String {
    // `starts_with('$')` already covers the `$.` prefix; no second check needed.
    if field.starts_with('$') {
        field.to_string()
    } else {
        format!("$.{field}")
    }
}
412
413fn lowercase_string_value(v: &SqlValue) -> SqlValue {
414 if let SqlValue::String(s) = v {
415 SqlValue::String(s.to_lowercase())
416 } else {
417 v.clone()
418 }
419}
420
421/// Resolve the engine rules for a given engine type.
422///
423/// No catch-all — compiler enforces exhaustiveness.
424pub fn resolve_engine_rules(engine: EngineType) -> &'static dyn EngineRules {
425 match engine {
426 EngineType::DocumentSchemaless => &document_schemaless::SchemalessRules,
427 EngineType::DocumentStrict => &document_strict::StrictRules,
428 EngineType::KeyValue => &kv::KvRules,
429 EngineType::Columnar => &columnar::ColumnarRules,
430 EngineType::Timeseries => ×eries::TimeseriesRules,
431 EngineType::Spatial => &spatial::SpatialRules,
432 EngineType::Array => &array::ArrayRules,
433 }
434}