llkv_plan/
plans.rs

1//! Logical query plan structures for LLKV.
2//!
3//! This module defines the plan structures that represent logical query operations
4//! before they are executed. Plans are created by SQL parsers or fluent builders and
5//! consumed by execution engines.
6
7use std::sync::Arc;
8
9use arrow::array::{
10    ArrayRef, BooleanArray, Date32Array, Decimal128Array, Float64Array, Int64Array, StringArray,
11};
12use arrow::datatypes::{DataType, Schema};
13use arrow::record_batch::RecordBatch;
14use llkv_expr::expr::SubqueryId;
15use llkv_result::Error;
16use llkv_types::IntervalValue;
17use llkv_types::decimal::DecimalValue;
18use rustc_hash::FxHashMap;
19
20/// Result type for plan operations.
21pub type PlanResult<T> = llkv_result::Result<T>;
22
23// ============================================================================
24// Filter Metadata
25// ============================================================================
26
27/// Boolean predicate plus correlated subquery metadata attached to a [`SelectPlan`].
28#[derive(Clone, Debug)]
29pub struct SelectFilter {
30    /// Predicate applied to rows before projections execute.
31    pub predicate: llkv_expr::expr::Expr<'static, String>,
32    /// Correlated subqueries required to evaluate the predicate.
33    pub subqueries: Vec<FilterSubquery>,
34}
35
36/// Correlated subquery invoked from within a filter predicate.
37#[derive(Clone, Debug)]
38pub struct FilterSubquery {
39    /// Identifier referenced by [`llkv_expr::expr::Expr::Exists`].
40    pub id: SubqueryId,
41    /// Logical plan for the subquery.
42    pub plan: Box<SelectPlan>,
43    /// Mappings for correlated column placeholders to real outer columns.
44    pub correlated_columns: Vec<CorrelatedColumn>,
45}
46
47/// Correlated subquery invoked from within a scalar projection expression.
48#[derive(Clone, Debug)]
49pub struct ScalarSubquery {
50    /// Identifier referenced by [`llkv_expr::expr::ScalarExpr::ScalarSubquery`].
51    pub id: SubqueryId,
52    /// Logical plan for the subquery.
53    pub plan: Box<SelectPlan>,
54    /// Mappings for correlated column placeholders to real outer columns.
55    pub correlated_columns: Vec<CorrelatedColumn>,
56}
57
58/// Description of a correlated column captured by an EXISTS predicate.
59#[derive(Clone, Debug)]
60pub struct CorrelatedColumn {
61    /// Placeholder column name injected into the subquery expression tree.
62    pub placeholder: String,
63    /// Canonical outer column name.
64    pub column: String,
65    /// Optional nested field path for struct lookups.
66    pub field_path: Vec<String>,
67}
68
69// ============================================================================
70// PlanValue Types
71// ============================================================================
72
73#[derive(Clone, Debug, PartialEq)]
74pub enum PlanValue {
75    Null,
76    Integer(i64),
77    Float(f64),
78    Decimal(DecimalValue),
79    String(String),
80    Date32(i32),
81    Struct(FxHashMap<String, PlanValue>),
82    Interval(IntervalValue),
83}
84
85impl From<&str> for PlanValue {
86    fn from(value: &str) -> Self {
87        Self::String(value.to_string())
88    }
89}
90
91impl From<String> for PlanValue {
92    fn from(value: String) -> Self {
93        Self::String(value)
94    }
95}
96
97impl From<i64> for PlanValue {
98    fn from(value: i64) -> Self {
99        Self::Integer(value)
100    }
101}
102
103impl From<f64> for PlanValue {
104    fn from(value: f64) -> Self {
105        Self::Float(value)
106    }
107}
108
109impl From<bool> for PlanValue {
110    fn from(value: bool) -> Self {
111        // Store booleans as integers for compatibility
112        if value {
113            Self::Integer(1)
114        } else {
115            Self::Integer(0)
116        }
117    }
118}
119
120impl From<i32> for PlanValue {
121    fn from(value: i32) -> Self {
122        Self::Integer(value as i64)
123    }
124}
125
126/// Convert a `Literal` from llkv-expr into a `PlanValue`.
127///
128/// This is useful for evaluating predicates that contain literal values,
129/// such as in HAVING clauses or filter expressions.
130pub fn plan_value_from_literal(literal: &llkv_expr::Literal) -> PlanResult<PlanValue> {
131    use llkv_expr::Literal;
132
133    match literal {
134        Literal::Null => Ok(PlanValue::Null),
135        Literal::Int128(i) => {
136            // Convert i128 to i64, checking for overflow
137            if *i > i64::MAX as i128 || *i < i64::MIN as i128 {
138                Err(Error::InvalidArgumentError(format!(
139                    "Integer literal {} out of range for i64",
140                    i
141                )))
142            } else {
143                Ok(PlanValue::Integer(*i as i64))
144            }
145        }
146        Literal::Float64(f) => Ok(PlanValue::Float(*f)),
147        Literal::Decimal128(decimal) => Ok(PlanValue::Decimal(*decimal)),
148        Literal::String(s) => Ok(PlanValue::String(s.clone())),
149        Literal::Boolean(b) => Ok(PlanValue::from(*b)),
150        Literal::Date32(days) => Ok(PlanValue::Date32(*days)),
151        Literal::Struct(fields) => {
152            let mut map = FxHashMap::with_capacity_and_hasher(fields.len(), Default::default());
153            for (name, value) in fields {
154                let plan_value = plan_value_from_literal(value)?;
155                map.insert(name.clone(), plan_value);
156            }
157            Ok(PlanValue::Struct(map))
158        }
159        Literal::Interval(interval) => Ok(PlanValue::Interval(*interval)),
160    }
161}
162
163// ============================================================================
164// CREATE TABLE Plan
165// ============================================================================
166
167/// Multi-column unique constraint specification.
168#[derive(Clone, Debug)]
169pub struct MultiColumnUniqueSpec {
170    /// Optional name for the unique constraint
171    pub name: Option<String>,
172    /// Column names participating in this UNIQUE constraint
173    pub columns: Vec<String>,
174}
175
176/// Plan for creating a table.
177#[derive(Clone, Debug)]
178pub struct CreateTablePlan {
179    pub name: String,
180    pub if_not_exists: bool,
181    pub or_replace: bool,
182    pub columns: Vec<PlanColumnSpec>,
183    pub source: Option<CreateTableSource>,
184    /// Optional storage namespace for the table.
185    pub namespace: Option<String>,
186    pub foreign_keys: Vec<ForeignKeySpec>,
187    pub multi_column_uniques: Vec<MultiColumnUniqueSpec>,
188}
189
190impl CreateTablePlan {
191    pub fn new(name: impl Into<String>) -> Self {
192        Self {
193            name: name.into(),
194            if_not_exists: false,
195            or_replace: false,
196            columns: Vec::new(),
197            source: None,
198            namespace: None,
199            foreign_keys: Vec::new(),
200            multi_column_uniques: Vec::new(),
201        }
202    }
203}
204
205// ============================================================================
206// DROP TABLE Plan
207// ============================================================================
208
209/// Plan for dropping a table.
210#[derive(Clone, Debug)]
211pub struct DropTablePlan {
212    pub name: String,
213    pub if_exists: bool,
214}
215
216impl DropTablePlan {
217    pub fn new(name: impl Into<String>) -> Self {
218        Self {
219            name: name.into(),
220            if_exists: false,
221        }
222    }
223
224    pub fn if_exists(mut self, if_exists: bool) -> Self {
225        self.if_exists = if_exists;
226        self
227    }
228}
229
230// ============================================================================
231// CREATE VIEW Plan
232// ============================================================================
233
234/// Plan for creating a view.
235#[derive(Clone, Debug)]
236pub struct CreateViewPlan {
237    pub name: String,
238    pub if_not_exists: bool,
239    pub view_definition: String,
240    pub select_plan: Box<SelectPlan>,
241    /// Optional storage namespace for the view (e.g., "temp" for temporary views).
242    pub namespace: Option<String>,
243}
244
245impl CreateViewPlan {
246    pub fn new(name: impl Into<String>, view_definition: String, select_plan: SelectPlan) -> Self {
247        Self {
248            name: name.into(),
249            if_not_exists: false,
250            view_definition,
251            select_plan: Box::new(select_plan),
252            namespace: None,
253        }
254    }
255}
256
257// ============================================================================
258// DROP VIEW Plan
259// ============================================================================
260
261/// Plan for dropping a view.
262#[derive(Clone, Debug)]
263pub struct DropViewPlan {
264    pub name: String,
265    pub if_exists: bool,
266}
267
268impl DropViewPlan {
269    pub fn new(name: impl Into<String>) -> Self {
270        Self {
271            name: name.into(),
272            if_exists: false,
273        }
274    }
275
276    pub fn if_exists(mut self, if_exists: bool) -> Self {
277        self.if_exists = if_exists;
278        self
279    }
280}
281
282// ============================================================================
283// RENAME TABLE Plan
284// ============================================================================
285
286/// Plan for renaming a table.
287#[derive(Clone, Debug, PartialEq, Eq)]
288pub struct RenameTablePlan {
289    pub current_name: String,
290    pub new_name: String,
291    pub if_exists: bool,
292}
293
294impl RenameTablePlan {
295    pub fn new(current_name: impl Into<String>, new_name: impl Into<String>) -> Self {
296        Self {
297            current_name: current_name.into(),
298            new_name: new_name.into(),
299            if_exists: false,
300        }
301    }
302
303    pub fn if_exists(mut self, if_exists: bool) -> Self {
304        self.if_exists = if_exists;
305        self
306    }
307}
308
309/// Plan for dropping an index.
310#[derive(Clone, Debug, PartialEq)]
311pub struct DropIndexPlan {
312    pub name: String,
313    pub canonical_name: String,
314    pub if_exists: bool,
315}
316
317impl DropIndexPlan {
318    pub fn new(name: impl Into<String>) -> Self {
319        let display = name.into();
320        Self {
321            canonical_name: display.to_ascii_lowercase(),
322            name: display,
323            if_exists: false,
324        }
325    }
326
327    pub fn with_canonical(mut self, canonical: impl Into<String>) -> Self {
328        self.canonical_name = canonical.into();
329        self
330    }
331
332    pub fn if_exists(mut self, if_exists: bool) -> Self {
333        self.if_exists = if_exists;
334        self
335    }
336}
337
338/// Plan for rebuilding an index.
339#[derive(Clone, Debug, PartialEq)]
340pub struct ReindexPlan {
341    pub name: String,
342    pub canonical_name: String,
343}
344
345impl ReindexPlan {
346    pub fn new(name: impl Into<String>) -> Self {
347        let display = name.into();
348        Self {
349            canonical_name: display.to_ascii_lowercase(),
350            name: display,
351        }
352    }
353
354    pub fn with_canonical(mut self, canonical: impl Into<String>) -> Self {
355        self.canonical_name = canonical.into();
356        self
357    }
358}
359
360// ============================================================================
361// ALTER TABLE Plan Structures
362// ============================================================================
363
364/// Plan for ALTER TABLE operations.
365#[derive(Clone, Debug, PartialEq)]
366pub struct AlterTablePlan {
367    pub table_name: String,
368    pub if_exists: bool,
369    pub operation: AlterTableOperation,
370}
371
372/// Specific ALTER TABLE operation to perform.
373#[derive(Clone, Debug, PartialEq)]
374pub enum AlterTableOperation {
375    /// RENAME COLUMN old_name TO new_name
376    RenameColumn {
377        old_column_name: String,
378        new_column_name: String,
379    },
380    /// ALTER COLUMN column_name SET DATA TYPE new_type
381    SetColumnDataType {
382        column_name: String,
383        new_data_type: String, // SQL type string like "INTEGER", "VARCHAR", etc.
384    },
385    /// DROP COLUMN column_name
386    DropColumn {
387        column_name: String,
388        if_exists: bool,
389        cascade: bool,
390    },
391}
392
393impl AlterTablePlan {
394    pub fn new(table_name: impl Into<String>, operation: AlterTableOperation) -> Self {
395        Self {
396            table_name: table_name.into(),
397            if_exists: false,
398            operation,
399        }
400    }
401
402    pub fn if_exists(mut self, if_exists: bool) -> Self {
403        self.if_exists = if_exists;
404        self
405    }
406}
407
408// ============================================================================
409// FOREIGN KEY Plan Structures
410// ============================================================================
411
412#[derive(Clone, Debug, Default, PartialEq, Eq)]
413pub enum ForeignKeyAction {
414    #[default]
415    NoAction,
416    Restrict,
417}
418
419#[derive(Clone, Debug)]
420pub struct ForeignKeySpec {
421    pub name: Option<String>,
422    pub columns: Vec<String>,
423    pub referenced_table: String,
424    pub referenced_columns: Vec<String>,
425    pub on_delete: ForeignKeyAction,
426    pub on_update: ForeignKeyAction,
427}
428
429// ============================================================================
430// CREATE INDEX Plan
431// ============================================================================
432
433/// Column specification for CREATE INDEX statements.
434#[derive(Clone, Debug, PartialEq, Eq)]
435pub struct IndexColumnPlan {
436    pub name: String,
437    pub ascending: bool,
438    pub nulls_first: bool,
439}
440
441impl IndexColumnPlan {
442    pub fn new(name: impl Into<String>) -> Self {
443        Self {
444            name: name.into(),
445            ascending: true,
446            nulls_first: false,
447        }
448    }
449
450    pub fn with_sort(mut self, ascending: bool, nulls_first: bool) -> Self {
451        self.ascending = ascending;
452        self.nulls_first = nulls_first;
453        self
454    }
455}
456
457/// Plan for creating an index on a table.
458#[derive(Clone, Debug)]
459pub struct CreateIndexPlan {
460    pub name: Option<String>,
461    pub table: String,
462    pub unique: bool,
463    pub if_not_exists: bool,
464    pub columns: Vec<IndexColumnPlan>,
465}
466
467impl CreateIndexPlan {
468    pub fn new(table: impl Into<String>) -> Self {
469        Self {
470            name: None,
471            table: table.into(),
472            unique: false,
473            if_not_exists: false,
474            columns: Vec::new(),
475        }
476    }
477
478    pub fn with_name(mut self, name: Option<String>) -> Self {
479        self.name = name;
480        self
481    }
482
483    pub fn with_unique(mut self, unique: bool) -> Self {
484        self.unique = unique;
485        self
486    }
487
488    pub fn with_if_not_exists(mut self, if_not_exists: bool) -> Self {
489        self.if_not_exists = if_not_exists;
490        self
491    }
492
493    pub fn with_columns(mut self, columns: Vec<IndexColumnPlan>) -> Self {
494        self.columns = columns;
495        self
496    }
497}
498
499/// Column specification produced by the logical planner.
500///
501/// This struct flows from the planner into the runtime/executor so callers can
502/// reason about column metadata without duplicating field definitions.
503#[derive(Clone, Debug)]
504pub struct PlanColumnSpec {
505    pub name: String,
506    pub data_type: DataType,
507    pub nullable: bool,
508    pub primary_key: bool,
509    pub unique: bool,
510    /// Optional CHECK constraint expression (SQL string).
511    /// Example: "t.t=42" for CHECK(t.t=42)
512    pub check_expr: Option<String>,
513}
514
515impl PlanColumnSpec {
516    pub fn new(name: impl Into<String>, data_type: DataType, nullable: bool) -> Self {
517        Self {
518            name: name.into(),
519            data_type,
520            nullable,
521            primary_key: false,
522            unique: false,
523            check_expr: None,
524        }
525    }
526
527    pub fn with_primary_key(mut self, primary_key: bool) -> Self {
528        self.primary_key = primary_key;
529        if primary_key {
530            self.unique = true;
531        }
532        self
533    }
534
535    pub fn with_unique(mut self, unique: bool) -> Self {
536        if unique {
537            self.unique = true;
538        }
539        self
540    }
541
542    pub fn with_check(mut self, check_expr: Option<String>) -> Self {
543        self.check_expr = check_expr;
544        self
545    }
546}
547
548/// Trait for types that can be converted into a [`PlanColumnSpec`].
549pub trait IntoPlanColumnSpec {
550    fn into_plan_column_spec(self) -> PlanColumnSpec;
551}
552
553/// Column nullability specification.
554#[derive(Clone, Copy, Debug, PartialEq, Eq)]
555pub enum ColumnNullability {
556    Nullable,
557    NotNull,
558}
559
560impl ColumnNullability {
561    pub fn is_nullable(self) -> bool {
562        matches!(self, ColumnNullability::Nullable)
563    }
564}
565
566/// Convenience constant for nullable columns.
567#[allow(non_upper_case_globals)]
568pub const Nullable: ColumnNullability = ColumnNullability::Nullable;
569
570/// Convenience constant for non-null columns.
571#[allow(non_upper_case_globals)]
572pub const NotNull: ColumnNullability = ColumnNullability::NotNull;
573
574impl IntoPlanColumnSpec for PlanColumnSpec {
575    fn into_plan_column_spec(self) -> PlanColumnSpec {
576        self
577    }
578}
579
580impl<T> IntoPlanColumnSpec for &T
581where
582    T: Clone + IntoPlanColumnSpec,
583{
584    fn into_plan_column_spec(self) -> PlanColumnSpec {
585        self.clone().into_plan_column_spec()
586    }
587}
588
589impl IntoPlanColumnSpec for (&str, DataType) {
590    fn into_plan_column_spec(self) -> PlanColumnSpec {
591        PlanColumnSpec::new(self.0, self.1, true)
592    }
593}
594
595impl IntoPlanColumnSpec for (&str, DataType, bool) {
596    fn into_plan_column_spec(self) -> PlanColumnSpec {
597        PlanColumnSpec::new(self.0, self.1, self.2)
598    }
599}
600
601impl IntoPlanColumnSpec for (&str, DataType, ColumnNullability) {
602    fn into_plan_column_spec(self) -> PlanColumnSpec {
603        PlanColumnSpec::new(self.0, self.1, self.2.is_nullable())
604    }
605}
606
607/// Source data for CREATE TABLE AS SELECT.
608#[derive(Clone, Debug)]
609pub enum CreateTableSource {
610    Batches {
611        schema: Arc<Schema>,
612        batches: Vec<RecordBatch>,
613    },
614    Select {
615        plan: Box<SelectPlan>,
616    },
617}
618
619// ============================================================================
620// INSERT Plan
621// ============================================================================
622
623/// SQLite conflict resolution action for INSERT statements.
624#[derive(Clone, Copy, Debug, PartialEq, Eq)]
625pub enum InsertConflictAction {
626    /// Standard INSERT behavior - fail on constraint violation
627    None,
628    /// INSERT OR REPLACE - update existing row on conflict
629    Replace,
630    /// INSERT OR IGNORE - skip row on conflict
631    Ignore,
632    /// INSERT OR ABORT - abort transaction on conflict
633    Abort,
634    /// INSERT OR FAIL - fail statement on conflict (but don't rollback)
635    Fail,
636    /// INSERT OR ROLLBACK - rollback transaction on conflict
637    Rollback,
638}
639
640/// Plan for inserting data into a table.
641#[derive(Clone, Debug)]
642pub struct InsertPlan {
643    pub table: String,
644    pub columns: Vec<String>,
645    pub source: InsertSource,
646    pub on_conflict: InsertConflictAction,
647}
648
649/// Source data for INSERT operations.
650#[derive(Clone, Debug)]
651pub enum InsertSource {
652    Rows(Vec<Vec<PlanValue>>),
653    Batches(Vec<RecordBatch>),
654    Select { plan: Box<SelectPlan> },
655}
656
657// ============================================================================
658// UPDATE Plan
659// ============================================================================
660
661/// Plan for updating rows in a table.
662#[derive(Clone, Debug)]
663pub struct UpdatePlan {
664    pub table: String,
665    pub assignments: Vec<ColumnAssignment>,
666    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
667}
668
669/// Value to assign in an UPDATE.
670#[derive(Clone, Debug)]
671pub enum AssignmentValue {
672    Literal(PlanValue),
673    Expression(llkv_expr::expr::ScalarExpr<String>),
674}
675
676/// Column assignment for UPDATE.
677#[derive(Clone, Debug)]
678pub struct ColumnAssignment {
679    pub column: String,
680    pub value: AssignmentValue,
681}
682
683// ============================================================================
684// DELETE Plan
685// ============================================================================
686
687/// Plan for deleting rows from a table.
688#[derive(Clone, Debug)]
689pub struct DeletePlan {
690    pub table: String,
691    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
692}
693
694// ============================================================================
695// TRUNCATE Plan
696// ============================================================================
697
698/// Plan for TRUNCATE TABLE operation (removes all rows).
699#[derive(Clone, Debug)]
700pub struct TruncatePlan {
701    pub table: String,
702}
703
704// ============================================================================
705// SELECT Plan
706// ============================================================================
707
708/// Table reference in FROM clause.
709#[derive(Clone, Debug)]
710pub struct TableRef {
711    pub schema: String,
712    pub table: String,
713    pub alias: Option<String>,
714}
715
716impl TableRef {
717    pub fn new(schema: impl Into<String>, table: impl Into<String>) -> Self {
718        Self {
719            schema: schema.into(),
720            table: table.into(),
721            alias: None,
722        }
723    }
724
725    pub fn with_alias(
726        schema: impl Into<String>,
727        table: impl Into<String>,
728        alias: Option<String>,
729    ) -> Self {
730        Self {
731            schema: schema.into(),
732            table: table.into(),
733            alias,
734        }
735    }
736
737    /// Preferred display name for the table (alias if present).
738    pub fn display_name(&self) -> String {
739        self.alias
740            .as_ref()
741            .cloned()
742            .unwrap_or_else(|| self.qualified_name())
743    }
744
745    pub fn qualified_name(&self) -> String {
746        if self.schema.is_empty() {
747            self.table.clone()
748        } else {
749            format!("{}.{}", self.schema, self.table)
750        }
751    }
752}
753
754// ============================================================================
755// Join Metadata
756// ============================================================================
757
758/// Type of join operation for query planning.
759///
760/// This is a plan-layer type that mirrors `llkv_join::JoinType` but exists
761/// separately to avoid circular dependencies (llkv-join depends on llkv-table
762/// which depends on llkv-plan). The executor converts `JoinPlan` to `llkv_join::JoinType`.
763#[derive(Clone, Copy, Debug, PartialEq, Eq)]
764pub enum JoinPlan {
765    /// Emit only matching row pairs.
766    Inner,
767    /// Emit all left rows; unmatched left rows have NULL right columns.
768    Left,
769    /// Emit all right rows; unmatched right rows have NULL left columns.
770    Right,
771    /// Emit all rows from both sides; unmatched rows have NULLs.
772    Full,
773}
774
775/// Metadata describing a join between consecutive tables in the FROM clause.
776///
777/// Tracks the join type and optional ON condition filter for each join.
778/// The join connects table at index `left_table_index` with `left_table_index + 1`.
779/// Replaces the older `join_types`/`join_filters` vectors so executors can
780/// inspect a single compact structure when coordinating join evaluation.
781#[derive(Clone, Debug)]
782pub struct JoinMetadata {
783    /// Index of the left table in the `SelectPlan.tables` vector.
784    pub left_table_index: usize,
785    /// Type of join (INNER, LEFT, RIGHT, etc.).
786    pub join_type: JoinPlan,
787    /// Optional ON condition filter expression. Translators also thread this
788    /// predicate through [`SelectPlan::filter`] so the optimizer can merge it
789    /// with other WHERE clauses, but keeping it here enables join-specific
790    /// rewrites (e.g., push-down or hash join pruning).
791    pub on_condition: Option<llkv_expr::expr::Expr<'static, String>>,
792}
793
794/// Logical query plan for SELECT operations.
795///
796/// The `tables` collection preserves the FROM clause order while [`Self::joins`]
797/// captures how adjacent tables are connected via [`JoinMetadata`]. This keeps
798/// join semantics alongside table references instead of parallel vectors and
799/// mirrors what the executor expects when materialising join pipelines.
800#[derive(Clone, Debug)]
801pub struct SelectPlan {
802    /// Tables to query. Empty vec means no FROM clause (e.g., SELECT 42).
803    /// Single element for simple queries, multiple for joins/cross products.
804    pub tables: Vec<TableRef>,
805    /// Join metadata describing how tables are joined.
806    /// If empty, all tables are implicitly cross-joined (Cartesian product).
807    /// Each entry describes a join between `tables[i]` and `tables[i + 1]`.
808    pub joins: Vec<JoinMetadata>,
809    pub projections: Vec<SelectProjection>,
810    /// Optional WHERE predicate plus dependent correlated subqueries.
811    pub filter: Option<SelectFilter>,
812    /// Optional HAVING predicate applied after grouping.
813    pub having: Option<llkv_expr::expr::Expr<'static, String>>,
814    /// Scalar subqueries referenced by projections, keyed by `SubqueryId`.
815    pub scalar_subqueries: Vec<ScalarSubquery>,
816    pub aggregates: Vec<AggregateExpr>,
817    pub order_by: Vec<OrderByPlan>,
818    pub distinct: bool,
819    /// Optional compound (set-operation) plan.
820    pub compound: Option<CompoundSelectPlan>,
821    /// Columns used in GROUP BY clauses (canonical names).
822    pub group_by: Vec<String>,
823    /// Optional value table output mode (BigQuery style).
824    pub value_table_mode: Option<ValueTableMode>,
825    /// Optional LIMIT count.
826    pub limit: Option<usize>,
827    /// Optional OFFSET count.
828    pub offset: Option<usize>,
829}
830
831impl SelectPlan {
832    /// Create a SelectPlan for a single table.
833    pub fn new(table: impl Into<String>) -> Self {
834        let table_name = table.into();
835        let tables = if table_name.is_empty() {
836            Vec::new()
837        } else {
838            // Parse "schema.table" or just "table"
839            let parts: Vec<&str> = table_name.split('.').collect();
840            if parts.len() >= 2 {
841                let table_part = parts[1..].join(".");
842                vec![TableRef::new(parts[0], table_part)]
843            } else {
844                vec![TableRef::new("", table_name)]
845            }
846        };
847
848        Self {
849            tables,
850            joins: Vec::new(),
851            projections: Vec::new(),
852            filter: None,
853            having: None,
854            scalar_subqueries: Vec::new(),
855            aggregates: Vec::new(),
856            order_by: Vec::new(),
857            distinct: false,
858            compound: None,
859            group_by: Vec::new(),
860            value_table_mode: None,
861            limit: None,
862            offset: None,
863        }
864    }
865
866    /// Create a SelectPlan with multiple tables for cross product/joins.
867    ///
868    /// The returned plan leaves [`Self::joins`] empty, which means any
869    /// evaluation engine should treat the tables as a Cartesian product until
870    /// [`Self::with_joins`] populates concrete join relationships.
871    pub fn with_tables(tables: Vec<TableRef>) -> Self {
872        Self {
873            tables,
874            joins: Vec::new(),
875            projections: Vec::new(),
876            filter: None,
877            having: None,
878            scalar_subqueries: Vec::new(),
879            aggregates: Vec::new(),
880            order_by: Vec::new(),
881            distinct: false,
882            compound: None,
883            group_by: Vec::new(),
884            value_table_mode: None,
885            limit: None,
886            offset: None,
887        }
888    }
889
890    pub fn with_projections(mut self, projections: Vec<SelectProjection>) -> Self {
891        self.projections = projections;
892        self
893    }
894
895    pub fn with_filter(mut self, filter: Option<SelectFilter>) -> Self {
896        self.filter = filter;
897        self
898    }
899
900    pub fn with_having(mut self, having: Option<llkv_expr::expr::Expr<'static, String>>) -> Self {
901        self.having = having;
902        self
903    }
904
905    /// Attach scalar subqueries discovered during SELECT translation.
906    pub fn with_scalar_subqueries(mut self, scalar_subqueries: Vec<ScalarSubquery>) -> Self {
907        self.scalar_subqueries = scalar_subqueries;
908        self
909    }
910
911    pub fn with_aggregates(mut self, aggregates: Vec<AggregateExpr>) -> Self {
912        self.aggregates = aggregates;
913        self
914    }
915
916    pub fn with_order_by(mut self, order_by: Vec<OrderByPlan>) -> Self {
917        self.order_by = order_by;
918        self
919    }
920
921    pub fn with_distinct(mut self, distinct: bool) -> Self {
922        self.distinct = distinct;
923        self
924    }
925
926    /// Attach join metadata describing how tables are connected.
927    ///
928    /// Each [`JoinMetadata`] entry pairs `tables[i]` with `tables[i + 1]`. The
929    /// builder should supply exactly `tables.len().saturating_sub(1)` entries
930    /// when explicit joins are required; otherwise consumers fall back to a
931    /// Cartesian product.
932    pub fn with_joins(mut self, joins: Vec<JoinMetadata>) -> Self {
933        self.joins = joins;
934        self
935    }
936
937    /// Attach a compound (set operation) plan.
938    pub fn with_compound(mut self, compound: CompoundSelectPlan) -> Self {
939        self.compound = Some(compound);
940        self
941    }
942
943    pub fn with_group_by(mut self, group_by: Vec<String>) -> Self {
944        self.group_by = group_by;
945        self
946    }
947
948    pub fn with_value_table_mode(mut self, mode: Option<ValueTableMode>) -> Self {
949        self.value_table_mode = mode;
950        self
951    }
952}
953
954/// Set operation applied between SELECT statements.
955#[derive(Clone, Debug, PartialEq, Eq)]
956pub enum CompoundOperator {
957    Union,
958    Intersect,
959    Except,
960}
961
962/// Quantifier associated with set operations (e.g., UNION vs UNION ALL).
963#[derive(Clone, Debug, PartialEq, Eq)]
964pub enum CompoundQuantifier {
965    Distinct,
966    All,
967}
968
969/// Component of a compound SELECT (set operation).
970#[derive(Clone, Debug)]
971pub struct CompoundSelectComponent {
972    pub operator: CompoundOperator,
973    pub quantifier: CompoundQuantifier,
974    pub plan: SelectPlan,
975}
976
977/// Compound SELECT plan representing a tree of set operations.
978#[derive(Clone, Debug)]
979pub struct CompoundSelectPlan {
980    pub initial: Box<SelectPlan>,
981    pub operations: Vec<CompoundSelectComponent>,
982}
983
984impl CompoundSelectPlan {
985    pub fn new(initial: SelectPlan) -> Self {
986        Self {
987            initial: Box::new(initial),
988            operations: Vec::new(),
989        }
990    }
991
992    pub fn push_operation(
993        &mut self,
994        operator: CompoundOperator,
995        quantifier: CompoundQuantifier,
996        plan: SelectPlan,
997    ) {
998        self.operations.push(CompoundSelectComponent {
999            operator,
1000            quantifier,
1001            plan,
1002        });
1003    }
1004}
1005
1006/// Projection specification for SELECT.
1007#[derive(Clone, Debug)]
1008pub enum SelectProjection {
1009    AllColumns,
1010    AllColumnsExcept {
1011        exclude: Vec<String>,
1012    },
1013    Column {
1014        name: String,
1015        alias: Option<String>,
1016    },
1017    Computed {
1018        expr: llkv_expr::expr::ScalarExpr<String>,
1019        alias: String,
1020    },
1021}
1022
1023/// Value table output modes (BigQuery-style).
1024#[derive(Clone, Debug, PartialEq, Eq)]
1025pub enum ValueTableMode {
1026    AsStruct,
1027    AsValue,
1028    DistinctAsStruct,
1029    DistinctAsValue,
1030}
1031
1032// ============================================================================
1033// Aggregate Plans
1034// ============================================================================
1035
1036/// Aggregate expression in SELECT.
1037#[derive(Clone, Debug)]
1038pub enum AggregateExpr {
1039    CountStar {
1040        alias: String,
1041        distinct: bool,
1042    },
1043    Column {
1044        column: String,
1045        alias: String,
1046        function: AggregateFunction,
1047        distinct: bool,
1048    },
1049}
1050
1051/// Supported aggregate functions.
1052#[derive(Clone, Debug)]
1053pub enum AggregateFunction {
1054    Count,
1055    SumInt64,
1056    TotalInt64,
1057    MinInt64,
1058    MaxInt64,
1059    CountNulls,
1060    GroupConcat,
1061}
1062
1063impl AggregateExpr {
1064    pub fn count_star(alias: impl Into<String>, distinct: bool) -> Self {
1065        Self::CountStar {
1066            alias: alias.into(),
1067            distinct,
1068        }
1069    }
1070
1071    pub fn count_column(
1072        column: impl Into<String>,
1073        alias: impl Into<String>,
1074        distinct: bool,
1075    ) -> Self {
1076        Self::Column {
1077            column: column.into(),
1078            alias: alias.into(),
1079            function: AggregateFunction::Count,
1080            distinct,
1081        }
1082    }
1083
1084    pub fn sum_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
1085        Self::Column {
1086            column: column.into(),
1087            alias: alias.into(),
1088            function: AggregateFunction::SumInt64,
1089            distinct: false,
1090        }
1091    }
1092
1093    pub fn total_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
1094        Self::Column {
1095            column: column.into(),
1096            alias: alias.into(),
1097            function: AggregateFunction::TotalInt64,
1098            distinct: false,
1099        }
1100    }
1101
1102    pub fn min_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
1103        Self::Column {
1104            column: column.into(),
1105            alias: alias.into(),
1106            function: AggregateFunction::MinInt64,
1107            distinct: false,
1108        }
1109    }
1110
1111    pub fn max_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
1112        Self::Column {
1113            column: column.into(),
1114            alias: alias.into(),
1115            function: AggregateFunction::MaxInt64,
1116            distinct: false,
1117        }
1118    }
1119
1120    pub fn count_nulls(column: impl Into<String>, alias: impl Into<String>) -> Self {
1121        Self::Column {
1122            column: column.into(),
1123            alias: alias.into(),
1124            function: AggregateFunction::CountNulls,
1125            distinct: false,
1126        }
1127    }
1128}
1129
1130/// Helper to convert an Arrow array cell into a plan-level Value.
1131pub fn plan_value_from_array(array: &ArrayRef, index: usize) -> PlanResult<PlanValue> {
1132    if array.is_null(index) {
1133        return Ok(PlanValue::Null);
1134    }
1135    match array.data_type() {
1136        DataType::Boolean => {
1137            let values = array
1138                .as_any()
1139                .downcast_ref::<BooleanArray>()
1140                .ok_or_else(|| {
1141                    Error::InvalidArgumentError("expected Boolean array in INSERT SELECT".into())
1142                })?;
1143            Ok(PlanValue::Integer(if values.value(index) { 1 } else { 0 }))
1144        }
1145        DataType::Int64 => {
1146            let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
1147                Error::InvalidArgumentError("expected Int64 array in INSERT SELECT".into())
1148            })?;
1149            Ok(PlanValue::Integer(values.value(index)))
1150        }
1151        DataType::Float64 => {
1152            let values = array
1153                .as_any()
1154                .downcast_ref::<Float64Array>()
1155                .ok_or_else(|| {
1156                    Error::InvalidArgumentError("expected Float64 array in INSERT SELECT".into())
1157                })?;
1158            Ok(PlanValue::Float(values.value(index)))
1159        }
1160        DataType::Decimal128(_, scale) => {
1161            let values = array
1162                .as_any()
1163                .downcast_ref::<Decimal128Array>()
1164                .ok_or_else(|| {
1165                    Error::InvalidArgumentError("expected Decimal128 array in INSERT SELECT".into())
1166                })?;
1167            let raw = values.value(index);
1168            let decimal = DecimalValue::new(raw, *scale).map_err(|err| {
1169                Error::InvalidArgumentError(format!(
1170                    "failed to convert Decimal128 value at index {index}: {err}"
1171                ))
1172            })?;
1173            Ok(PlanValue::Decimal(decimal))
1174        }
1175        DataType::Utf8 => {
1176            let values = array
1177                .as_any()
1178                .downcast_ref::<StringArray>()
1179                .ok_or_else(|| {
1180                    Error::InvalidArgumentError("expected Utf8 array in INSERT SELECT".into())
1181                })?;
1182            Ok(PlanValue::String(values.value(index).to_string()))
1183        }
1184        DataType::Date32 => {
1185            let values = array
1186                .as_any()
1187                .downcast_ref::<Date32Array>()
1188                .ok_or_else(|| {
1189                    Error::InvalidArgumentError("expected Date32 array in INSERT SELECT".into())
1190                })?;
1191            Ok(PlanValue::Date32(values.value(index)))
1192        }
1193        other => Err(Error::InvalidArgumentError(format!(
1194            "unsupported data type in INSERT SELECT: {other:?}"
1195        ))),
1196    }
1197}
1198
1199// ============================================================================
1200// ORDER BY Plan
1201// ============================================================================
1202
1203/// ORDER BY specification.
1204#[derive(Clone, Debug)]
1205pub struct OrderByPlan {
1206    pub target: OrderTarget,
1207    pub sort_type: OrderSortType,
1208    pub ascending: bool,
1209    pub nulls_first: bool,
1210}
1211
1212/// Sort type for ORDER BY.
1213#[derive(Clone, Debug)]
1214pub enum OrderSortType {
1215    Native,
1216    CastTextToInteger,
1217}
1218
1219/// Target column/expression for ORDER BY.
1220#[derive(Clone, Debug)]
1221pub enum OrderTarget {
1222    Column(String),
1223    Index(usize),
1224    All,
1225}
1226
1227// ============================================================================
1228// Operation Enum for Transaction Replay
1229// ============================================================================
1230
1231/// Recordable plan operation for transaction replay.
1232#[derive(Clone, Debug)]
1233pub enum PlanOperation {
1234    CreateTable(CreateTablePlan),
1235    DropTable(DropTablePlan),
1236    Insert(InsertPlan),
1237    Update(UpdatePlan),
1238    Delete(DeletePlan),
1239    Truncate(TruncatePlan),
1240    Select(Box<SelectPlan>),
1241}
1242
1243/// Top-level plan statements that can be executed against a `Session`.
1244#[derive(Clone, Debug)]
1245pub enum PlanStatement {
1246    BeginTransaction,
1247    CommitTransaction,
1248    RollbackTransaction,
1249    CreateTable(CreateTablePlan),
1250    DropTable(DropTablePlan),
1251    CreateView(CreateViewPlan),
1252    DropView(DropViewPlan),
1253    DropIndex(DropIndexPlan),
1254    AlterTable(AlterTablePlan),
1255    CreateIndex(CreateIndexPlan),
1256    Reindex(ReindexPlan),
1257    Insert(InsertPlan),
1258    Update(UpdatePlan),
1259    Delete(DeletePlan),
1260    Truncate(TruncatePlan),
1261    Select(Box<SelectPlan>),
1262}