nodedb_sql/engine_rules/mod.rs
1pub mod columnar;
2pub mod document_schemaless;
3pub mod document_strict;
4pub mod kv;
5pub mod spatial;
6pub mod timeseries;
7
8use crate::error::Result;
9use crate::types::*;
10
11/// Parameters for planning an INSERT operation.
12pub struct InsertParams {
13 pub collection: String,
14 pub columns: Vec<String>,
15 pub rows: Vec<Vec<(String, SqlValue)>>,
16 pub column_defaults: Vec<(String, String)>,
17 /// `ON CONFLICT DO NOTHING` semantics: duplicate-PK rows are skipped
18 /// silently. `false` for plain `INSERT` (raises `unique_violation`).
19 pub if_absent: bool,
20}
21
22/// Parameters for planning a SCAN operation.
23pub struct ScanParams {
24 pub collection: String,
25 pub alias: Option<String>,
26 pub filters: Vec<Filter>,
27 pub projection: Vec<Projection>,
28 pub sort_keys: Vec<SortKey>,
29 pub limit: Option<usize>,
30 pub offset: usize,
31 pub distinct: bool,
32 pub window_functions: Vec<WindowSpec>,
33 /// Secondary indexes available on the scan's collection. Document
34 /// engines consult this to rewrite equality-on-indexed-field into
35 /// [`SqlPlan::DocumentIndexLookup`]. Other engines ignore it today.
36 pub indexes: Vec<IndexSpec>,
37}
38
39/// Parameters for planning a POINT GET operation.
40pub struct PointGetParams {
41 pub collection: String,
42 pub alias: Option<String>,
43 pub key_column: String,
44 pub key_value: SqlValue,
45}
46
47/// Parameters for planning an UPDATE operation.
48pub struct UpdateParams {
49 pub collection: String,
50 pub assignments: Vec<(String, SqlExpr)>,
51 pub filters: Vec<Filter>,
52 pub target_keys: Vec<SqlValue>,
53 pub returning: bool,
54}
55
56/// Parameters for planning a DELETE operation.
57pub struct DeleteParams {
58 pub collection: String,
59 pub filters: Vec<Filter>,
60 pub target_keys: Vec<SqlValue>,
61}
62
63/// Parameters for planning an UPSERT operation.
64pub struct UpsertParams {
65 pub collection: String,
66 pub columns: Vec<String>,
67 pub rows: Vec<Vec<(String, SqlValue)>>,
68 pub column_defaults: Vec<(String, String)>,
69 /// `ON CONFLICT (...) DO UPDATE SET` assignments. Empty for plain
70 /// `UPSERT INTO ...`; populated when the caller is
71 /// `INSERT ... ON CONFLICT ... DO UPDATE SET`.
72 pub on_conflict_updates: Vec<(String, SqlExpr)>,
73}
74
75/// Parameters for planning an AGGREGATE operation.
76pub struct AggregateParams {
77 pub collection: String,
78 pub alias: Option<String>,
79 pub filters: Vec<Filter>,
80 pub group_by: Vec<SqlExpr>,
81 pub aggregates: Vec<AggregateExpr>,
82 pub having: Vec<Filter>,
83 pub limit: usize,
84 /// Timeseries-specific: bucket interval from time_bucket() call.
85 pub bucket_interval_ms: Option<i64>,
86 /// Timeseries-specific: non-time GROUP BY columns.
87 pub group_columns: Vec<String>,
88 /// Whether the collection has auto-tiering enabled.
89 pub has_auto_tier: bool,
90}
91
92/// Engine-specific planning rules.
93///
94/// Each engine type implements this trait to produce the correct `SqlPlan`
95/// variant for each operation, or return an error if the operation is not
96/// supported. This is the single source of truth for operation routing —
97/// no downstream code should ever check engine type to decide routing.
98pub trait EngineRules {
99 /// Plan an INSERT. Returns `Err` if the engine does not support inserts
100 /// (e.g. timeseries routes to `TimeseriesIngest` instead).
101 fn plan_insert(&self, params: InsertParams) -> Result<Vec<SqlPlan>>;
102 /// Plan an UPSERT (insert-or-merge). Returns `Err` for append-only or
103 /// columnar engines that don't support merge semantics.
104 fn plan_upsert(&self, params: UpsertParams) -> Result<Vec<SqlPlan>>;
105 /// Plan a table scan (SELECT without point-get optimization).
106 fn plan_scan(&self, params: ScanParams) -> Result<SqlPlan>;
107 /// Plan a point lookup by primary key. Returns `Err` for engines
108 /// that don't support O(1) key lookups (e.g. timeseries).
109 fn plan_point_get(&self, params: PointGetParams) -> Result<SqlPlan>;
110 /// Plan an UPDATE. Returns `Err` for append-only engines.
111 fn plan_update(&self, params: UpdateParams) -> Result<Vec<SqlPlan>>;
112 /// Plan a DELETE (point or bulk).
113 fn plan_delete(&self, params: DeleteParams) -> Result<Vec<SqlPlan>>;
114 /// Plan a GROUP BY / aggregate query.
115 fn plan_aggregate(&self, params: AggregateParams) -> Result<SqlPlan>;
116}
117
118/// Attempt to rewrite `ScanParams` into a [`SqlPlan::DocumentIndexLookup`]
119/// when exactly one of the filters is an equality predicate on a `Ready`
120/// indexed field. Returns `None` to fall through to a generic `Scan`.
121///
122/// Shared by the schemaless and strict document engines so the
123/// index-rewrite rule has one source of truth. Normalizes strict column
124/// names to `$.column` before matching against index fields because the
125/// catalog stores every document index in JSON-path canonical form.
126pub(crate) fn try_document_index_lookup(
127 params: &ScanParams,
128 engine: EngineType,
129) -> Option<SqlPlan> {
130 // Sort / window functions still force a full scan because the
131 // IndexedFetch handler does not yet order rows or evaluate window
132 // aggregates. DISTINCT is safe on the index path: the handler
133 // emits each matched doc exactly once and any further dedup is a
134 // cheap Control-Plane pass on the scan-shaped response.
135 if !params.sort_keys.is_empty() || !params.window_functions.is_empty() {
136 return None;
137 }
138
139 // Iterate filters to find the first equality candidate that lines up
140 // with a Ready index. Keep the remaining filters as post-filters.
141 // Predicates appear in three shapes:
142 // - `FilterExpr::Comparison { field, Eq, value }` — resolver path.
143 // - `FilterExpr::Expr(BinaryOp { Column, Eq, Literal })` — generic.
144 // - `FilterExpr::Expr(BinaryOp { left AND right })` — the
145 // top-level AND the `convert_where_to_filters` path produces
146 // for compound WHERE clauses like `a = 1 AND b > 2`. Descend
147 // into AND conjuncts so an equality nested in a conjunction
148 // still picks up the index, and the non-equality siblings
149 // survive as a residual conjunction carried on the
150 // IndexedFetch node (not as a wrapping Filter plan node —
151 // that wrapping is the Control-Plane post-filter anti-pattern
152 // the handler's `filters` payload exists to eliminate).
153 // Pre-collect the query's top-level conjuncts once. Used for
154 // partial-index entailment: every conjunct of the index predicate
155 // must appear as a conjunct of the query WHERE for the rewrite to
156 // be sound, otherwise the index would silently omit rows the
157 // query expects to see.
158 let query_conjuncts: Vec<SqlExpr> = params
159 .filters
160 .iter()
161 .flat_map(|f| match &f.expr {
162 FilterExpr::Expr(e) => {
163 let mut v = Vec::new();
164 flatten_and(e, &mut v);
165 v
166 }
167 _ => Vec::new(),
168 })
169 .collect();
170
171 for (i, f) in params.filters.iter().enumerate() {
172 let Some((field, value, residual)) = extract_equality_with_residual(&f.expr) else {
173 continue;
174 };
175 let canonical = canonical_index_field(&field);
176 let Some(idx) = params.indexes.iter().find(|i| {
177 i.state == IndexState::Ready
178 && i.field == canonical
179 && partial_index_entailed(i.predicate.as_deref(), &query_conjuncts)
180 }) else {
181 continue;
182 };
183
184 let mut remaining = params.filters.clone();
185 match residual {
186 Some(expr) => {
187 remaining[i] = Filter {
188 expr: FilterExpr::Expr(expr),
189 };
190 }
191 None => {
192 remaining.remove(i);
193 }
194 }
195
196 let lookup_value = if idx.case_insensitive {
197 lowercase_string_value(&value)
198 } else {
199 value
200 };
201
202 return Some(SqlPlan::DocumentIndexLookup {
203 collection: params.collection.clone(),
204 alias: params.alias.clone(),
205 engine,
206 field: idx.field.clone(),
207 value: lookup_value,
208 filters: remaining,
209 projection: params.projection.clone(),
210 sort_keys: params.sort_keys.clone(),
211 limit: params.limit,
212 offset: params.offset,
213 distinct: params.distinct,
214 window_functions: params.window_functions.clone(),
215 case_insensitive: idx.case_insensitive,
216 });
217 }
218 None
219}
220
221/// Pull `(column_name, equality_value, residual_expr)` out of a filter
222/// expression if it contains a column-equals-literal predicate that can
223/// drive an index lookup. Returns `None` if no usable equality is
224/// present. The returned residual is the rest of the conjunction with
225/// the equality removed — `None` when the filter was a bare equality.
226fn extract_equality_with_residual(
227 expr: &FilterExpr,
228) -> Option<(String, SqlValue, Option<SqlExpr>)> {
229 match expr {
230 FilterExpr::Comparison {
231 field,
232 op: CompareOp::Eq,
233 value,
234 } => Some((field.clone(), value.clone(), None)),
235 FilterExpr::Expr(sql_expr) => {
236 let (col, lit, residual) = split_equality_from_expr(sql_expr)?;
237 Some((col, lit, residual))
238 }
239 _ => None,
240 }
241}
242
243/// Return `(column, literal, residual_expr)` for an equality found
244/// anywhere in a right-leaning AND conjunction tree, or `None` if no
245/// bare column-equals-literal predicate exists. The residual preserves
246/// every sibling conjunct in their original order; `None` means the
247/// expression was a bare equality with nothing left behind.
248fn split_equality_from_expr(expr: &SqlExpr) -> Option<(String, SqlValue, Option<SqlExpr>)> {
249 // Gather the conjuncts of a top-level AND chain left-to-right.
250 let mut conjuncts: Vec<SqlExpr> = Vec::new();
251 flatten_and(expr, &mut conjuncts);
252
253 // Find the first conjunct that is a bare column-equals-literal.
254 let eq_idx = conjuncts.iter().position(is_column_eq_literal)?;
255 let eq = conjuncts.remove(eq_idx);
256 let (col, lit) = match eq {
257 SqlExpr::BinaryOp { left, op, right } => match (*left, op, *right) {
258 (SqlExpr::Column { name, .. }, BinaryOp::Eq, SqlExpr::Literal(v)) => (name, v),
259 (SqlExpr::Literal(v), BinaryOp::Eq, SqlExpr::Column { name, .. }) => (name, v),
260 _ => return None,
261 },
262 _ => return None,
263 };
264
265 let residual = rebuild_and(conjuncts);
266 Some((col, lit, residual))
267}
268
269/// Append every leaf of a right-leaning `AND` tree to `out`. Non-AND
270/// expressions are a single leaf.
271fn flatten_and(expr: &SqlExpr, out: &mut Vec<SqlExpr>) {
272 match expr {
273 SqlExpr::BinaryOp {
274 left,
275 op: BinaryOp::And,
276 right,
277 } => {
278 flatten_and(left, out);
279 flatten_and(right, out);
280 }
281 other => out.push(other.clone()),
282 }
283}
284
285/// Reassemble a list of conjuncts into a right-leaning AND tree.
286/// Empty input returns `None`; single input returns itself.
287fn rebuild_and(mut conjuncts: Vec<SqlExpr>) -> Option<SqlExpr> {
288 let last = conjuncts.pop()?;
289 Some(
290 conjuncts
291 .into_iter()
292 .rfold(last, |acc, next| SqlExpr::BinaryOp {
293 left: Box::new(next),
294 op: BinaryOp::And,
295 right: Box::new(acc),
296 }),
297 )
298}
299
300/// Decide whether the query's WHERE conjuncts entail the partial-index
301/// predicate. `None` predicate means a full index — trivially entailed.
302/// Initial version uses conjunct-level structural equality: every
303/// conjunct of the index predicate must appear (by `PartialEq`) as a
304/// conjunct of the query. This is conservative and deliberately so —
305/// a false positive here would silently omit rows from query results.
306fn partial_index_entailed(predicate: Option<&str>, query_conjuncts: &[SqlExpr]) -> bool {
307 let Some(text) = predicate else {
308 return true;
309 };
310 let Ok(parsed) = crate::parse_expr_string(text) else {
311 // A catalog predicate we can't parse is not entailed — refuse
312 // to use the index rather than assume anything about its
313 // contents. The DDL path validates at CREATE INDEX time, so
314 // reaching this branch indicates drift.
315 return false;
316 };
317 let mut index_conjuncts: Vec<SqlExpr> = Vec::new();
318 flatten_and(&parsed, &mut index_conjuncts);
319 // Structural equality via the stable `Debug` representation:
320 // `SqlExpr` doesn't derive `PartialEq` (several nested variants
321 // carry types that can't derive it cheaply), but the Debug form is
322 // canonical for equivalent trees produced by the same parser. This
323 // is conservative — it matches only identical AST shapes and not,
324 // e.g., `a = 1` vs `1 = a`. Callers should write index predicates
325 // in the same normal form they use in query WHERE clauses, which
326 // is the convention everywhere else in the codebase.
327 index_conjuncts.iter().all(|ic| {
328 let ic_dbg = format!("{ic:?}");
329 query_conjuncts.iter().any(|qc| format!("{qc:?}") == ic_dbg)
330 })
331}
332
333fn is_column_eq_literal(expr: &SqlExpr) -> bool {
334 matches!(
335 expr,
336 SqlExpr::BinaryOp { left, op: BinaryOp::Eq, right }
337 if matches!(
338 (left.as_ref(), right.as_ref()),
339 (SqlExpr::Column { .. }, SqlExpr::Literal(_))
340 | (SqlExpr::Literal(_), SqlExpr::Column { .. }),
341 )
342 )
343}
344
/// Normalize a filter field name to the JSON-path form used for document
/// index fields in the catalog.
///
/// Names that already start with `$` (e.g. `$.name`) are treated as paths
/// and returned unchanged. Bare column names such as `name` become
/// `$.name`.
fn canonical_index_field(field: &str) -> String {
    // A single `'$'` prefix check also covers `"$."`; the previous extra
    // `starts_with("$.")` test was redundant.
    if field.starts_with('$') {
        field.to_owned()
    } else {
        format!("$.{field}")
    }
}
352
353fn lowercase_string_value(v: &SqlValue) -> SqlValue {
354 if let SqlValue::String(s) = v {
355 SqlValue::String(s.to_lowercase())
356 } else {
357 v.clone()
358 }
359}
360
361/// Resolve the engine rules for a given engine type.
362///
363/// No catch-all — compiler enforces exhaustiveness.
364pub fn resolve_engine_rules(engine: EngineType) -> &'static dyn EngineRules {
365 match engine {
366 EngineType::DocumentSchemaless => &document_schemaless::SchemalessRules,
367 EngineType::DocumentStrict => &document_strict::StrictRules,
368 EngineType::KeyValue => &kv::KvRules,
369 EngineType::Columnar => &columnar::ColumnarRules,
370 EngineType::Timeseries => ×eries::TimeseriesRules,
371 EngineType::Spatial => &spatial::SpatialRules,
372 }
373}