// nodedb_sql/engine_rules/mod.rs
1pub mod columnar;
2pub mod document_schemaless;
3pub mod document_strict;
4pub mod kv;
5pub mod spatial;
6pub mod timeseries;
7
8use crate::error::Result;
9use crate::types::*;
10
11/// Parameters for planning an INSERT operation.
12pub struct InsertParams {
13 pub collection: String,
14 pub columns: Vec<String>,
15 pub rows: Vec<Vec<(String, SqlValue)>>,
16 pub column_defaults: Vec<(String, String)>,
17}
18
19/// Parameters for planning a SCAN operation.
20pub struct ScanParams {
21 pub collection: String,
22 pub alias: Option<String>,
23 pub filters: Vec<Filter>,
24 pub projection: Vec<Projection>,
25 pub sort_keys: Vec<SortKey>,
26 pub limit: Option<usize>,
27 pub offset: usize,
28 pub distinct: bool,
29 pub window_functions: Vec<WindowSpec>,
30 /// Secondary indexes available on the scan's collection. Document
31 /// engines consult this to rewrite equality-on-indexed-field into
32 /// [`SqlPlan::DocumentIndexLookup`]. Other engines ignore it today.
33 pub indexes: Vec<IndexSpec>,
34}
35
36/// Parameters for planning a POINT GET operation.
37pub struct PointGetParams {
38 pub collection: String,
39 pub alias: Option<String>,
40 pub key_column: String,
41 pub key_value: SqlValue,
42}
43
44/// Parameters for planning an UPDATE operation.
45pub struct UpdateParams {
46 pub collection: String,
47 pub assignments: Vec<(String, SqlExpr)>,
48 pub filters: Vec<Filter>,
49 pub target_keys: Vec<SqlValue>,
50 pub returning: bool,
51}
52
53/// Parameters for planning a DELETE operation.
54pub struct DeleteParams {
55 pub collection: String,
56 pub filters: Vec<Filter>,
57 pub target_keys: Vec<SqlValue>,
58}
59
60/// Parameters for planning an UPSERT operation.
61pub struct UpsertParams {
62 pub collection: String,
63 pub columns: Vec<String>,
64 pub rows: Vec<Vec<(String, SqlValue)>>,
65 pub column_defaults: Vec<(String, String)>,
66 /// `ON CONFLICT (...) DO UPDATE SET` assignments. Empty for plain
67 /// `UPSERT INTO ...`; populated when the caller is
68 /// `INSERT ... ON CONFLICT ... DO UPDATE SET`.
69 pub on_conflict_updates: Vec<(String, SqlExpr)>,
70}
71
72/// Parameters for planning an AGGREGATE operation.
73pub struct AggregateParams {
74 pub collection: String,
75 pub alias: Option<String>,
76 pub filters: Vec<Filter>,
77 pub group_by: Vec<SqlExpr>,
78 pub aggregates: Vec<AggregateExpr>,
79 pub having: Vec<Filter>,
80 pub limit: usize,
81 /// Timeseries-specific: bucket interval from time_bucket() call.
82 pub bucket_interval_ms: Option<i64>,
83 /// Timeseries-specific: non-time GROUP BY columns.
84 pub group_columns: Vec<String>,
85 /// Whether the collection has auto-tiering enabled.
86 pub has_auto_tier: bool,
87}
88
89/// Engine-specific planning rules.
90///
91/// Each engine type implements this trait to produce the correct `SqlPlan`
92/// variant for each operation, or return an error if the operation is not
93/// supported. This is the single source of truth for operation routing —
94/// no downstream code should ever check engine type to decide routing.
95pub trait EngineRules {
96 /// Plan an INSERT. Returns `Err` if the engine does not support inserts
97 /// (e.g. timeseries routes to `TimeseriesIngest` instead).
98 fn plan_insert(&self, params: InsertParams) -> Result<Vec<SqlPlan>>;
99 /// Plan an UPSERT (insert-or-merge). Returns `Err` for append-only or
100 /// columnar engines that don't support merge semantics.
101 fn plan_upsert(&self, params: UpsertParams) -> Result<Vec<SqlPlan>>;
102 /// Plan a table scan (SELECT without point-get optimization).
103 fn plan_scan(&self, params: ScanParams) -> Result<SqlPlan>;
104 /// Plan a point lookup by primary key. Returns `Err` for engines
105 /// that don't support O(1) key lookups (e.g. timeseries).
106 fn plan_point_get(&self, params: PointGetParams) -> Result<SqlPlan>;
107 /// Plan an UPDATE. Returns `Err` for append-only engines.
108 fn plan_update(&self, params: UpdateParams) -> Result<Vec<SqlPlan>>;
109 /// Plan a DELETE (point or bulk).
110 fn plan_delete(&self, params: DeleteParams) -> Result<Vec<SqlPlan>>;
111 /// Plan a GROUP BY / aggregate query.
112 fn plan_aggregate(&self, params: AggregateParams) -> Result<SqlPlan>;
113}
114
115/// Attempt to rewrite `ScanParams` into a [`SqlPlan::DocumentIndexLookup`]
116/// when exactly one of the filters is an equality predicate on a `Ready`
117/// indexed field. Returns `None` to fall through to a generic `Scan`.
118///
119/// Shared by the schemaless and strict document engines so the
120/// index-rewrite rule has one source of truth. Normalizes strict column
121/// names to `$.column` before matching against index fields because the
122/// catalog stores every document index in JSON-path canonical form.
123pub(crate) fn try_document_index_lookup(
124 params: &ScanParams,
125 engine: EngineType,
126) -> Option<SqlPlan> {
127 // Sort / window functions still force a full scan because the
128 // IndexedFetch handler does not yet order rows or evaluate window
129 // aggregates. DISTINCT is safe on the index path: the handler
130 // emits each matched doc exactly once and any further dedup is a
131 // cheap Control-Plane pass on the scan-shaped response.
132 if !params.sort_keys.is_empty() || !params.window_functions.is_empty() {
133 return None;
134 }
135
136 // Iterate filters to find the first equality candidate that lines up
137 // with a Ready index. Keep the remaining filters as post-filters.
138 // Predicates appear in three shapes:
139 // - `FilterExpr::Comparison { field, Eq, value }` — resolver path.
140 // - `FilterExpr::Expr(BinaryOp { Column, Eq, Literal })` — generic.
141 // - `FilterExpr::Expr(BinaryOp { left AND right })` — the
142 // top-level AND the `convert_where_to_filters` path produces
143 // for compound WHERE clauses like `a = 1 AND b > 2`. Descend
144 // into AND conjuncts so an equality nested in a conjunction
145 // still picks up the index, and the non-equality siblings
146 // survive as a residual conjunction carried on the
147 // IndexedFetch node (not as a wrapping Filter plan node —
148 // that wrapping is the Control-Plane post-filter anti-pattern
149 // the handler's `filters` payload exists to eliminate).
150 // Pre-collect the query's top-level conjuncts once. Used for
151 // partial-index entailment: every conjunct of the index predicate
152 // must appear as a conjunct of the query WHERE for the rewrite to
153 // be sound, otherwise the index would silently omit rows the
154 // query expects to see.
155 let query_conjuncts: Vec<SqlExpr> = params
156 .filters
157 .iter()
158 .flat_map(|f| match &f.expr {
159 FilterExpr::Expr(e) => {
160 let mut v = Vec::new();
161 flatten_and(e, &mut v);
162 v
163 }
164 _ => Vec::new(),
165 })
166 .collect();
167
168 for (i, f) in params.filters.iter().enumerate() {
169 let Some((field, value, residual)) = extract_equality_with_residual(&f.expr) else {
170 continue;
171 };
172 let canonical = canonical_index_field(&field);
173 let Some(idx) = params.indexes.iter().find(|i| {
174 i.state == IndexState::Ready
175 && i.field == canonical
176 && partial_index_entailed(i.predicate.as_deref(), &query_conjuncts)
177 }) else {
178 continue;
179 };
180
181 let mut remaining = params.filters.clone();
182 match residual {
183 Some(expr) => {
184 remaining[i] = Filter {
185 expr: FilterExpr::Expr(expr),
186 };
187 }
188 None => {
189 remaining.remove(i);
190 }
191 }
192
193 let lookup_value = if idx.case_insensitive {
194 lowercase_string_value(&value)
195 } else {
196 value
197 };
198
199 return Some(SqlPlan::DocumentIndexLookup {
200 collection: params.collection.clone(),
201 alias: params.alias.clone(),
202 engine,
203 field: idx.field.clone(),
204 value: lookup_value,
205 filters: remaining,
206 projection: params.projection.clone(),
207 sort_keys: params.sort_keys.clone(),
208 limit: params.limit,
209 offset: params.offset,
210 distinct: params.distinct,
211 window_functions: params.window_functions.clone(),
212 case_insensitive: idx.case_insensitive,
213 });
214 }
215 None
216}
217
218/// Pull `(column_name, equality_value, residual_expr)` out of a filter
219/// expression if it contains a column-equals-literal predicate that can
220/// drive an index lookup. Returns `None` if no usable equality is
221/// present. The returned residual is the rest of the conjunction with
222/// the equality removed — `None` when the filter was a bare equality.
223fn extract_equality_with_residual(
224 expr: &FilterExpr,
225) -> Option<(String, SqlValue, Option<SqlExpr>)> {
226 match expr {
227 FilterExpr::Comparison {
228 field,
229 op: CompareOp::Eq,
230 value,
231 } => Some((field.clone(), value.clone(), None)),
232 FilterExpr::Expr(sql_expr) => {
233 let (col, lit, residual) = split_equality_from_expr(sql_expr)?;
234 Some((col, lit, residual))
235 }
236 _ => None,
237 }
238}
239
240/// Return `(column, literal, residual_expr)` for an equality found
241/// anywhere in a right-leaning AND conjunction tree, or `None` if no
242/// bare column-equals-literal predicate exists. The residual preserves
243/// every sibling conjunct in their original order; `None` means the
244/// expression was a bare equality with nothing left behind.
245fn split_equality_from_expr(expr: &SqlExpr) -> Option<(String, SqlValue, Option<SqlExpr>)> {
246 // Gather the conjuncts of a top-level AND chain left-to-right.
247 let mut conjuncts: Vec<SqlExpr> = Vec::new();
248 flatten_and(expr, &mut conjuncts);
249
250 // Find the first conjunct that is a bare column-equals-literal.
251 let eq_idx = conjuncts.iter().position(is_column_eq_literal)?;
252 let eq = conjuncts.remove(eq_idx);
253 let (col, lit) = match eq {
254 SqlExpr::BinaryOp { left, op, right } => match (*left, op, *right) {
255 (SqlExpr::Column { name, .. }, BinaryOp::Eq, SqlExpr::Literal(v)) => (name, v),
256 (SqlExpr::Literal(v), BinaryOp::Eq, SqlExpr::Column { name, .. }) => (name, v),
257 _ => return None,
258 },
259 _ => return None,
260 };
261
262 let residual = rebuild_and(conjuncts);
263 Some((col, lit, residual))
264}
265
266/// Append every leaf of a right-leaning `AND` tree to `out`. Non-AND
267/// expressions are a single leaf.
268fn flatten_and(expr: &SqlExpr, out: &mut Vec<SqlExpr>) {
269 match expr {
270 SqlExpr::BinaryOp {
271 left,
272 op: BinaryOp::And,
273 right,
274 } => {
275 flatten_and(left, out);
276 flatten_and(right, out);
277 }
278 other => out.push(other.clone()),
279 }
280}
281
282/// Reassemble a list of conjuncts into a right-leaning AND tree.
283/// Empty input returns `None`; single input returns itself.
284fn rebuild_and(mut conjuncts: Vec<SqlExpr>) -> Option<SqlExpr> {
285 let last = conjuncts.pop()?;
286 Some(
287 conjuncts
288 .into_iter()
289 .rfold(last, |acc, next| SqlExpr::BinaryOp {
290 left: Box::new(next),
291 op: BinaryOp::And,
292 right: Box::new(acc),
293 }),
294 )
295}
296
297/// Decide whether the query's WHERE conjuncts entail the partial-index
298/// predicate. `None` predicate means a full index — trivially entailed.
299/// Initial version uses conjunct-level structural equality: every
300/// conjunct of the index predicate must appear (by `PartialEq`) as a
301/// conjunct of the query. This is conservative and deliberately so —
302/// a false positive here would silently omit rows from query results,
303/// which is the exact bug #73 reports.
304fn partial_index_entailed(predicate: Option<&str>, query_conjuncts: &[SqlExpr]) -> bool {
305 let Some(text) = predicate else {
306 return true;
307 };
308 let Ok(parsed) = crate::parse_expr_string(text) else {
309 // A catalog predicate we can't parse is not entailed — refuse
310 // to use the index rather than assume anything about its
311 // contents. The DDL path validates at CREATE INDEX time, so
312 // reaching this branch indicates drift.
313 return false;
314 };
315 let mut index_conjuncts: Vec<SqlExpr> = Vec::new();
316 flatten_and(&parsed, &mut index_conjuncts);
317 // Structural equality via the stable `Debug` representation:
318 // `SqlExpr` doesn't derive `PartialEq` (several nested variants
319 // carry types that can't derive it cheaply), but the Debug form is
320 // canonical for equivalent trees produced by the same parser. This
321 // is conservative — it matches only identical AST shapes and not,
322 // e.g., `a = 1` vs `1 = a`. Callers should write index predicates
323 // in the same normal form they use in query WHERE clauses, which
324 // is the convention everywhere else in the codebase.
325 index_conjuncts.iter().all(|ic| {
326 let ic_dbg = format!("{ic:?}");
327 query_conjuncts.iter().any(|qc| format!("{qc:?}") == ic_dbg)
328 })
329}
330
331fn is_column_eq_literal(expr: &SqlExpr) -> bool {
332 matches!(
333 expr,
334 SqlExpr::BinaryOp { left, op: BinaryOp::Eq, right }
335 if matches!(
336 (left.as_ref(), right.as_ref()),
337 (SqlExpr::Column { .. }, SqlExpr::Literal(_))
338 | (SqlExpr::Literal(_), SqlExpr::Column { .. }),
339 )
340 )
341}
342
/// Normalize a field name to the `$`-rooted form used when matching index
/// fields.
///
/// A name that already starts with `$` (which includes the `$.` prefix) is
/// returned unchanged. Any other name gets `$.` prepended.
fn canonical_index_field(field: &str) -> String {
    match field.strip_prefix('$') {
        Some(_) => field.to_string(),
        None => format!("$.{field}"),
    }
}
350
351fn lowercase_string_value(v: &SqlValue) -> SqlValue {
352 if let SqlValue::String(s) = v {
353 SqlValue::String(s.to_lowercase())
354 } else {
355 v.clone()
356 }
357}
358
359/// Resolve the engine rules for a given engine type.
360///
361/// No catch-all — compiler enforces exhaustiveness.
362pub fn resolve_engine_rules(engine: EngineType) -> &'static dyn EngineRules {
363 match engine {
364 EngineType::DocumentSchemaless => &document_schemaless::SchemalessRules,
365 EngineType::DocumentStrict => &document_strict::StrictRules,
366 EngineType::KeyValue => &kv::KvRules,
367 EngineType::Columnar => &columnar::ColumnarRules,
368 EngineType::Timeseries => ×eries::TimeseriesRules,
369 EngineType::Spatial => &spatial::SpatialRules,
370 }
371}