nodedb-sql 0.2.0

SQL parser, planner, and optimizer for NodeDB
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
// SPDX-License-Identifier: Apache-2.0

pub mod array;
pub mod columnar;
pub mod document_schemaless;
pub mod document_strict;
pub mod kv;
pub mod spatial;
pub mod timeseries;

use crate::error::Result;
use crate::types::*;

/// Parameters for planning an INSERT operation.
///
/// Passed by value to [`EngineRules::plan_insert`].
pub struct InsertParams {
    /// Collection the rows are inserted into.
    pub collection: String,
    /// Column names for the insert — presumably the statement's column list
    /// (TODO confirm against the code that builds this struct).
    pub columns: Vec<String>,
    /// Rows to insert. Each row is a list of `(String, SqlValue)` pairs —
    /// presumably `(column name, value)`; verify against the caller.
    pub rows: Vec<Vec<(String, SqlValue)>>,
    /// `(String, String)` pairs — presumably a column name and the text of
    /// its default expression (TODO confirm).
    pub column_defaults: Vec<(String, String)>,
    /// `ON CONFLICT DO NOTHING` semantics: duplicate-PK rows are skipped
    /// silently. `false` for plain `INSERT` (raises `unique_violation`).
    pub if_absent: bool,
}

/// Parameters for planning a SCAN operation.
///
/// Passed by value to [`EngineRules::plan_scan`] and read by reference in
/// [`try_document_index_lookup`].
pub struct ScanParams {
    /// Collection to scan; the index rewrite copies it into the plan.
    pub collection: String,
    /// Optional alias, forwarded unchanged by the index rewrite —
    /// presumably the SQL table alias (TODO confirm).
    pub alias: Option<String>,
    /// Filters for the scan. The index rewrite looks for an equality
    /// predicate in here and carries the rest along as residual filters.
    pub filters: Vec<Filter>,
    /// Projection list; forwarded unchanged by the index rewrite.
    pub projection: Vec<Projection>,
    /// Sort keys. A non-empty list disables the document index rewrite.
    pub sort_keys: Vec<SortKey>,
    /// Row limit — presumably `None` means "no limit" (confirm with the
    /// executor).
    pub limit: Option<usize>,
    /// Row offset — presumably the number of rows to skip (TODO confirm).
    pub offset: usize,
    /// Presumably set for `SELECT DISTINCT`; forwarded unchanged by the
    /// index rewrite.
    pub distinct: bool,
    /// Window function specs. A non-empty list disables the document index
    /// rewrite.
    pub window_functions: Vec<WindowSpec>,
    /// Secondary indexes available on the scan's collection. Document
    /// engines consult this to rewrite equality-on-indexed-field into
    /// [`SqlPlan::DocumentIndexLookup`]. Other engines ignore it today.
    pub indexes: Vec<IndexSpec>,
    /// Bitemporal qualifier propagated from `plan_sql`. Engines without
    /// bitemporal storage support reject a non-default scope via
    /// `SqlError::Unsupported` — silently ignoring it would return
    /// current-state data when the user asked for history.
    pub temporal: crate::temporal::TemporalScope,
    /// Whether this collection was created with bitemporal storage. When
    /// `true`, engines that support bitemporal reads route the scan
    /// through versioned storage; when `false`, a non-default
    /// [`Self::temporal`] is rejected.
    pub bitemporal: bool,
}

/// Parameters for planning a POINT GET operation.
///
/// Passed by value to [`EngineRules::plan_point_get`].
pub struct PointGetParams {
    /// Collection the lookup targets.
    pub collection: String,
    /// Optional alias — presumably the SQL table alias (TODO confirm).
    pub alias: Option<String>,
    /// Name of the column the lookup keys on — presumably the primary-key
    /// column; verify against the caller.
    pub key_column: String,
    /// Value `key_column` is matched against.
    pub key_value: SqlValue,
}

/// Parameters for planning an UPDATE operation.
///
/// Passed by value to [`EngineRules::plan_update`].
pub struct UpdateParams {
    /// Collection being updated.
    pub collection: String,
    /// `(String, SqlExpr)` pairs — presumably `(column, new-value
    /// expression)` from the `SET` clause (TODO confirm).
    pub assignments: Vec<(String, SqlExpr)>,
    /// Filters selecting the rows to update.
    pub filters: Vec<Filter>,
    /// Presumably explicit key values when the update targets specific
    /// rows; how an empty list is interpreted is not visible in this file.
    pub target_keys: Vec<SqlValue>,
    /// Presumably `true` when the statement carries a `RETURNING` clause
    /// (TODO confirm).
    pub returning: bool,
}

/// Parameters for planning an `UPDATE target SET ... FROM src WHERE ...` operation.
///
/// Passed by value to [`EngineRules::plan_update_from`].
pub struct UpdateFromParams {
    /// Target collection of the update.
    pub collection: String,
    /// The FROM source plan (Scan, Join, …).
    pub source: Box<SqlPlan>,
    /// Column in target used as the equi-join key.
    pub target_join_col: String,
    /// Column in source used as the equi-join key.
    pub source_join_col: String,
    /// SET assignments — RHS may reference source columns.
    pub assignments: Vec<(String, SqlExpr)>,
    /// Filters that apply only to the target.
    pub target_filters: Vec<Filter>,
    /// Presumably `true` when the statement carries a `RETURNING` clause
    /// (TODO confirm).
    pub returning: bool,
}

/// Parameters for planning a DELETE operation.
///
/// Passed by value to [`EngineRules::plan_delete`], which is documented as
/// covering both point and bulk deletes.
pub struct DeleteParams {
    /// Collection rows are deleted from.
    pub collection: String,
    /// Filters selecting the rows to delete.
    pub filters: Vec<Filter>,
    /// Presumably explicit key values for point deletes; how an empty list
    /// is interpreted is not visible in this file (TODO confirm).
    pub target_keys: Vec<SqlValue>,
}

/// Parameters for planning a MERGE operation.
///
/// Passed by value to [`EngineRules::plan_merge`].
pub struct MergeParams {
    /// Target collection of the merge.
    pub collection: String,
    /// Plan producing the source rows.
    pub source: Box<SqlPlan>,
    /// Join column on the target side — presumably an equi-join key, by
    /// analogy with [`UpdateFromParams::target_join_col`] (TODO confirm).
    pub target_join_col: String,
    /// Join column on the source side (same caveat as above).
    pub source_join_col: String,
    /// Alias of the source — presumably the name `clauses` use to refer to
    /// source columns; verify against the MERGE planner.
    pub source_alias: String,
    /// The merge clauses — presumably the `WHEN [NOT] MATCHED` arms
    /// (TODO confirm against `MergePlanClause`).
    pub clauses: Vec<crate::types::MergePlanClause>,
    /// Presumably `true` when the statement carries a `RETURNING` clause
    /// (TODO confirm).
    pub returning: bool,
}

/// Parameters for planning an UPSERT operation.
///
/// Passed by value to [`EngineRules::plan_upsert`]. The first four fields
/// have the same types as their namesakes on [`InsertParams`].
pub struct UpsertParams {
    /// Collection the rows are upserted into.
    pub collection: String,
    /// Column names for the upsert — presumably the statement's column
    /// list (TODO confirm).
    pub columns: Vec<String>,
    /// Rows to upsert. Each row is a list of `(String, SqlValue)` pairs —
    /// presumably `(column name, value)`; verify against the caller.
    pub rows: Vec<Vec<(String, SqlValue)>>,
    /// `(String, String)` pairs — presumably a column name and the text of
    /// its default expression (TODO confirm).
    pub column_defaults: Vec<(String, String)>,
    /// `ON CONFLICT (...) DO UPDATE SET` assignments. Empty for plain
    /// `UPSERT INTO ...`; populated when the caller is
    /// `INSERT ... ON CONFLICT ... DO UPDATE SET`.
    pub on_conflict_updates: Vec<(String, SqlExpr)>,
}

/// Parameters for planning an AGGREGATE operation.
///
/// Passed by value to [`EngineRules::plan_aggregate`].
pub struct AggregateParams {
    /// Collection being aggregated.
    pub collection: String,
    /// Optional alias — presumably the SQL table alias (TODO confirm).
    pub alias: Option<String>,
    /// Filters — presumably applied to rows before grouping (WHERE);
    /// compare with `having` below.
    pub filters: Vec<Filter>,
    /// GROUP BY expressions.
    pub group_by: Vec<SqlExpr>,
    /// Aggregate expressions to compute.
    pub aggregates: Vec<AggregateExpr>,
    /// HAVING filters — presumably applied after grouping (TODO confirm).
    pub having: Vec<Filter>,
    /// Row limit. NOTE(review): unlike `ScanParams::limit` this is not an
    /// `Option`; how "no limit" is encoded is not visible in this file.
    pub limit: usize,
    /// Timeseries-specific: bucket interval from time_bucket() call.
    pub bucket_interval_ms: Option<i64>,
    /// Timeseries-specific: non-time GROUP BY columns.
    pub group_columns: Vec<String>,
    /// Whether the collection has auto-tiering enabled.
    pub has_auto_tier: bool,
    /// Whether this collection was created with bitemporal storage.
    /// When `true`, the base scan inside the aggregate is allowed to
    /// carry a non-default temporal scope.
    pub bitemporal: bool,
    /// System-time / valid-time scope to propagate into the underlying
    /// scan so bitemporal aggregate queries project an as-of snapshot
    /// before grouping.
    pub temporal: crate::temporal::TemporalScope,
}

/// Engine-specific planning rules.
///
/// Each engine type implements this trait to produce the correct `SqlPlan`
/// variant for each operation, or return an error if the operation is not
/// supported. This is the single source of truth for operation routing —
/// no downstream code should ever check engine type to decide routing.
///
/// Implementations are obtained through [`resolve_engine_rules`], which
/// hands out a `&'static dyn EngineRules` per `EngineType`. Every method
/// takes its parameter struct by value. The write-style operations
/// (insert, upsert, update, update-from, delete, merge) return a
/// `Vec<SqlPlan>`, while scan, point-get and aggregate return a single
/// `SqlPlan`.
pub trait EngineRules {
    /// Plan an INSERT. Returns `Err` if the engine does not support inserts
    /// (e.g. timeseries routes to `TimeseriesIngest` instead).
    fn plan_insert(&self, params: InsertParams) -> Result<Vec<SqlPlan>>;
    /// Plan an UPSERT (insert-or-merge). Returns `Err` for append-only or
    /// columnar engines that don't support merge semantics.
    fn plan_upsert(&self, params: UpsertParams) -> Result<Vec<SqlPlan>>;
    /// Plan a table scan (SELECT without point-get optimization).
    fn plan_scan(&self, params: ScanParams) -> Result<SqlPlan>;
    /// Plan a point lookup by primary key. Returns `Err` for engines
    /// that don't support O(1) key lookups (e.g. timeseries).
    fn plan_point_get(&self, params: PointGetParams) -> Result<SqlPlan>;
    /// Plan an UPDATE. Returns `Err` for append-only engines.
    fn plan_update(&self, params: UpdateParams) -> Result<Vec<SqlPlan>>;
    /// Plan an `UPDATE target SET ... FROM src WHERE ...`.
    ///
    /// Returns `Err(SqlError::Unsupported)` for engines that cannot participate
    /// as an update target in a cross-table update (timeseries, kv-with-opaque-keys).
    fn plan_update_from(&self, params: UpdateFromParams) -> Result<Vec<SqlPlan>>;
    /// Plan a DELETE (point or bulk).
    fn plan_delete(&self, params: DeleteParams) -> Result<Vec<SqlPlan>>;
    /// Plan a GROUP BY / aggregate query.
    fn plan_aggregate(&self, params: AggregateParams) -> Result<SqlPlan>;
    /// Plan a MERGE statement.
    ///
    /// Returns `Err(SqlError::Unsupported)` for engines that do not support
    /// MERGE semantics (everything except `document_schemaless` and
    /// `document_strict`).
    fn plan_merge(&self, params: MergeParams) -> Result<Vec<SqlPlan>>;
}

/// Attempt to rewrite `ScanParams` into a [`SqlPlan::DocumentIndexLookup`]
/// when exactly one of the filters is an equality predicate on a `Ready`
/// indexed field. Returns `None` to fall through to a generic `Scan`.
///
/// Shared by the schemaless and strict document engines so the
/// index-rewrite rule has one source of truth. Normalizes strict column
/// names to `$.column` before matching against index fields because the
/// catalog stores every document index in JSON-path canonical form.
pub(crate) fn try_document_index_lookup(
    params: &ScanParams,
    engine: EngineType,
) -> Option<SqlPlan> {
    // Sort / window functions still force a full scan because the
    // IndexedFetch handler does not yet order rows or evaluate window
    // aggregates. DISTINCT is safe on the index path: the handler
    // emits each matched doc exactly once and any further dedup is a
    // cheap Control-Plane pass on the scan-shaped response.
    if !params.sort_keys.is_empty() || !params.window_functions.is_empty() {
        return None;
    }

    // Iterate filters to find the first equality candidate that lines up
    // with a Ready index. Keep the remaining filters as post-filters.
    // Predicates appear in three shapes:
    //   - `FilterExpr::Comparison { field, Eq, value }` — resolver path.
    //   - `FilterExpr::Expr(BinaryOp { Column, Eq, Literal })` — generic.
    //   - `FilterExpr::Expr(BinaryOp { left AND right })` — the
    //     top-level AND the `convert_where_to_filters` path produces
    //     for compound WHERE clauses like `a = 1 AND b > 2`. Descend
    //     into AND conjuncts so an equality nested in a conjunction
    //     still picks up the index, and the non-equality siblings
    //     survive as a residual conjunction carried on the
    //     IndexedFetch node (not as a wrapping Filter plan node —
    //     that wrapping is the Control-Plane post-filter anti-pattern
    //     the handler's `filters` payload exists to eliminate).
    // Pre-collect the query's top-level conjuncts once. Used for
    // partial-index entailment: every conjunct of the index predicate
    // must appear as a conjunct of the query WHERE for the rewrite to
    // be sound, otherwise the index would silently omit rows the
    // query expects to see.
    let query_conjuncts: Vec<SqlExpr> = params
        .filters
        .iter()
        .flat_map(|f| match &f.expr {
            FilterExpr::Expr(e) => {
                let mut v = Vec::new();
                flatten_and(e, &mut v);
                v
            }
            _ => Vec::new(),
        })
        .collect();

    for (i, f) in params.filters.iter().enumerate() {
        let Some((field, value, residual)) = extract_equality_with_residual(&f.expr) else {
            continue;
        };
        let canonical = canonical_index_field(&field);
        let Some(idx) = params.indexes.iter().find(|i| {
            i.state == IndexState::Ready
                && i.field == canonical
                && partial_index_entailed(i.predicate.as_deref(), &query_conjuncts)
        }) else {
            continue;
        };

        let mut remaining = params.filters.clone();
        match residual {
            Some(expr) => {
                remaining[i] = Filter {
                    expr: FilterExpr::Expr(expr),
                };
            }
            None => {
                remaining.remove(i);
            }
        }

        let lookup_value = if idx.case_insensitive {
            lowercase_string_value(&value)
        } else {
            value
        };

        return Some(SqlPlan::DocumentIndexLookup {
            collection: params.collection.clone(),
            alias: params.alias.clone(),
            engine,
            field: idx.field.clone(),
            value: lookup_value,
            filters: remaining,
            projection: params.projection.clone(),
            sort_keys: params.sort_keys.clone(),
            limit: params.limit,
            offset: params.offset,
            distinct: params.distinct,
            window_functions: params.window_functions.clone(),
            case_insensitive: idx.case_insensitive,
            temporal: params.temporal,
        });
    }
    None
}

/// Extract an index-drivable equality from a filter expression.
///
/// Yields `(column_name, equality_value, residual_expr)` when the filter
/// holds a column-equals-literal predicate, `None` otherwise. The residual
/// is whatever remains of the conjunction once the equality is taken out;
/// it is `None` when the filter consisted of the equality alone.
fn extract_equality_with_residual(
    expr: &FilterExpr,
) -> Option<(String, SqlValue, Option<SqlExpr>)> {
    match expr {
        // Generic expression: delegate the AND-chain search.
        FilterExpr::Expr(inner) => split_equality_from_expr(inner),
        // Resolver-produced equality: nothing is left over.
        FilterExpr::Comparison {
            field,
            op: CompareOp::Eq,
            value,
        } => {
            let column = field.clone();
            let literal = value.clone();
            Some((column, literal, None))
        }
        _ => None,
    }
}

/// Split the first column-equals-literal conjunct out of an `AND` chain.
///
/// Returns `(column, literal, residual_expr)`, or `None` when no conjunct
/// is a bare column/literal equality. The residual keeps the remaining
/// conjuncts in their original order and is `None` when the equality was
/// the entire expression.
fn split_equality_from_expr(expr: &SqlExpr) -> Option<(String, SqlValue, Option<SqlExpr>)> {
    // Leaves of the top-level AND chain, left to right.
    let mut leaves = Vec::new();
    flatten_and(expr, &mut leaves);

    // Take the first qualifying leaf out of the list.
    let hit = leaves.iter().position(is_column_eq_literal)?;
    let SqlExpr::BinaryOp {
        left,
        op: BinaryOp::Eq,
        right,
    } = leaves.remove(hit)
    else {
        return None;
    };

    // The operands may come in either order.
    let (column, literal) = match (*left, *right) {
        (SqlExpr::Column { name, .. }, SqlExpr::Literal(v))
        | (SqlExpr::Literal(v), SqlExpr::Column { name, .. }) => (name, v),
        _ => return None,
    };

    Some((column, literal, rebuild_and(leaves)))
}

/// Collect the leaves of an `AND` tree into `out`, left to right.
///
/// Every `BinaryOp::And` node is expanded on both sides, whatever the
/// tree's shape; any other expression counts as a single leaf and is
/// cloned into `out`.
fn flatten_and(expr: &SqlExpr, out: &mut Vec<SqlExpr>) {
    // Explicit work stack. The right operand is pushed first so the left
    // operand is popped — and therefore emitted — first.
    let mut pending: Vec<&SqlExpr> = vec![expr];
    while let Some(node) = pending.pop() {
        match node {
            SqlExpr::BinaryOp {
                left,
                op: BinaryOp::And,
                right,
            } => {
                pending.push(&**right);
                pending.push(&**left);
            }
            leaf => out.push(leaf.clone()),
        }
    }
}

/// Fold a list of conjuncts back into a right-leaning `AND` tree.
///
/// An empty list gives `None`; a single conjunct is returned as-is.
fn rebuild_and(mut conjuncts: Vec<SqlExpr>) -> Option<SqlExpr> {
    // Start from the last conjunct and wrap it with each earlier one,
    // walking backwards so the first conjunct ends up outermost.
    let mut tree = conjuncts.pop()?;
    while let Some(earlier) = conjuncts.pop() {
        tree = SqlExpr::BinaryOp {
            left: Box::new(earlier),
            op: BinaryOp::And,
            right: Box::new(tree),
        };
    }
    Some(tree)
}

/// Decide whether the query's WHERE conjuncts entail the partial-index
/// predicate. `None` predicate means a full index — trivially entailed.
///
/// Initial version uses conjunct-level structural equality: every
/// conjunct of the index predicate must appear among the query's
/// conjuncts, compared by `Debug` representation. This is conservative
/// and deliberately so — a false positive here would silently omit rows
/// from query results.
///
/// * `predicate` — the index's predicate text from the catalog, if any.
/// * `query_conjuncts` — the flattened conjuncts of the query's WHERE.
///
/// Returns `false` when the predicate text cannot be parsed.
fn partial_index_entailed(predicate: Option<&str>, query_conjuncts: &[SqlExpr]) -> bool {
    let Some(text) = predicate else {
        return true;
    };
    let Ok(parsed) = crate::parse_expr_string(text) else {
        // A catalog predicate we can't parse is not entailed — refuse
        // to use the index rather than assume anything about its
        // contents. The DDL path validates at CREATE INDEX time, so
        // reaching this branch indicates drift.
        return false;
    };
    let mut index_conjuncts: Vec<SqlExpr> = Vec::new();
    flatten_and(&parsed, &mut index_conjuncts);

    // Structural equality via the stable `Debug` representation:
    // `SqlExpr` doesn't derive `PartialEq` (several nested variants
    // carry types that can't derive it cheaply), but the Debug form is
    // canonical for equivalent trees produced by the same parser. This
    // is conservative — it matches only identical AST shapes and not,
    // e.g., `a = 1` vs `1 = a`. Callers should write index predicates
    // in the same normal form they use in query WHERE clauses, which
    // is the convention everywhere else in the codebase.
    //
    // Render the query side once up front instead of re-formatting every
    // query conjunct for each index conjunct.
    let query_reprs: Vec<String> = query_conjuncts
        .iter()
        .map(|qc| format!("{qc:?}"))
        .collect();
    index_conjuncts
        .iter()
        .all(|ic| query_reprs.contains(&format!("{ic:?}")))
}

/// Report whether `expr` is a bare equality between a column and a
/// literal, with the operands in either order.
fn is_column_eq_literal(expr: &SqlExpr) -> bool {
    // Anything that is not an `=` binary operation is rejected outright.
    let SqlExpr::BinaryOp {
        left,
        op: BinaryOp::Eq,
        right,
    } = expr
    else {
        return false;
    };
    match (&**left, &**right) {
        (SqlExpr::Column { .. }, SqlExpr::Literal(_)) => true,
        (SqlExpr::Literal(_), SqlExpr::Column { .. }) => true,
        _ => false,
    }
}

/// Put a field name into the `$`-rooted path form used when matching
/// against index fields.
///
/// A name that already begins with `$` is returned unchanged; any other
/// name gets a `$.` prefix.
fn canonical_index_field(field: &str) -> String {
    match field.strip_prefix('$') {
        // Already rooted (this covers the `$.`-prefixed case too).
        Some(_) => field.to_owned(),
        None => ["$.", field].concat(),
    }
}

/// Return a copy of `v` with string contents lowercased.
///
/// Only `SqlValue::String` is transformed; every other variant is cloned
/// unchanged.
fn lowercase_string_value(v: &SqlValue) -> SqlValue {
    match v {
        SqlValue::String(text) => SqlValue::String(text.to_lowercase()),
        other => other.clone(),
    }
}

/// Map an engine type to its planning rules.
///
/// The match deliberately has no wildcard arm, so adding an `EngineType`
/// variant without wiring its rules here is a compile error.
pub fn resolve_engine_rules(engine: EngineType) -> &'static dyn EngineRules {
    let rules: &'static dyn EngineRules = match engine {
        // Document engines.
        EngineType::DocumentSchemaless => &document_schemaless::SchemalessRules,
        EngineType::DocumentStrict => &document_strict::StrictRules,
        // Remaining engines, one rules object each.
        EngineType::KeyValue => &kv::KvRules,
        EngineType::Columnar => &columnar::ColumnarRules,
        EngineType::Timeseries => &timeseries::TimeseriesRules,
        EngineType::Spatial => &spatial::SpatialRules,
        EngineType::Array => &array::ArrayRules,
    };
    rules
}