nodedb_sql/engine_rules/mod.rs
1// SPDX-License-Identifier: Apache-2.0
2
3pub mod array;
4pub mod columnar;
5pub mod document_schemaless;
6pub mod document_strict;
7pub mod kv;
8pub mod spatial;
9pub mod timeseries;
10
11use crate::error::Result;
12use crate::types::*;
13
/// Parameters for planning an INSERT operation.
///
/// Handed by value to [`EngineRules::plan_insert`].
pub struct InsertParams {
    /// Collection the statement targets.
    pub collection: String,
    /// Column list of the statement. NOTE(review): presumably in the order
    /// written in the SQL text — confirm against the caller.
    pub columns: Vec<String>,
    /// One inner vector per row to insert. Each entry is a `(String, SqlValue)`
    /// pair — presumably `(column_name, value)`; TODO confirm.
    pub rows: Vec<Vec<(String, SqlValue)>>,
    /// `(String, String)` pairs, presumably `(column_name, default_expr_text)`
    /// taken from the catalog — TODO confirm.
    pub column_defaults: Vec<(String, String)>,
    /// `ON CONFLICT DO NOTHING` semantics: duplicate-PK rows are skipped
    /// silently. `false` for plain `INSERT` (raises `unique_violation`).
    pub if_absent: bool,
    /// Raw column type strings from the catalog: `(column_name, type_str)`.
    /// Used by columnar converters to reconstruct the exact `ColumnType` for
    /// columns whose `SqlDataType` is ambiguous (e.g. both JSON and Bytes map
    /// to `SqlDataType::Bytes`). Empty for engines that don't need it.
    pub column_schema: Vec<(String, String)>,
}
29
/// Parameters for planning a SCAN operation.
///
/// Handed by value to [`EngineRules::plan_scan`]. The document engines may
/// turn these parameters into a [`SqlPlan::DocumentIndexLookup`] (see
/// `try_document_index_lookup`) instead of a generic scan; the notes on
/// individual fields say which of them affect that rewrite.
pub struct ScanParams {
    /// Collection being scanned.
    pub collection: String,
    /// Optional query alias for the collection; copied verbatim into the
    /// index-lookup plan when that rewrite fires.
    pub alias: Option<String>,
    /// WHERE filters. Any one of them may supply the column-equals-literal
    /// predicate that drives an index lookup; the remaining filters (and
    /// any residual conjuncts) travel with the lookup plan.
    pub filters: Vec<Filter>,
    /// Output projection.
    pub projection: Vec<Projection>,
    /// ORDER BY keys. A non-empty list keeps the document engines on the
    /// full-scan path (the indexed-fetch handler does not sort).
    pub sort_keys: Vec<SortKey>,
    /// Optional row limit.
    pub limit: Option<usize>,
    /// Number of leading rows to skip.
    pub offset: usize,
    /// DISTINCT flag. Unlike sorting, it does not block the index rewrite.
    pub distinct: bool,
    /// Window functions. A non-empty list keeps the document engines on
    /// the full-scan path (the indexed-fetch handler does not evaluate them).
    pub window_functions: Vec<WindowSpec>,
    /// Secondary indexes available on the scan's collection. Document
    /// engines consult this to rewrite equality-on-indexed-field into
    /// [`SqlPlan::DocumentIndexLookup`]. Other engines ignore it today.
    pub indexes: Vec<IndexSpec>,
    /// Bitemporal qualifier propagated from `plan_sql`. Engines without
    /// bitemporal storage support reject a non-default scope via
    /// `SqlError::Unsupported` — silently ignoring it would return
    /// current-state data when the user asked for history.
    pub temporal: crate::temporal::TemporalScope,
    /// Whether this collection was created with bitemporal storage. When
    /// `true`, engines that support bitemporal reads route the scan
    /// through versioned storage; when `false`, a non-default
    /// [`Self::temporal`] is rejected.
    pub bitemporal: bool,
}
56
/// Parameters for planning a POINT GET operation.
///
/// Handed by value to [`EngineRules::plan_point_get`].
pub struct PointGetParams {
    /// Collection being read.
    pub collection: String,
    /// Optional query alias for the collection.
    pub alias: Option<String>,
    /// Name of the key column the lookup is performed on.
    pub key_column: String,
    /// Key value to look up.
    pub key_value: SqlValue,
}
64
/// Parameters for planning an UPDATE operation.
///
/// Handed by value to [`EngineRules::plan_update`].
pub struct UpdateParams {
    /// Collection being updated.
    pub collection: String,
    /// SET assignments as `(column, expression)` pairs.
    pub assignments: Vec<(String, SqlExpr)>,
    /// WHERE filters.
    pub filters: Vec<Filter>,
    /// Key values targeted by the update. NOTE(review): presumably filled
    /// when the WHERE clause pins primary keys directly — confirm with caller.
    pub target_keys: Vec<SqlValue>,
    /// Whether the statement has a RETURNING clause.
    pub returning: bool,
}
73
/// Parameters for planning an `UPDATE target SET ... FROM src WHERE ...` operation.
///
/// Handed by value to [`EngineRules::plan_update_from`].
pub struct UpdateFromParams {
    /// Collection being updated (the UPDATE target).
    pub collection: String,
    /// The FROM source plan (Scan, Join, …).
    pub source: Box<SqlPlan>,
    /// Column in target used as the equi-join key.
    pub target_join_col: String,
    /// Column in source used as the equi-join key.
    pub source_join_col: String,
    /// SET assignments — RHS may reference source columns.
    pub assignments: Vec<(String, SqlExpr)>,
    /// Filters that apply only to the target.
    pub target_filters: Vec<Filter>,
    /// Whether the statement has a RETURNING clause.
    pub returning: bool,
}
89
/// Parameters for planning a DELETE operation.
///
/// Handed by value to [`EngineRules::plan_delete`].
pub struct DeleteParams {
    /// Collection rows are deleted from.
    pub collection: String,
    /// WHERE filters.
    pub filters: Vec<Filter>,
    /// Key values targeted by the delete. NOTE(review): presumably filled
    /// when the WHERE clause pins primary keys directly — confirm with caller.
    pub target_keys: Vec<SqlValue>,
}
96
/// Parameters for planning a MERGE operation.
///
/// Handed by value to [`EngineRules::plan_merge`].
pub struct MergeParams {
    /// Collection being merged into (the MERGE target).
    pub collection: String,
    /// Plan producing the MERGE source rows.
    pub source: Box<SqlPlan>,
    /// Column in target used as the equi-join key.
    pub target_join_col: String,
    /// Column in source used as the equi-join key.
    pub source_join_col: String,
    /// Alias under which the source is referenced in the statement.
    pub source_alias: String,
    /// WHEN [NOT] MATCHED clauses, in statement order (assumed — confirm).
    pub clauses: Vec<crate::types::MergePlanClause>,
    /// Whether the statement has a RETURNING clause.
    pub returning: bool,
}
107
/// Parameters for planning an UPSERT operation.
///
/// Handed by value to [`EngineRules::plan_upsert`]. The first four fields
/// have the same types as the corresponding fields of `InsertParams`.
pub struct UpsertParams {
    /// Collection the statement targets.
    pub collection: String,
    /// Column list of the statement.
    pub columns: Vec<String>,
    /// One inner vector per row; entries are presumably
    /// `(column_name, value)` — TODO confirm.
    pub rows: Vec<Vec<(String, SqlValue)>>,
    /// Presumably `(column_name, default_expr_text)` from the catalog —
    /// TODO confirm.
    pub column_defaults: Vec<(String, String)>,
    /// `ON CONFLICT (...) DO UPDATE SET` assignments. Empty for plain
    /// `UPSERT INTO ...`; populated when the caller is
    /// `INSERT ... ON CONFLICT ... DO UPDATE SET`.
    pub on_conflict_updates: Vec<(String, SqlExpr)>,
    /// Raw column type strings from the catalog: `(column_name, type_str)`.
    /// Mirrors `InsertParams::column_schema` — see that field for rationale.
    pub column_schema: Vec<(String, String)>,
}
122
/// Parameters for planning an AGGREGATE operation.
///
/// Handed by value to [`EngineRules::plan_aggregate`].
pub struct AggregateParams {
    /// Collection being aggregated.
    pub collection: String,
    /// Optional query alias for the collection.
    pub alias: Option<String>,
    /// WHERE filters applied before grouping.
    pub filters: Vec<Filter>,
    /// GROUP BY expressions.
    pub group_by: Vec<SqlExpr>,
    /// Aggregate expressions to compute.
    pub aggregates: Vec<AggregateExpr>,
    /// HAVING filters applied after grouping.
    pub having: Vec<Filter>,
    /// Row limit. NOTE(review): unlike `ScanParams::limit` this is not an
    /// `Option`; how "no limit" is encoded is not visible here — confirm.
    pub limit: usize,
    /// Timeseries-specific: bucket interval from time_bucket() call.
    pub bucket_interval_ms: Option<i64>,
    /// Timeseries-specific: non-time GROUP BY columns.
    pub group_columns: Vec<String>,
    /// Whether the collection has auto-tiering enabled.
    pub has_auto_tier: bool,
    /// Whether this collection was created with bitemporal storage.
    /// When `true`, the base scan inside the aggregate is allowed to
    /// carry a non-default temporal scope.
    pub bitemporal: bool,
    /// System-time / valid-time scope to propagate into the underlying
    /// scan so bitemporal aggregate queries project an as-of snapshot
    /// before grouping.
    pub temporal: crate::temporal::TemporalScope,
}
147
/// Engine-specific planning rules.
///
/// Each engine type implements this trait to produce the correct `SqlPlan`
/// variant for each operation, or return an error if the operation is not
/// supported. This is the single source of truth for operation routing —
/// no downstream code should ever check engine type to decide routing.
///
/// Implementations are obtained through `resolve_engine_rules`, which maps
/// every `EngineType` to a `&'static dyn EngineRules`. Every method takes
/// its parameter struct by value.
pub trait EngineRules {
    /// Plan an INSERT. Returns `Err` if the engine does not support inserts
    /// (e.g. timeseries routes to `TimeseriesIngest` instead).
    fn plan_insert(&self, params: InsertParams) -> Result<Vec<SqlPlan>>;
    /// Plan an UPSERT (insert-or-merge). Returns `Err` for append-only or
    /// columnar engines that don't support merge semantics.
    fn plan_upsert(&self, params: UpsertParams) -> Result<Vec<SqlPlan>>;
    /// Plan a table scan (SELECT without point-get optimization).
    fn plan_scan(&self, params: ScanParams) -> Result<SqlPlan>;
    /// Plan a point lookup by primary key. Returns `Err` for engines
    /// that don't support O(1) key lookups (e.g. timeseries).
    fn plan_point_get(&self, params: PointGetParams) -> Result<SqlPlan>;
    /// Plan an UPDATE. Returns `Err` for append-only engines.
    fn plan_update(&self, params: UpdateParams) -> Result<Vec<SqlPlan>>;
    /// Plan an `UPDATE target SET ... FROM src WHERE ...`.
    ///
    /// Returns `Err(SqlError::Unsupported)` for engines that cannot participate
    /// as an update target in a cross-table update (timeseries, kv-with-opaque-keys).
    fn plan_update_from(&self, params: UpdateFromParams) -> Result<Vec<SqlPlan>>;
    /// Plan a DELETE (point or bulk).
    fn plan_delete(&self, params: DeleteParams) -> Result<Vec<SqlPlan>>;
    /// Plan a GROUP BY / aggregate query.
    fn plan_aggregate(&self, params: AggregateParams) -> Result<SqlPlan>;
    /// Plan a MERGE statement.
    ///
    /// Returns `Err(SqlError::Unsupported)` for engines that do not support
    /// MERGE semantics (everything except `document_schemaless` and
    /// `document_strict`).
    fn plan_merge(&self, params: MergeParams) -> Result<Vec<SqlPlan>>;
}
184
185/// Attempt to rewrite `ScanParams` into a [`SqlPlan::DocumentIndexLookup`]
186/// when exactly one of the filters is an equality predicate on a `Ready`
187/// indexed field. Returns `None` to fall through to a generic `Scan`.
188///
189/// Shared by the schemaless and strict document engines so the
190/// index-rewrite rule has one source of truth. Normalizes strict column
191/// names to `$.column` before matching against index fields because the
192/// catalog stores every document index in JSON-path canonical form.
193pub(crate) fn try_document_index_lookup(
194 params: &ScanParams,
195 engine: EngineType,
196) -> Option<SqlPlan> {
197 // Sort / window functions still force a full scan because the
198 // IndexedFetch handler does not yet order rows or evaluate window
199 // aggregates. DISTINCT is safe on the index path: the handler
200 // emits each matched doc exactly once and any further dedup is a
201 // cheap Control-Plane pass on the scan-shaped response.
202 if !params.sort_keys.is_empty() || !params.window_functions.is_empty() {
203 return None;
204 }
205
206 // Iterate filters to find the first equality candidate that lines up
207 // with a Ready index. Keep the remaining filters as post-filters.
208 // Predicates appear in three shapes:
209 // - `FilterExpr::Comparison { field, Eq, value }` — resolver path.
210 // - `FilterExpr::Expr(BinaryOp { Column, Eq, Literal })` — generic.
211 // - `FilterExpr::Expr(BinaryOp { left AND right })` — the
212 // top-level AND the `convert_where_to_filters` path produces
213 // for compound WHERE clauses like `a = 1 AND b > 2`. Descend
214 // into AND conjuncts so an equality nested in a conjunction
215 // still picks up the index, and the non-equality siblings
216 // survive as a residual conjunction carried on the
217 // IndexedFetch node (not as a wrapping Filter plan node —
218 // that wrapping is the Control-Plane post-filter anti-pattern
219 // the handler's `filters` payload exists to eliminate).
220 // Pre-collect the query's top-level conjuncts once. Used for
221 // partial-index entailment: every conjunct of the index predicate
222 // must appear as a conjunct of the query WHERE for the rewrite to
223 // be sound, otherwise the index would silently omit rows the
224 // query expects to see.
225 let query_conjuncts: Vec<SqlExpr> = params
226 .filters
227 .iter()
228 .flat_map(|f| match &f.expr {
229 FilterExpr::Expr(e) => {
230 let mut v = Vec::new();
231 flatten_and(e, &mut v);
232 v
233 }
234 _ => Vec::new(),
235 })
236 .collect();
237
238 for (i, f) in params.filters.iter().enumerate() {
239 let Some((field, value, residual)) = extract_equality_with_residual(&f.expr) else {
240 continue;
241 };
242 let canonical = canonical_index_field(&field);
243 let Some(idx) = params.indexes.iter().find(|i| {
244 i.state == IndexState::Ready
245 && i.field == canonical
246 && partial_index_entailed(i.predicate.as_deref(), &query_conjuncts)
247 }) else {
248 continue;
249 };
250
251 let mut remaining = params.filters.clone();
252 match residual {
253 Some(expr) => {
254 remaining[i] = Filter {
255 expr: FilterExpr::Expr(expr),
256 };
257 }
258 None => {
259 remaining.remove(i);
260 }
261 }
262
263 let lookup_value = if idx.case_insensitive {
264 lowercase_string_value(&value)
265 } else {
266 value
267 };
268
269 return Some(SqlPlan::DocumentIndexLookup {
270 collection: params.collection.clone(),
271 alias: params.alias.clone(),
272 engine,
273 field: idx.field.clone(),
274 value: lookup_value,
275 filters: remaining,
276 projection: params.projection.clone(),
277 sort_keys: params.sort_keys.clone(),
278 limit: params.limit,
279 offset: params.offset,
280 distinct: params.distinct,
281 window_functions: params.window_functions.clone(),
282 case_insensitive: idx.case_insensitive,
283 temporal: params.temporal,
284 });
285 }
286 None
287}
288
289/// Pull `(column_name, equality_value, residual_expr)` out of a filter
290/// expression if it contains a column-equals-literal predicate that can
291/// drive an index lookup. Returns `None` if no usable equality is
292/// present. The returned residual is the rest of the conjunction with
293/// the equality removed — `None` when the filter was a bare equality.
294fn extract_equality_with_residual(
295 expr: &FilterExpr,
296) -> Option<(String, SqlValue, Option<SqlExpr>)> {
297 match expr {
298 FilterExpr::Comparison {
299 field,
300 op: CompareOp::Eq,
301 value,
302 } => Some((field.clone(), value.clone(), None)),
303 FilterExpr::Expr(sql_expr) => {
304 let (col, lit, residual) = split_equality_from_expr(sql_expr)?;
305 Some((col, lit, residual))
306 }
307 _ => None,
308 }
309}
310
311/// Return `(column, literal, residual_expr)` for an equality found
312/// anywhere in a right-leaning AND conjunction tree, or `None` if no
313/// bare column-equals-literal predicate exists. The residual preserves
314/// every sibling conjunct in their original order; `None` means the
315/// expression was a bare equality with nothing left behind.
316fn split_equality_from_expr(expr: &SqlExpr) -> Option<(String, SqlValue, Option<SqlExpr>)> {
317 // Gather the conjuncts of a top-level AND chain left-to-right.
318 let mut conjuncts: Vec<SqlExpr> = Vec::new();
319 flatten_and(expr, &mut conjuncts);
320
321 // Find the first conjunct that is a bare column-equals-literal.
322 let eq_idx = conjuncts.iter().position(is_column_eq_literal)?;
323 let eq = conjuncts.remove(eq_idx);
324 let (col, lit) = match eq {
325 SqlExpr::BinaryOp { left, op, right } => match (*left, op, *right) {
326 (SqlExpr::Column { name, .. }, BinaryOp::Eq, SqlExpr::Literal(v)) => (name, v),
327 (SqlExpr::Literal(v), BinaryOp::Eq, SqlExpr::Column { name, .. }) => (name, v),
328 _ => return None,
329 },
330 _ => return None,
331 };
332
333 let residual = rebuild_and(conjuncts);
334 Some((col, lit, residual))
335}
336
337/// Append every leaf of a right-leaning `AND` tree to `out`. Non-AND
338/// expressions are a single leaf.
339fn flatten_and(expr: &SqlExpr, out: &mut Vec<SqlExpr>) {
340 match expr {
341 SqlExpr::BinaryOp {
342 left,
343 op: BinaryOp::And,
344 right,
345 } => {
346 flatten_and(left, out);
347 flatten_and(right, out);
348 }
349 other => out.push(other.clone()),
350 }
351}
352
353/// Reassemble a list of conjuncts into a right-leaning AND tree.
354/// Empty input returns `None`; single input returns itself.
355fn rebuild_and(mut conjuncts: Vec<SqlExpr>) -> Option<SqlExpr> {
356 let last = conjuncts.pop()?;
357 Some(
358 conjuncts
359 .into_iter()
360 .rfold(last, |acc, next| SqlExpr::BinaryOp {
361 left: Box::new(next),
362 op: BinaryOp::And,
363 right: Box::new(acc),
364 }),
365 )
366}
367
368/// Decide whether the query's WHERE conjuncts entail the partial-index
369/// predicate. `None` predicate means a full index — trivially entailed.
370/// Initial version uses conjunct-level structural equality: every
371/// conjunct of the index predicate must appear (by `PartialEq`) as a
372/// conjunct of the query. This is conservative and deliberately so —
373/// a false positive here would silently omit rows from query results.
374fn partial_index_entailed(predicate: Option<&str>, query_conjuncts: &[SqlExpr]) -> bool {
375 let Some(text) = predicate else {
376 return true;
377 };
378 let Ok(parsed) = crate::parse_expr_string(text) else {
379 // A catalog predicate we can't parse is not entailed — refuse
380 // to use the index rather than assume anything about its
381 // contents. The DDL path validates at CREATE INDEX time, so
382 // reaching this branch indicates drift.
383 return false;
384 };
385 let mut index_conjuncts: Vec<SqlExpr> = Vec::new();
386 flatten_and(&parsed, &mut index_conjuncts);
387 // Structural equality via the stable `Debug` representation:
388 // `SqlExpr` doesn't derive `PartialEq` (several nested variants
389 // carry types that can't derive it cheaply), but the Debug form is
390 // canonical for equivalent trees produced by the same parser. This
391 // is conservative — it matches only identical AST shapes and not,
392 // e.g., `a = 1` vs `1 = a`. Callers should write index predicates
393 // in the same normal form they use in query WHERE clauses, which
394 // is the convention everywhere else in the codebase.
395 index_conjuncts.iter().all(|ic| {
396 let ic_dbg = format!("{ic:?}");
397 query_conjuncts.iter().any(|qc| format!("{qc:?}") == ic_dbg)
398 })
399}
400
401fn is_column_eq_literal(expr: &SqlExpr) -> bool {
402 matches!(
403 expr,
404 SqlExpr::BinaryOp { left, op: BinaryOp::Eq, right }
405 if matches!(
406 (left.as_ref(), right.as_ref()),
407 (SqlExpr::Column { .. }, SqlExpr::Literal(_))
408 | (SqlExpr::Literal(_), SqlExpr::Column { .. }),
409 )
410 )
411}
412
/// Map a field name onto the JSON-path form in which the catalog stores
/// document indexes.
///
/// Names that already start with `$` (`$.a`, `$`, `$a`) are returned
/// unchanged; anything else gets a `$.` prefix (`a` → `$.a`).
fn canonical_index_field(field: &str) -> String {
    // A single `'$'` check suffices: every `"$."`-prefixed name also
    // starts with `'$'`, so the former separate `"$."` test was redundant.
    if field.starts_with('$') {
        field.to_owned()
    } else {
        format!("$.{field}")
    }
}
420
421fn lowercase_string_value(v: &SqlValue) -> SqlValue {
422 if let SqlValue::String(s) = v {
423 SqlValue::String(s.to_lowercase())
424 } else {
425 v.clone()
426 }
427}
428
429/// Resolve the engine rules for a given engine type.
430///
431/// No catch-all — compiler enforces exhaustiveness.
432pub fn resolve_engine_rules(engine: EngineType) -> &'static dyn EngineRules {
433 match engine {
434 EngineType::DocumentSchemaless => &document_schemaless::SchemalessRules,
435 EngineType::DocumentStrict => &document_strict::StrictRules,
436 EngineType::KeyValue => &kv::KvRules,
437 EngineType::Columnar => &columnar::ColumnarRules,
438 EngineType::Timeseries => ×eries::TimeseriesRules,
439 EngineType::Spatial => &spatial::SpatialRules,
440 EngineType::Array => &array::ArrayRules,
441 }
442}