Skip to main content

nodedb_sql/types/plan/
variants.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! The `SqlPlan` enum — top-level plan produced by the SQL planner.
4
5use crate::fts_types::FtsQuery;
6use crate::temporal::TemporalScope;
7use crate::types_array;
8use crate::types_expr::{SqlExpr, SqlPayloadAtom, SqlValue};
9pub use nodedb_types::vector_distance::DistanceMetric;
10
11use crate::types::filter::Filter;
12use crate::types::query::{
13    AggregateExpr, EngineType, JoinType, Projection, SortKey, SpatialPredicate, WindowSpec,
14};
15
16use super::merge_types::MergePlanClause;
17use super::row_types::{KvInsertIntent, VectorPrimaryRow};
18use super::vector_opts::{ArrayPrefilter, VectorAnnOptions};
19
20/// The top-level plan produced by the SQL planner.
///
/// Variants are grouped by the section comments below (constant, reads,
/// writes, joins, aggregation, timeseries, search, composite, array, MERGE,
/// lateral joins, vector-primary, index DDL). Several variants nest other
/// plans through `Box<SqlPlan>` / `Vec<SqlPlan>` fields, so a plan is a tree.
21#[derive(Debug, Clone)]
22pub enum SqlPlan {
23    // ── Constant ──
24    /// Query with no FROM clause: SELECT 1, SELECT 'hello' AS name, etc.
25    /// Produces a single row with evaluated constant expressions.
26    ConstantResult {
        /// Output column names (presumably one per entry in `values` —
        /// TODO confirm).
27        columns: Vec<String>,
        /// Already-evaluated constant values for the single result row.
28        values: Vec<SqlValue>,
29    },
30
31    // ── Reads ──
    /// Scan of a collection on a given engine, carrying the filters,
    /// projection, ordering, paging, DISTINCT flag and window functions
    /// to apply.
32    Scan {
33        collection: String,
34        alias: Option<String>,
35        engine: EngineType,
36        filters: Vec<Filter>,
37        projection: Vec<Projection>,
38        sort_keys: Vec<SortKey>,
        /// Row limit; `None` when the plan carries no limit.
39        limit: Option<usize>,
40        offset: usize,
41        distinct: bool,
42        window_functions: Vec<WindowSpec>,
43        /// Bitemporal qualifier extracted from `FOR SYSTEM_TIME` /
44        /// `FOR VALID_TIME`. Default when the scan is current-state.
45        temporal: TemporalScope,
46    },
    /// Lookup by a single key: `key_column` names the key column and
    /// `key_value` is the value to match.
47    PointGet {
48        collection: String,
49        alias: Option<String>,
50        engine: EngineType,
51        key_column: String,
52        key_value: SqlValue,
53    },
54    /// Document fetch via a secondary index: equality predicate on an
55    /// indexed field. The executor performs an index lookup to resolve
56    /// matching document IDs, reads each document, and applies any
57    /// remaining filters, projection, sort, and limit.
58    ///
59    /// Emitted by `document_schemaless::plan_scan` /
60    /// `document_strict::plan_scan` when the WHERE clause contains a
61    /// single equality predicate on a `Ready` indexed field. Any
62    /// additional predicates fall through as post-filters.
63    DocumentIndexLookup {
64        collection: String,
65        alias: Option<String>,
66        engine: EngineType,
67        /// Indexed field path used for the lookup.
68        field: String,
69        /// Equality value from the WHERE clause.
70        value: SqlValue,
71        /// Remaining filters after extracting the equality used for lookup.
72        filters: Vec<Filter>,
73        projection: Vec<Projection>,
74        sort_keys: Vec<SortKey>,
75        limit: Option<usize>,
76        offset: usize,
77        distinct: bool,
78        window_functions: Vec<WindowSpec>,
79        /// Whether the chosen index is COLLATE NOCASE — the executor
80        /// lowercases the lookup value before probing.
81        case_insensitive: bool,
82        /// Bitemporal qualifier — mirrors `Scan::temporal`. Document
83        /// engines must honor it at the Ceiling stage.
84        temporal: TemporalScope,
85    },
    /// Range scan over `field` with optional `lower` / `upper` bounds.
    ///
    /// NOTE(review): bound inclusivity and the meaning of `limit == 0` are
    /// not visible in this file — confirm against the executor.
86    RangeScan {
87        collection: String,
88        field: String,
        /// Lower bound; `None` = unbounded below.
89        lower: Option<SqlValue>,
        /// Upper bound; `None` = unbounded above.
90        upper: Option<SqlValue>,
91        limit: usize,
92    },
93
94    // ── Writes ──
    /// Insert of one or more rows into a collection. Each row is a list of
    /// `(String, SqlValue)` pairs — presumably `(column_name, value)`.
95    Insert {
96        collection: String,
97        engine: EngineType,
98        rows: Vec<Vec<(String, SqlValue)>>,
99        /// Column defaults from schema: `(column_name, default_expr)`.
100        /// Used to auto-generate values for missing columns (e.g. `id` with `UUID_V7`).
101        column_defaults: Vec<(String, String)>,
102        /// `ON CONFLICT DO NOTHING` semantics: when true, duplicate-PK rows
103        /// are silently skipped instead of raising `unique_violation`. Plain
104        /// `INSERT` (no `ON CONFLICT` clause) sets this to `false`.
105        if_absent: bool,
106        /// Raw column type strings from the catalog: `(column_name, type_str)`.
107        /// Forwarded from `InsertParams::column_schema`. Used by columnar
108        /// converters to reconstruct the exact `ColumnType` for columns whose
109        /// `SqlDataType` is ambiguous (e.g. JSON and Bytes both map to Bytes).
110        column_schema: Vec<(String, String)>,
111    },
112    /// KV INSERT: key and value are fundamentally separate.
113    /// Each entry is `(key, value_columns)`.
114    KvInsert {
115        collection: String,
116        entries: Vec<(SqlValue, Vec<(String, SqlValue)>)>,
117        /// TTL in seconds (0 = no expiry). Extracted from `ttl` column if present.
118        ttl_secs: u64,
119        /// INSERT-vs-UPSERT distinction. `KvOp::Put` is a Redis-SET-style
120        /// upsert by design; to honor SQL `INSERT` semantics the planner must
121        /// tell the converter whether a duplicate key should raise (plain
122        /// `INSERT`, `Insert`), be silently skipped (`ON CONFLICT DO NOTHING`,
123        /// `InsertIfAbsent`), or overwrite (`UPSERT` / `ON CONFLICT DO
124        /// UPDATE`, `Put`).
125        intent: KvInsertIntent,
126        /// `ON CONFLICT (key) DO UPDATE SET field = expr` assignments, carried
127        /// through when `intent == Put` via the ON-CONFLICT-DO-UPDATE path.
128        /// Empty for plain UPSERT (whole-value overwrite) and for INSERT
129        /// variants.
130        on_conflict_updates: Vec<(String, SqlExpr)>,
131    },
132    /// UPSERT: insert or merge if document exists.
133    Upsert {
134        collection: String,
135        engine: EngineType,
136        rows: Vec<Vec<(String, SqlValue)>>,
        /// Column defaults from schema — same shape as
        /// `Insert::column_defaults`.
137        column_defaults: Vec<(String, String)>,
138        /// `ON CONFLICT (...) DO UPDATE SET field = expr` assignments.
139        /// When empty, upsert is a plain merge: new columns overwrite existing.
140        /// When non-empty, the engine applies these per-row against the
141        /// *existing* document instead of merging the inserted values.
142        on_conflict_updates: Vec<(String, SqlExpr)>,
143        /// Raw column type strings from the catalog: `(column_name, type_str)`.
144        /// Mirrors `Insert::column_schema` — see that field for rationale.
145        column_schema: Vec<(String, String)>,
146    },
    /// Insert whose rows come from a nested `source` plan and are written
    /// to `target` (the `INSERT ... SELECT` form, judging by the name).
    ///
    /// NOTE(review): whether `limit == 0` means "no limit" is not visible
    /// here — confirm.
147    InsertSelect {
148        target: String,
149        source: Box<SqlPlan>,
150        limit: usize,
151    },
    /// Update of rows in `collection`: `assignments` are
    /// `(String, SqlExpr)` pairs, presumably `(column, new_value_expr)`.
    ///
    /// NOTE(review): how `filters` and `target_keys` combine to select
    /// rows, and whether `returning` maps to a `RETURNING` clause, is
    /// presumed from the names — confirm.
152    Update {
153        collection: String,
154        engine: EngineType,
155        assignments: Vec<(String, SqlExpr)>,
156        filters: Vec<Filter>,
157        target_keys: Vec<SqlValue>,
158        returning: bool,
159    },
160    /// `UPDATE target SET col = src.col2 FROM src WHERE target.id = src.id`
161    ///
162    /// Two-phase execution: run the `source` plan, then for each source row
163    /// whose `source_join_col` matches a target row's `target_join_col`,
164    /// apply `assignments` (which may reference source columns
165    /// via qualified names `src.col`).
166    ///
167    /// The link between the two tables is carried as the single equality
168    /// pair `target_join_col` / `source_join_col`. `target_filters` are
169    /// remaining WHERE predicates that reference only `target`.
    ///
    /// NOTE(review): there is no separate source-filter field on this
    /// variant; source-side predicates are presumably folded into the
    /// `source` plan — confirm.
170    UpdateFrom {
171        collection: String,
172        engine: EngineType,
173        /// The FROM source: a `Scan`, `Join`, or other read plan.
174        source: Box<SqlPlan>,
175        /// Column name used as the target's join key (e.g. `"id"`).
176        target_join_col: String,
177        /// Column name used as the source's join key (e.g. `"id"`).
178        source_join_col: String,
179        /// SET assignments — RHS may be `SqlExpr::Column { table: Some("src"), .. }`.
180        assignments: Vec<(String, SqlExpr)>,
181        /// Filters that apply only to the target collection.
182        target_filters: Vec<Filter>,
183        returning: bool,
184    },
    /// Delete of rows from `collection`, selected by `filters` and/or
    /// `target_keys`.
    ///
    /// NOTE(review): how the two selectors combine is not visible here —
    /// confirm against the converter.
185    Delete {
186        collection: String,
187        engine: EngineType,
188        filters: Vec<Filter>,
189        target_keys: Vec<SqlValue>,
190    },
    /// Truncate of `collection`. `restart_identity` presumably mirrors the
    /// SQL `RESTART IDENTITY` option — TODO confirm.
191    Truncate {
192        collection: String,
193        restart_identity: bool,
194    },
195
196    // ── Joins ──
    /// Join of two nested plans (`left` and `right`).
197    Join {
198        left: Box<SqlPlan>,
199        right: Box<SqlPlan>,
        /// Column-name pairs used as join keys — presumably
        /// `(left_col, right_col)` equality pairs; TODO confirm the order.
200        on: Vec<(String, String)>,
201        join_type: JoinType,
        /// Optional join condition expression carried in addition to `on`.
202        condition: Option<SqlExpr>,
203        limit: usize,
204        /// Post-join projection: column names to keep (empty = all columns).
205        projection: Vec<Projection>,
206        /// Post-join filters (from WHERE clause).
207        filters: Vec<Filter>,
208    },
209
210    // ── Aggregation ──
    /// Grouping / aggregation over the rows of the nested `input` plan.
211    Aggregate {
212        input: Box<SqlPlan>,
        /// Canonical GROUP BY key list; `grouping_sets` indexes into it.
213        group_by: Vec<SqlExpr>,
214        aggregates: Vec<AggregateExpr>,
215        having: Vec<Filter>,
216        limit: usize,
217        /// When the GROUP BY contains ROLLUP/CUBE/GROUPING SETS, this field holds
218        /// the expansion. Each inner `Vec<usize>` is one grouping set — the indices
219        /// into `group_by` (the canonical key list) that are *present* (non-NULL)
220        /// for rows in that set.  `None` = plain single-set GROUP BY.
221        grouping_sets: Option<Vec<Vec<usize>>>,
222        /// ORDER BY applied to the aggregated rows. Empty = no sort
223        /// (executor returns groups in hash-map iteration order).
224        /// Populated by `apply_order_by` when an outer ORDER BY
225        /// targets a GROUP BY result; the Aggregate executor sorts the
226        /// finalized group rows before returning.
227        sort_keys: Vec<SortKey>,
228    },
229
230    // ── Timeseries ──
    /// Scan of a timeseries collection over `time_range`, with optional
    /// bucketing (`bucket_interval_ms`), grouping and aggregates.
    ///
    /// NOTE(review): the units and inclusivity of `time_range`, the
    /// accepted values of `gap_fill`, and the meaning of `tiered` are not
    /// visible in this file — confirm against the timeseries planner.
231    TimeseriesScan {
232        collection: String,
233        time_range: (i64, i64),
234        bucket_interval_ms: i64,
235        group_by: Vec<String>,
236        aggregates: Vec<AggregateExpr>,
237        filters: Vec<Filter>,
238        projection: Vec<Projection>,
239        gap_fill: String,
240        limit: usize,
241        tiered: bool,
242        /// Bitemporal system-time / valid-time scope. Only non-default
243        /// on collections created `WITH BITEMPORAL`; `TimeseriesRules::plan_scan`
244        /// rejects temporal scopes otherwise.
245        temporal: TemporalScope,
246    },
    /// Ingest of rows into a timeseries collection. Rows have the same
    /// `Vec<(String, SqlValue)>` shape as `Insert::rows`.
247    TimeseriesIngest {
248        collection: String,
249        rows: Vec<Vec<(String, SqlValue)>>,
250    },
251
252    // ── Search (first-class) ──
    /// Vector similarity search of `query_vector` against `field` of
    /// `collection`, with optional filters and prefilters.
253    VectorSearch {
254        collection: String,
255        field: String,
256        query_vector: Vec<f32>,
257        top_k: usize,
258        ef_search: usize,
259        /// Distance metric requested by the query operator (`<->`, `<=>`, `<#>`).
260        /// Overrides the collection-default metric at search time.
261        metric: DistanceMetric,
262        filters: Vec<Filter>,
263        /// Optional cross-engine prefilter: when set, the ND-array slice
264        /// runs first and its output cells' surrogates form a bitmap that
265        /// gates the HNSW candidate set. Set by the planner when an
266        /// `ORDER BY vector_distance(...) LIMIT k` query is JOINed against
267        /// `ARRAY_SLICE(...)`. The convert layer lowers this to
268        /// `VectorOp::Search { inline_prefilter_plan: Some(ArrayOp::SurrogateBitmapScan) }`.
269        array_prefilter: Option<ArrayPrefilter>,
270        /// ANN knobs parsed from the optional third JSON-string argument
271        /// to `vector_distance(field, query, '{...}')`.
272        ann_options: VectorAnnOptions,
273        /// When `true`, the projection contains only the surrogate/PK column
274        /// and/or `vector_distance(...)` — no payload fields. The Data Plane
275        /// can skip the document-body fetch entirely for vector-primary
276        /// collections. Always `false` for non-vector-primary collections
277        /// (document body is the primary result).
278        skip_payload_fetch: bool,
279        /// Predicates against payload-indexed columns on a vector-primary
280        /// collection. Each atom is `Eq(field, value)`, `In(field, values)`,
281        /// or `Range(field, ...)`. The convert layer translates SqlValue →
282        /// nodedb_types::Value and emits them as
283        /// `VectorOp::Search::payload_filters`. The Data Plane intersects
284        /// the resulting bitmap with the HNSW candidate set via the
285        /// per-collection `PayloadIndexSet::pre_filter`.
286        payload_filters: Vec<SqlPayloadAtom>,
287    },
    /// Vector search that, unlike `VectorSearch`, names no field.
    ///
    /// NOTE(review): presumably searches across several vector fields of
    /// the collection — confirm against the converter.
288    MultiVectorSearch {
289        collection: String,
290        query_vector: Vec<f32>,
291        top_k: usize,
292        ef_search: usize,
293    },
    /// Full-text search over `collection` using a structured `FtsQuery`.
294    TextSearch {
295        collection: String,
296        /// Structured FTS query.  Use `FtsQuery::Plain { text, fuzzy }` for
297        /// simple keyword search.  `FtsQuery::And/Or/Prefix` are supported;
298        /// `FtsQuery::Phrase` and `FtsQuery::Not` are represented but rejected
299        /// by the executor with `Unsupported`.
300        query: FtsQuery,
301        top_k: usize,
302        filters: Vec<Filter>,
303        /// When set, the SELECT list contains `bm25_score(field, term)` and the
304        /// caller wants a full-collection scan with the score injected under this
305        /// alias. The converter emits `TextOp::BM25ScoreScan` instead of
306        /// `TextOp::Search` so that all documents — including non-matching ones —
307        /// appear in the response with `null` for the score when they do not
308        /// contain the term.
309        score_alias: Option<String>,
310    },
    /// Hybrid search combining a vector query (`query_vector`) and a text
    /// query (`query_text`) into one RRF-scored result (see `score_alias`).
    ///
    /// NOTE(review): the valid range of `vector_weight` and how it splits
    /// weight between the two legs is not visible here — confirm.
311    HybridSearch {
312        collection: String,
313        query_vector: Vec<f32>,
314        query_text: String,
315        top_k: usize,
316        ef_search: usize,
317        vector_weight: f32,
318        fuzzy: bool,
319        /// SELECT-list alias the response should use for the RRF score
320        /// column. `None` means the executor falls back to the fixed
321        /// internal field name `rrf_score`. Set by the planner from the
322        /// SELECT projection's `AS <alias>` for the `rrf_score(...)` call.
323        score_alias: Option<String>,
324    },
325
326    /// Three-source hybrid search: vector + BM25 text + graph BFS, fused via weighted RRF.
327    ///
328    /// Produced when the planner detects `rrf_score(vector_distance(...),
329    /// bm25_score(...), graph_score(...))` with three source arguments.
330    HybridSearchTriple {
331        collection: String,
332        query_vector: Vec<f32>,
333        query_text: String,
334        /// Node id used as the BFS seed for the graph leg.
335        graph_seed_id: String,
336        /// Maximum BFS depth from the seed node.
337        graph_depth: usize,
338        /// Edge label filter for graph BFS. `None` = all edges.
339        graph_edge_label: Option<String>,
340        top_k: usize,
341        ef_search: usize,
342        fuzzy: bool,
343        /// Per-source RRF k constants: (vector_k, text_k, graph_k).
344        rrf_k: (f64, f64, f64),
345        /// SELECT-list alias for the fused RRF score column.
346        score_alias: Option<String>,
347    },
    /// Spatial query evaluating `predicate` between `field` and
    /// `query_geometry`, with additional non-spatial `attribute_filters`.
    ///
    /// NOTE(review): which predicates consume `distance_meters` is not
    /// visible in this file — confirm.
348    SpatialScan {
349        collection: String,
350        field: String,
351        predicate: SpatialPredicate,
352        query_geometry: nodedb_types::geometry::Geometry,
353        distance_meters: f64,
354        attribute_filters: Vec<Filter>,
355        limit: usize,
356        projection: Vec<Projection>,
357    },
358
359    // ── Composite ──
    /// Union of any number of nested plans. `distinct` presumably
    /// separates `UNION` (true) from `UNION ALL` (false) — TODO confirm.
360    Union {
361        inputs: Vec<SqlPlan>,
362        distinct: bool,
363    },
    /// Intersection of two nested plans. `all` presumably corresponds to
    /// the SQL `ALL` modifier — TODO confirm.
364    Intersect {
365        left: Box<SqlPlan>,
366        right: Box<SqlPlan>,
367        all: bool,
368    },
    /// Difference of two nested plans (`left` minus `right`, judging by
    /// the name). `all` presumably corresponds to the SQL `ALL` modifier —
    /// TODO confirm.
369    Except {
370        left: Box<SqlPlan>,
371        right: Box<SqlPlan>,
372        all: bool,
373    },
    /// Recursive scan bound to a single collection (contrast with
    /// `RecursiveValue`, which carries no collection reference).
    ///
    /// NOTE(review): `base_filters` presumably select the seed rows and
    /// `recursive_filters` apply on each iteration — confirm.
374    RecursiveScan {
375        collection: String,
376        base_filters: Vec<Filter>,
377        recursive_filters: Vec<Filter>,
378        /// Equi-join link for tree-traversal recursion:
379        /// `(collection_field, working_table_field)`.
380        /// e.g. `("parent_id", "id")` means each iteration finds rows
381        /// where `collection.parent_id` matches a `working_table.id`.
382        join_link: Option<(String, String)>,
383        max_iterations: usize,
384        distinct: bool,
385        limit: usize,
386    },
387
388    /// Value-generating recursive CTE (`WITH RECURSIVE name(cols) AS (anchor UNION [ALL] step)`).
389    ///
390    /// Unlike `RecursiveScan`, this variant carries no collection reference — the anchor
391    /// row is produced entirely from literal expressions and each iteration applies the
392    /// step expressions to the previous row.  The executor evaluates this iteratively
393    /// in the Data Plane without touching storage.
394    ///
395    /// All expressions are stored as raw SQL text so they can be serialised across the
396    /// SPSC bridge without requiring `SqlExpr` to implement `Serialize`.  The executor
397    /// parses them at execution time via the same lightweight expression evaluator used
398    /// by the procedural executor.
399    RecursiveValue {
400        /// CTE name (used in error messages).
401        cte_name: String,
402        /// Column names declared on the CTE (e.g. `(n)` in `c(n) AS ...`).
403        columns: Vec<String>,
404        /// Anchor SELECT expressions as raw SQL text (one per column).
405        init_exprs: Vec<String>,
406        /// Recursive step SELECT expressions as raw SQL text (one per column).
407        /// May reference column names from `columns`.
408        step_exprs: Vec<String>,
409        /// Optional WHERE condition as raw SQL text applied to each new row
410        /// to decide whether to continue.  `None` → run until fixed point.
411        condition: Option<String>,
412        /// Maximum iterations before a `RecursionDepthExceeded` error is raised.
413        max_depth: usize,
414        /// `false` → UNION ALL (keep duplicates); `true` → UNION (deduplicate).
415        distinct: bool,
416    },
417
418    /// Non-recursive CTE: execute each definition, then the outer query.
419    Cte {
420        /// CTE definitions: `(name, subquery_plan)`.
421        definitions: Vec<(String, SqlPlan)>,
422        /// The outer query that references CTE names.
423        outer: Box<SqlPlan>,
424    },
425
426    // ── Array (ND sparse) ─────────────────────────────────────
427    /// `CREATE ARRAY <name> DIMS (...) ATTRS (...) TILE_EXTENTS (...)`.
428    /// AST is engine-agnostic — the Origin converter builds the typed
429    /// `nodedb_array::ArraySchema` and persists the catalog row.
430    CreateArray {
431        name: String,
432        dims: Vec<types_array::ArrayDimAst>,
433        attrs: Vec<types_array::ArrayAttrAst>,
434        tile_extents: Vec<i64>,
435        cell_order: types_array::ArrayCellOrderAst,
436        tile_order: types_array::ArrayTileOrderAst,
437        /// Hilbert-prefix bits for vShard routing (1–16, default 8).
438        prefix_bits: u8,
439        /// Audit-retention horizon in milliseconds. `None` = non-bitemporal.
440        audit_retain_ms: Option<u64>,
441        /// Compliance floor for `audit_retain_ms`. `None` = no floor.
442        minimum_audit_retain_ms: Option<u64>,
443    },
444    /// `DROP ARRAY [IF EXISTS] <name>` — pure Control-Plane catalog
445    /// mutation. Per-core array store cleanup happens lazily.
446    DropArray { name: String, if_exists: bool },
447    /// `ALTER ARRAY <name> SET (audit_retain_ms = N, ...)`.
448    ///
449    /// Double-`Option` semantics for `audit_retain_ms`:
450    /// - `None`          = key was absent from SET clause → field unchanged.
451    /// - `Some(None)`    = key present with value `NULL` → field set to NULL.
452    /// - `Some(Some(v))` = key present with integer value → field set to v.
    ///
    /// `minimum_audit_retain_ms` cannot be NULL, so it uses a single
    /// `Option`: `None` = unchanged, `Some(v)` = set to v.
453    AlterArray {
454        name: String,
455        /// New value for `audit_retain_ms`. `Some(None)` unregisters
456        /// the array from the bitemporal retention registry.
        ///
        /// NOTE(review): typed `i64` here but `u64` in
        /// `CreateArray::audit_retain_ms` — confirm the difference is
        /// intentional.
457        audit_retain_ms: Option<Option<i64>>,
458        /// New value for `minimum_audit_retain_ms`. Cannot be NULL.
459        minimum_audit_retain_ms: Option<u64>,
460    },
461    /// `INSERT INTO ARRAY <name> COORDS (...) VALUES (...) [, ...]`.
462    InsertArray {
463        name: String,
464        rows: Vec<types_array::ArrayInsertRow>,
465    },
466    /// `DELETE FROM ARRAY <name> WHERE COORDS IN ((...), (...))`.
467    DeleteArray {
468        name: String,
        /// One inner `Vec` per coordinate tuple in the `COORDS IN` list.
469        coords: Vec<Vec<types_array::ArrayCoordLiteral>>,
470    },
471    /// `SELECT * FROM ARRAY_SLICE(name, {dim:[lo,hi],..}, [attrs], limit)`.
472    ArraySlice {
473        name: String,
474        slice: types_array::ArraySliceAst,
475        /// Attribute names. Empty = all attrs.
476        attr_projection: Vec<String>,
477        /// 0 = unlimited.
478        limit: u32,
479        /// Bitemporal qualifier. When both axes are `None` / `Any`, the Data
480        /// Plane returns the live (current) state — the default fast path.
481        /// Populated from `AS OF SYSTEM TIME` / `AS OF VALID TIME` clauses.
482        temporal: TemporalScope,
483    },
484    /// `SELECT * FROM ARRAY_PROJECT(name, [attrs])`.
485    ArrayProject {
486        name: String,
487        /// Attribute names. Must be non-empty.
488        attr_projection: Vec<String>,
489    },
490    /// `SELECT * FROM ARRAY_AGG(name, attr, reducer [, group_by_dim])`.
491    ArrayAgg {
492        name: String,
493        attr: String,
494        reducer: types_array::ArrayReducerAst,
495        /// `None` = scalar fold; `Some(name)` = group by that dim.
496        group_by_dim: Option<String>,
497        /// Bitemporal qualifier. When both axes are `None` / `Any`, the Data
498        /// Plane aggregates against the live (current) state — the default
499        /// fast path. Populated from `AS OF SYSTEM TIME` / `AS OF VALID TIME`.
500        temporal: TemporalScope,
501    },
502    /// `SELECT * FROM ARRAY_ELEMENTWISE(left, right, op, attr)`.
503    ArrayElementwise {
504        left: String,
505        right: String,
506        op: types_array::ArrayBinaryOpAst,
507        attr: String,
508    },
509    /// `SELECT ARRAY_FLUSH(name)` — returns one row `{result: BOOL}`.
510    ArrayFlush { name: String },
511    /// `SELECT ARRAY_COMPACT(name)` — returns one row `{result: BOOL}`.
512    ArrayCompact { name: String },
513
514    // ── MERGE ──────────────────────────────────────────────────────────
515    /// `MERGE INTO target USING source ON ... WHEN ... THEN ...`
516    ///
517    /// Supported only for `document_schemaless` and `document_strict` engines.
518    /// The Data Plane handler evaluates WHEN arms in declaration order and
519    /// applies the first matching action to each joined or unmatched row.
520    Merge {
521        target: String,
522        engine: EngineType,
523        /// Source plan (Scan, DocumentIndexLookup, or Join of a sub-select).
524        source: Box<SqlPlan>,
525        /// Column in the target used for the equi-join (from ON clause).
526        target_join_col: String,
527        /// Column in the source used for the equi-join (from ON clause).
528        source_join_col: String,
529        /// Alias used to qualify source columns in expressions (e.g. `src.col`).
530        source_alias: String,
531        /// WHEN arms in declaration order.
532        clauses: Vec<MergePlanClause>,
533        returning: bool,
534    },
535
536    // ── Lateral joins ───────────────────────────────────────────────────
537    /// LATERAL subquery that is equi-correlated and has ORDER BY + LIMIT k.
538    ///
539    /// Emitted when the inner subquery has an equi-key correlation to the outer
540    /// table plus an `ORDER BY ... LIMIT k` clause. The Data Plane scans the
541    /// inner collection once per outer row applying the equi-filter, sorts by
542    /// `inner_order_by`, and retains at most `inner_limit` rows.
543    ///
544    /// `correlation_keys` is `(outer_col, inner_col)` — the equi-join pairs
545    /// that correlate inner to outer.
546    LateralTopK {
547        /// Plan producing outer rows.
548        outer: Box<SqlPlan>,
549        /// Alias used to qualify outer table columns (e.g. `"u"`).
550        outer_alias: Option<String>,
551        /// Inner collection to scan.
552        inner_collection: String,
553        /// Pre-filter applied to inner rows (non-correlated filters).
554        inner_filters: Vec<Filter>,
555        /// Sort keys for the inner per-outer-row result.
556        inner_order_by: Vec<SortKey>,
557        /// Maximum number of inner rows per outer row.
558        inner_limit: usize,
559        /// Equi-join pairs `(outer_col, inner_col)`.
560        correlation_keys: Vec<(String, String)>,
561        /// Alias under which inner rows are presented.
562        lateral_alias: String,
563        /// Post-lateral projection.
564        projection: Vec<Projection>,
565        /// LEFT join semantics: preserve outer rows even when inner is empty.
566        left_join: bool,
567    },
568
569    /// General LATERAL subquery — per-outer-row correlated nested loop.
570    ///
571    /// Emitted for LATERAL subqueries that cannot be rewritten as equi-join
572    /// hash joins or `LateralTopK`. The Control Plane drives execution: it
573    /// materialises outer rows, then for each row substitutes the correlation
574    /// values as additional filters on the inner plan and re-dispatches it.
575    ///
576    /// Bounded by `outer_row_cap`; queries that exceed the cap receive a typed
577    /// `SqlError::Unsupported` before any data is returned.
578    LateralLoop {
579        /// Plan producing outer rows.
580        outer: Box<SqlPlan>,
581        /// Alias used to qualify outer table columns.
582        outer_alias: Option<String>,
583        /// Inner subquery plan (correlation predicates are injected at runtime).
584        inner: Box<SqlPlan>,
585        /// Correlated predicates extracted from the inner WHERE that reference
586        /// outer columns.  Each entry is `(inner_field, outer_field)`.
        /// Note the order is the reverse of `LateralTopK::correlation_keys`,
        /// which is `(outer_col, inner_col)`.
587        correlation_predicates: Vec<(String, String)>,
588        /// Alias under which inner rows are presented.
589        lateral_alias: String,
590        /// Post-lateral projection.
591        projection: Vec<Projection>,
592        /// Maximum outer rows allowed. Queries exceeding this return an error.
593        outer_row_cap: usize,
594        /// LEFT join semantics: preserve outer rows even when inner is empty.
595        left_join: bool,
596    },
597
598    // ── Vector-primary ──────────────────────────────────────────────────
599    /// INSERT into a vector-primary collection.
600    ///
601    /// Emitted by the planner instead of the generic `Insert` variant when the
602    /// target collection has `primary = PrimaryEngine::Vector`. The Data Plane
603    /// routes each row through `VectorOp::DirectUpsert`, bypassing full-document
604    /// MessagePack encoding.
605    VectorPrimaryInsert {
606        collection: String,
607        /// Vector column name (matches `VectorPrimaryConfig::vector_field`).
608        /// Plumbed to `VectorOp::DirectUpsert` so the Data Plane keys its
609        /// HNSW index by `(tid, collection, field)` — the same key the SELECT
610        /// path uses.
611        field: String,
612        /// Collection-level quantization. Applied via `set_quantization` on
613        /// the first DirectUpsert so subsequent seals trigger codec-dispatch
614        /// rebuilds against the configured codec.
615        quantization: nodedb_types::VectorQuantization,
616        /// Native storage dtype for vector values (F32 / F16 / BF16).
617        storage_dtype: nodedb_types::VectorStorageDtype,
618        /// Payload fields that get bitmap indexes, each paired with its
619        /// `PayloadIndexKind`. Registered via `payload.add_index` on the
        /// first DirectUpsert.
620        payload_indexes: Vec<(String, nodedb_types::PayloadIndexKind)>,
        /// Rows to insert, already shaped for the vector-primary path.
621        rows: Vec<VectorPrimaryRow>,
622    },
623
624    // ── Index DDL ───────────────────────────────────────────────────────
625    /// `CREATE [UNIQUE] INDEX [IF NOT EXISTS] name ON collection (field)`
626    ///
627    /// Registers a secondary index on the named field of a document or KV
628    /// collection. The executor backfills existing rows into the index so
629    /// it is immediately consistent.
630    CreateIndex {
631        /// Name of the index. `None` requests an auto-generated name.
632        index_name: Option<String>,
633        /// Target collection.
634        collection: String,
635        /// Indexed field path.
636        field: String,
637        /// Whether the index enforces uniqueness.
638        unique: bool,
639        /// `IF NOT EXISTS` — succeed silently if the index already exists.
640        if_not_exists: bool,
641        /// Case-insensitive string collation (`COLLATE NOCASE`).
642        case_insensitive: bool,
643    },
644
645    /// `DROP INDEX [IF EXISTS] name [ON collection]`
646    ///
647    /// Removes a secondary index from a collection. All index entries are
648    /// erased and the index metadata is unregistered.
649    DropIndex {
650        /// Name of the index.
651        index_name: String,
652        /// Target collection (may be inferred from the index catalog).
653        collection: Option<String>,
654        /// `IF EXISTS` — succeed silently if the index does not exist.
655        if_exists: bool,
656    },
657}