llkv_plan/
plans.rs

1//! Logical query plan structures for LLKV.
2//!
3//! This module defines the plan structures that represent logical query operations
4//! before they are executed. Plans are created by SQL parsers or fluent builders and
5//! consumed by execution engines.
6
7use std::sync::Arc;
8
9use arrow::array::{ArrayRef, BooleanArray, Date32Array, Float64Array, Int64Array, StringArray};
10use arrow::datatypes::{DataType, Schema};
11use arrow::record_batch::RecordBatch;
12use llkv_expr::expr::SubqueryId;
13use llkv_result::Error;
14use rustc_hash::FxHashMap;
15
16/// Result type for plan operations.
17pub type PlanResult<T> = llkv_result::Result<T>;
18
19// ============================================================================
20// Filter Metadata
21// ============================================================================
22
23/// Boolean predicate plus correlated subquery metadata attached to a [`SelectPlan`].
24#[derive(Clone, Debug)]
25pub struct SelectFilter {
26    /// Predicate applied to rows before projections execute.
27    pub predicate: llkv_expr::expr::Expr<'static, String>,
28    /// Correlated subqueries required to evaluate the predicate.
29    pub subqueries: Vec<FilterSubquery>,
30}
31
32/// Correlated subquery invoked from within a filter predicate.
33#[derive(Clone, Debug)]
34pub struct FilterSubquery {
35    /// Identifier referenced by [`llkv_expr::expr::Expr::Exists`].
36    pub id: SubqueryId,
37    /// Logical plan for the subquery.
38    pub plan: Box<SelectPlan>,
39    /// Mappings for correlated column placeholders to real outer columns.
40    pub correlated_columns: Vec<CorrelatedColumn>,
41}
42
43/// Correlated subquery invoked from within a scalar projection expression.
44#[derive(Clone, Debug)]
45pub struct ScalarSubquery {
46    /// Identifier referenced by [`llkv_expr::expr::ScalarExpr::ScalarSubquery`].
47    pub id: SubqueryId,
48    /// Logical plan for the subquery.
49    pub plan: Box<SelectPlan>,
50    /// Mappings for correlated column placeholders to real outer columns.
51    pub correlated_columns: Vec<CorrelatedColumn>,
52}
53
/// One outer column captured by a correlated subquery.
///
/// Both filter subqueries (e.g. EXISTS predicates) and scalar subqueries carry
/// a list of these to map placeholders back onto outer columns.
#[derive(Clone, Debug)]
pub struct CorrelatedColumn {
    /// Name of the placeholder column injected into the subquery expression tree.
    pub placeholder: String,
    /// Canonical name of the outer column the placeholder stands for.
    pub column: String,
    /// Nested field path used for struct lookups; empty when no path applies.
    pub field_path: Vec<String>,
}
64
65// ============================================================================
66// PlanValue Types
67// ============================================================================
68
69#[derive(Clone, Debug, PartialEq)]
70pub enum PlanValue {
71    Null,
72    Integer(i64),
73    Float(f64),
74    String(String),
75    Struct(FxHashMap<String, PlanValue>),
76}
77
78impl From<&str> for PlanValue {
79    fn from(value: &str) -> Self {
80        Self::String(value.to_string())
81    }
82}
83
84impl From<String> for PlanValue {
85    fn from(value: String) -> Self {
86        Self::String(value)
87    }
88}
89
90impl From<i64> for PlanValue {
91    fn from(value: i64) -> Self {
92        Self::Integer(value)
93    }
94}
95
96impl From<f64> for PlanValue {
97    fn from(value: f64) -> Self {
98        Self::Float(value)
99    }
100}
101
102impl From<bool> for PlanValue {
103    fn from(value: bool) -> Self {
104        // Store booleans as integers for compatibility
105        if value {
106            Self::Integer(1)
107        } else {
108            Self::Integer(0)
109        }
110    }
111}
112
113impl From<i32> for PlanValue {
114    fn from(value: i32) -> Self {
115        Self::Integer(value as i64)
116    }
117}
118
119/// Convert a `Literal` from llkv-expr into a `PlanValue`.
120///
121/// This is useful for evaluating predicates that contain literal values,
122/// such as in HAVING clauses or filter expressions.
123pub fn plan_value_from_literal(literal: &llkv_expr::Literal) -> PlanResult<PlanValue> {
124    use llkv_expr::Literal;
125
126    match literal {
127        Literal::Null => Ok(PlanValue::Null),
128        Literal::Integer(i) => {
129            // Convert i128 to i64, checking for overflow
130            if *i > i64::MAX as i128 || *i < i64::MIN as i128 {
131                Err(Error::InvalidArgumentError(format!(
132                    "Integer literal {} out of range for i64",
133                    i
134                )))
135            } else {
136                Ok(PlanValue::Integer(*i as i64))
137            }
138        }
139        Literal::Float(f) => Ok(PlanValue::Float(*f)),
140        Literal::String(s) => Ok(PlanValue::String(s.clone())),
141        Literal::Boolean(b) => Ok(PlanValue::from(*b)),
142        Literal::Struct(fields) => {
143            let mut map = FxHashMap::with_capacity_and_hasher(fields.len(), Default::default());
144            for (name, value) in fields {
145                let plan_value = plan_value_from_literal(value)?;
146                map.insert(name.clone(), plan_value);
147            }
148            Ok(PlanValue::Struct(map))
149        }
150    }
151}
152
153// ============================================================================
154// CREATE TABLE Plan
155// ============================================================================
156
/// Specification of a UNIQUE constraint that spans several columns.
#[derive(Clone, Debug)]
pub struct MultiColumnUniqueSpec {
    /// Name of the constraint, when one was given.
    pub name: Option<String>,
    /// Names of the columns that take part in the UNIQUE constraint.
    pub columns: Vec<String>,
}
165
166/// Plan for creating a table.
167#[derive(Clone, Debug)]
168pub struct CreateTablePlan {
169    pub name: String,
170    pub if_not_exists: bool,
171    pub or_replace: bool,
172    pub columns: Vec<PlanColumnSpec>,
173    pub source: Option<CreateTableSource>,
174    /// Optional storage namespace for the table.
175    pub namespace: Option<String>,
176    pub foreign_keys: Vec<ForeignKeySpec>,
177    pub multi_column_uniques: Vec<MultiColumnUniqueSpec>,
178}
179
180impl CreateTablePlan {
181    pub fn new(name: impl Into<String>) -> Self {
182        Self {
183            name: name.into(),
184            if_not_exists: false,
185            or_replace: false,
186            columns: Vec::new(),
187            source: None,
188            namespace: None,
189            foreign_keys: Vec::new(),
190            multi_column_uniques: Vec::new(),
191        }
192    }
193}
194
195// ============================================================================
196// DROP TABLE Plan
197// ============================================================================
198
/// Logical plan for a DROP TABLE statement.
#[derive(Clone, Debug)]
pub struct DropTablePlan {
    /// Name of the table to drop.
    pub name: String,
    /// Flag for the `IF EXISTS` modifier.
    pub if_exists: bool,
}

impl DropTablePlan {
    /// Build a plan for dropping `name`; `if_exists` starts out `false`.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            name,
            if_exists: false,
        }
    }

    /// Builder-style setter for the `if_exists` flag.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
219
220// ============================================================================
221// CREATE VIEW Plan
222// ============================================================================
223
224/// Plan for creating a view.
225#[derive(Clone, Debug)]
226pub struct CreateViewPlan {
227    pub name: String,
228    pub if_not_exists: bool,
229    pub view_definition: String,
230    pub select_plan: Box<SelectPlan>,
231    /// Optional storage namespace for the view (e.g., "temp" for temporary views).
232    pub namespace: Option<String>,
233}
234
235impl CreateViewPlan {
236    pub fn new(name: impl Into<String>, view_definition: String, select_plan: SelectPlan) -> Self {
237        Self {
238            name: name.into(),
239            if_not_exists: false,
240            view_definition,
241            select_plan: Box::new(select_plan),
242            namespace: None,
243        }
244    }
245}
246
247// ============================================================================
248// DROP VIEW Plan
249// ============================================================================
250
/// Logical plan for a DROP VIEW statement.
#[derive(Clone, Debug)]
pub struct DropViewPlan {
    /// Name of the view to drop.
    pub name: String,
    /// Flag for the `IF EXISTS` modifier.
    pub if_exists: bool,
}

impl DropViewPlan {
    /// Build a plan for dropping view `name`; `if_exists` starts out `false`.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            name,
            if_exists: false,
        }
    }

    /// Builder-style setter for the `if_exists` flag.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
271
272// ============================================================================
273// RENAME TABLE Plan
274// ============================================================================
275
/// Logical plan for renaming a table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RenameTablePlan {
    /// Name the table currently has.
    pub current_name: String,
    /// Name the table should have afterwards.
    pub new_name: String,
    /// Flag for the `IF EXISTS` modifier.
    pub if_exists: bool,
}

impl RenameTablePlan {
    /// Build a plan renaming `current_name` to `new_name`; `if_exists` starts
    /// out `false`.
    pub fn new(current_name: impl Into<String>, new_name: impl Into<String>) -> Self {
        let current_name = current_name.into();
        let new_name = new_name.into();
        Self {
            current_name,
            new_name,
            if_exists: false,
        }
    }

    /// Builder-style setter for the `if_exists` flag.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
298
/// Logical plan for dropping an index.
#[derive(Clone, Debug, PartialEq)]
pub struct DropIndexPlan {
    /// Index name exactly as it was supplied.
    pub name: String,
    /// Canonical form of the name; defaults to the ASCII-lowercased `name`.
    pub canonical_name: String,
    /// Flag for the `IF EXISTS` modifier.
    pub if_exists: bool,
}

impl DropIndexPlan {
    /// Build a plan for dropping index `name`.
    ///
    /// The canonical name is derived by ASCII-lowercasing `name`, and
    /// `if_exists` starts out `false`.
    pub fn new(name: impl Into<String>) -> Self {
        let name: String = name.into();
        let canonical_name = name.to_ascii_lowercase();
        Self {
            name,
            canonical_name,
            if_exists: false,
        }
    }

    /// Replace the derived canonical name with an explicit one.
    pub fn with_canonical(self, canonical: impl Into<String>) -> Self {
        Self {
            canonical_name: canonical.into(),
            ..self
        }
    }

    /// Builder-style setter for the `if_exists` flag.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
327
/// Logical plan for rebuilding an index.
#[derive(Clone, Debug, PartialEq)]
pub struct ReindexPlan {
    /// Index name exactly as it was supplied.
    pub name: String,
    /// Canonical form of the name; defaults to the ASCII-lowercased `name`.
    pub canonical_name: String,
}

impl ReindexPlan {
    /// Build a plan for rebuilding index `name`; the canonical name is derived
    /// by ASCII-lowercasing `name`.
    pub fn new(name: impl Into<String>) -> Self {
        let name: String = name.into();
        let canonical_name = name.to_ascii_lowercase();
        Self {
            name,
            canonical_name,
        }
    }

    /// Replace the derived canonical name with an explicit one.
    pub fn with_canonical(self, canonical: impl Into<String>) -> Self {
        Self {
            canonical_name: canonical.into(),
            ..self
        }
    }
}
349
350// ============================================================================
351// ALTER TABLE Plan Structures
352// ============================================================================
353
/// Logical plan for an ALTER TABLE statement.
#[derive(Clone, Debug, PartialEq)]
pub struct AlterTablePlan {
    /// Table the operation targets.
    pub table_name: String,
    /// Flag for the table-level `IF EXISTS` modifier.
    pub if_exists: bool,
    /// The alteration to perform.
    pub operation: AlterTableOperation,
}

/// The concrete alteration carried by an [`AlterTablePlan`].
#[derive(Clone, Debug, PartialEq)]
pub enum AlterTableOperation {
    /// `RENAME COLUMN old_name TO new_name`
    RenameColumn {
        /// Column name before the rename.
        old_column_name: String,
        /// Column name after the rename.
        new_column_name: String,
    },
    /// `ALTER COLUMN column_name SET DATA TYPE new_type`
    SetColumnDataType {
        /// Column whose type changes.
        column_name: String,
        /// SQL type string like "INTEGER", "VARCHAR", etc.
        new_data_type: String,
    },
    /// `DROP COLUMN column_name`
    DropColumn {
        /// Column to remove.
        column_name: String,
        /// Flag for the column-level `IF EXISTS` modifier.
        if_exists: bool,
        /// Flag for the `CASCADE` modifier.
        cascade: bool,
    },
}

impl AlterTablePlan {
    /// Build a plan applying `operation` to `table_name`; `if_exists` starts
    /// out `false`.
    pub fn new(table_name: impl Into<String>, operation: AlterTableOperation) -> Self {
        let table_name = table_name.into();
        Self {
            table_name,
            if_exists: false,
            operation,
        }
    }

    /// Builder-style setter for the table-level `if_exists` flag.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
397
398// ============================================================================
399// FOREIGN KEY Plan Structures
400// ============================================================================
401
/// Action recorded for a foreign key's `on_delete` / `on_update` slots.
///
/// [`ForeignKeyAction::NoAction`] is the default.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum ForeignKeyAction {
    /// Default action.
    #[default]
    NoAction,
    /// Restrict action.
    Restrict,
}
408
409#[derive(Clone, Debug)]
410pub struct ForeignKeySpec {
411    pub name: Option<String>,
412    pub columns: Vec<String>,
413    pub referenced_table: String,
414    pub referenced_columns: Vec<String>,
415    pub on_delete: ForeignKeyAction,
416    pub on_update: ForeignKeyAction,
417}
418
419// ============================================================================
420// CREATE INDEX Plan
421// ============================================================================
422
/// One column entry of a CREATE INDEX statement.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IndexColumnPlan {
    /// Name of the indexed column.
    pub name: String,
    /// `true` for ascending order.
    pub ascending: bool,
    /// `true` when NULLs sort first.
    pub nulls_first: bool,
}

impl IndexColumnPlan {
    /// Build an entry for column `name`, defaulting to ascending order with
    /// NULLs not first.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            name,
            ascending: true,
            nulls_first: false,
        }
    }

    /// Override both sort flags at once.
    pub fn with_sort(self, ascending: bool, nulls_first: bool) -> Self {
        Self {
            ascending,
            nulls_first,
            ..self
        }
    }
}
446
447/// Plan for creating an index on a table.
448#[derive(Clone, Debug)]
449pub struct CreateIndexPlan {
450    pub name: Option<String>,
451    pub table: String,
452    pub unique: bool,
453    pub if_not_exists: bool,
454    pub columns: Vec<IndexColumnPlan>,
455}
456
457impl CreateIndexPlan {
458    pub fn new(table: impl Into<String>) -> Self {
459        Self {
460            name: None,
461            table: table.into(),
462            unique: false,
463            if_not_exists: false,
464            columns: Vec::new(),
465        }
466    }
467
468    pub fn with_name(mut self, name: Option<String>) -> Self {
469        self.name = name;
470        self
471    }
472
473    pub fn with_unique(mut self, unique: bool) -> Self {
474        self.unique = unique;
475        self
476    }
477
478    pub fn with_if_not_exists(mut self, if_not_exists: bool) -> Self {
479        self.if_not_exists = if_not_exists;
480        self
481    }
482
483    pub fn with_columns(mut self, columns: Vec<IndexColumnPlan>) -> Self {
484        self.columns = columns;
485        self
486    }
487}
488
489/// Column specification produced by the logical planner.
490///
491/// This struct flows from the planner into the runtime/executor so callers can
492/// reason about column metadata without duplicating field definitions.
493#[derive(Clone, Debug)]
494pub struct PlanColumnSpec {
495    pub name: String,
496    pub data_type: DataType,
497    pub nullable: bool,
498    pub primary_key: bool,
499    pub unique: bool,
500    /// Optional CHECK constraint expression (SQL string).
501    /// Example: "t.t=42" for CHECK(t.t=42)
502    pub check_expr: Option<String>,
503}
504
505impl PlanColumnSpec {
506    pub fn new(name: impl Into<String>, data_type: DataType, nullable: bool) -> Self {
507        Self {
508            name: name.into(),
509            data_type,
510            nullable,
511            primary_key: false,
512            unique: false,
513            check_expr: None,
514        }
515    }
516
517    pub fn with_primary_key(mut self, primary_key: bool) -> Self {
518        self.primary_key = primary_key;
519        if primary_key {
520            self.unique = true;
521        }
522        self
523    }
524
525    pub fn with_unique(mut self, unique: bool) -> Self {
526        if unique {
527            self.unique = true;
528        }
529        self
530    }
531
532    pub fn with_check(mut self, check_expr: Option<String>) -> Self {
533        self.check_expr = check_expr;
534        self
535    }
536}
537
538/// Trait for types that can be converted into a [`PlanColumnSpec`].
539pub trait IntoPlanColumnSpec {
540    fn into_plan_column_spec(self) -> PlanColumnSpec;
541}
542
/// Whether a column accepts NULL values.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ColumnNullability {
    /// NULLs are allowed.
    Nullable,
    /// NULLs are rejected.
    NotNull,
}

impl ColumnNullability {
    /// Returns `true` only for [`ColumnNullability::Nullable`].
    pub fn is_nullable(self) -> bool {
        match self {
            Self::Nullable => true,
            Self::NotNull => false,
        }
    }
}

/// Shorthand constant for [`ColumnNullability::Nullable`].
#[allow(non_upper_case_globals)]
pub const Nullable: ColumnNullability = ColumnNullability::Nullable;

/// Shorthand constant for [`ColumnNullability::NotNull`].
#[allow(non_upper_case_globals)]
pub const NotNull: ColumnNullability = ColumnNullability::NotNull;
563
564impl IntoPlanColumnSpec for PlanColumnSpec {
565    fn into_plan_column_spec(self) -> PlanColumnSpec {
566        self
567    }
568}
569
570impl<T> IntoPlanColumnSpec for &T
571where
572    T: Clone + IntoPlanColumnSpec,
573{
574    fn into_plan_column_spec(self) -> PlanColumnSpec {
575        self.clone().into_plan_column_spec()
576    }
577}
578
579impl IntoPlanColumnSpec for (&str, DataType) {
580    fn into_plan_column_spec(self) -> PlanColumnSpec {
581        PlanColumnSpec::new(self.0, self.1, true)
582    }
583}
584
585impl IntoPlanColumnSpec for (&str, DataType, bool) {
586    fn into_plan_column_spec(self) -> PlanColumnSpec {
587        PlanColumnSpec::new(self.0, self.1, self.2)
588    }
589}
590
591impl IntoPlanColumnSpec for (&str, DataType, ColumnNullability) {
592    fn into_plan_column_spec(self) -> PlanColumnSpec {
593        PlanColumnSpec::new(self.0, self.1, self.2.is_nullable())
594    }
595}
596
597/// Source data for CREATE TABLE AS SELECT.
598#[derive(Clone, Debug)]
599pub enum CreateTableSource {
600    Batches {
601        schema: Arc<Schema>,
602        batches: Vec<RecordBatch>,
603    },
604    Select {
605        plan: Box<SelectPlan>,
606    },
607}
608
609// ============================================================================
610// INSERT Plan
611// ============================================================================
612
/// SQLite-style conflict resolution action attached to an INSERT statement.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum InsertConflictAction {
    /// Plain INSERT: a constraint violation makes the insert fail.
    None,
    /// `INSERT OR REPLACE`: on conflict the existing row is updated.
    Replace,
    /// `INSERT OR IGNORE`: on conflict the row is skipped.
    Ignore,
    /// `INSERT OR ABORT`: on conflict the transaction is aborted.
    Abort,
    /// `INSERT OR FAIL`: on conflict the statement fails, without a rollback.
    Fail,
    /// `INSERT OR ROLLBACK`: on conflict the transaction is rolled back.
    Rollback,
}
629
630/// Plan for inserting data into a table.
631#[derive(Clone, Debug)]
632pub struct InsertPlan {
633    pub table: String,
634    pub columns: Vec<String>,
635    pub source: InsertSource,
636    pub on_conflict: InsertConflictAction,
637}
638
639/// Source data for INSERT operations.
640#[derive(Clone, Debug)]
641pub enum InsertSource {
642    Rows(Vec<Vec<PlanValue>>),
643    Batches(Vec<RecordBatch>),
644    Select { plan: Box<SelectPlan> },
645}
646
647// ============================================================================
648// UPDATE Plan
649// ============================================================================
650
651/// Plan for updating rows in a table.
652#[derive(Clone, Debug)]
653pub struct UpdatePlan {
654    pub table: String,
655    pub assignments: Vec<ColumnAssignment>,
656    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
657}
658
659/// Value to assign in an UPDATE.
660#[derive(Clone, Debug)]
661pub enum AssignmentValue {
662    Literal(PlanValue),
663    Expression(llkv_expr::expr::ScalarExpr<String>),
664}
665
666/// Column assignment for UPDATE.
667#[derive(Clone, Debug)]
668pub struct ColumnAssignment {
669    pub column: String,
670    pub value: AssignmentValue,
671}
672
673// ============================================================================
674// DELETE Plan
675// ============================================================================
676
677/// Plan for deleting rows from a table.
678#[derive(Clone, Debug)]
679pub struct DeletePlan {
680    pub table: String,
681    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
682}
683
684// ============================================================================
685// TRUNCATE Plan
686// ============================================================================
687
/// Logical plan for a TRUNCATE TABLE operation, which removes all rows.
#[derive(Clone, Debug)]
pub struct TruncatePlan {
    /// Table to truncate.
    pub table: String,
}
693
694// ============================================================================
695// SELECT Plan
696// ============================================================================
697
/// A table named in the FROM clause.
#[derive(Clone, Debug)]
pub struct TableRef {
    /// Schema part of the name; an empty string means no schema qualifier.
    pub schema: String,
    /// Table part of the name.
    pub table: String,
    /// Alias for the table, when one was given.
    pub alias: Option<String>,
}

impl TableRef {
    /// Build an un-aliased reference to `schema.table`.
    pub fn new(schema: impl Into<String>, table: impl Into<String>) -> Self {
        Self::with_alias(schema, table, None)
    }

    /// Build a reference to `schema.table` carrying the given optional alias.
    pub fn with_alias(
        schema: impl Into<String>,
        table: impl Into<String>,
        alias: Option<String>,
    ) -> Self {
        let schema = schema.into();
        let table = table.into();
        Self {
            schema,
            table,
            alias,
        }
    }

    /// Preferred display name for the table: the alias when present,
    /// otherwise the qualified name.
    pub fn display_name(&self) -> String {
        match &self.alias {
            Some(alias) => alias.clone(),
            None => self.qualified_name(),
        }
    }

    /// `schema.table`, or just `table` when the schema is empty.
    pub fn qualified_name(&self) -> String {
        match self.schema.as_str() {
            "" => self.table.clone(),
            schema => format!("{}.{}", schema, self.table),
        }
    }
}
743
744// ============================================================================
745// Join Metadata
746// ============================================================================
747
/// Join kind used during query planning.
///
/// A plan-layer counterpart of `llkv_join::JoinType`. It lives here to avoid
/// a circular dependency (llkv-join depends on llkv-table, which depends on
/// llkv-plan); the executor converts `JoinPlan` into `llkv_join::JoinType`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JoinPlan {
    /// Only matching row pairs are emitted.
    Inner,
    /// Every left row is emitted; unmatched left rows get NULL right columns.
    Left,
    /// Every right row is emitted; unmatched right rows get NULL left columns.
    Right,
    /// Rows from both sides are emitted; unmatched rows are padded with NULLs.
    Full,
}
764
765/// Metadata describing a join between consecutive tables in the FROM clause.
766///
767/// Tracks the join type and optional ON condition filter for each join.
768/// The join connects table at index `left_table_index` with `left_table_index + 1`.
769/// Replaces the older `join_types`/`join_filters` vectors so executors can
770/// inspect a single compact structure when coordinating join evaluation.
771#[derive(Clone, Debug)]
772pub struct JoinMetadata {
773    /// Index of the left table in the `SelectPlan.tables` vector.
774    pub left_table_index: usize,
775    /// Type of join (INNER, LEFT, RIGHT, etc.).
776    pub join_type: JoinPlan,
777    /// Optional ON condition filter expression. Translators also thread this
778    /// predicate through [`SelectPlan::filter`] so the optimizer can merge it
779    /// with other WHERE clauses, but keeping it here enables join-specific
780    /// rewrites (e.g., push-down or hash join pruning).
781    pub on_condition: Option<llkv_expr::expr::Expr<'static, String>>,
782}
783
784/// Logical query plan for SELECT operations.
785///
786/// The `tables` collection preserves the FROM clause order while [`Self::joins`]
787/// captures how adjacent tables are connected via [`JoinMetadata`]. This keeps
788/// join semantics alongside table references instead of parallel vectors and
789/// mirrors what the executor expects when materialising join pipelines.
790#[derive(Clone, Debug)]
791pub struct SelectPlan {
792    /// Tables to query. Empty vec means no FROM clause (e.g., SELECT 42).
793    /// Single element for simple queries, multiple for joins/cross products.
794    pub tables: Vec<TableRef>,
795    /// Join metadata describing how tables are joined.
796    /// If empty, all tables are implicitly cross-joined (Cartesian product).
797    /// Each entry describes a join between `tables[i]` and `tables[i + 1]`.
798    pub joins: Vec<JoinMetadata>,
799    pub projections: Vec<SelectProjection>,
800    /// Optional WHERE predicate plus dependent correlated subqueries.
801    pub filter: Option<SelectFilter>,
802    /// Optional HAVING predicate applied after grouping.
803    pub having: Option<llkv_expr::expr::Expr<'static, String>>,
804    /// Scalar subqueries referenced by projections, keyed by `SubqueryId`.
805    pub scalar_subqueries: Vec<ScalarSubquery>,
806    pub aggregates: Vec<AggregateExpr>,
807    pub order_by: Vec<OrderByPlan>,
808    pub distinct: bool,
809    /// Optional compound (set-operation) plan.
810    pub compound: Option<CompoundSelectPlan>,
811    /// Columns used in GROUP BY clauses (canonical names).
812    pub group_by: Vec<String>,
813    /// Optional value table output mode (BigQuery style).
814    pub value_table_mode: Option<ValueTableMode>,
815}
816
817impl SelectPlan {
818    /// Create a SelectPlan for a single table.
819    pub fn new(table: impl Into<String>) -> Self {
820        let table_name = table.into();
821        let tables = if table_name.is_empty() {
822            Vec::new()
823        } else {
824            // Parse "schema.table" or just "table"
825            let parts: Vec<&str> = table_name.split('.').collect();
826            if parts.len() >= 2 {
827                let table_part = parts[1..].join(".");
828                vec![TableRef::new(parts[0], table_part)]
829            } else {
830                vec![TableRef::new("", table_name)]
831            }
832        };
833
834        Self {
835            tables,
836            joins: Vec::new(),
837            projections: Vec::new(),
838            filter: None,
839            having: None,
840            scalar_subqueries: Vec::new(),
841            aggregates: Vec::new(),
842            order_by: Vec::new(),
843            distinct: false,
844            compound: None,
845            group_by: Vec::new(),
846            value_table_mode: None,
847        }
848    }
849
850    /// Create a SelectPlan with multiple tables for cross product/joins.
851    ///
852    /// The returned plan leaves [`Self::joins`] empty, which means any
853    /// evaluation engine should treat the tables as a Cartesian product until
854    /// [`Self::with_joins`] populates concrete join relationships.
855    pub fn with_tables(tables: Vec<TableRef>) -> Self {
856        Self {
857            tables,
858            joins: Vec::new(),
859            projections: Vec::new(),
860            filter: None,
861            having: None,
862            scalar_subqueries: Vec::new(),
863            aggregates: Vec::new(),
864            order_by: Vec::new(),
865            distinct: false,
866            compound: None,
867            group_by: Vec::new(),
868            value_table_mode: None,
869        }
870    }
871
872    pub fn with_projections(mut self, projections: Vec<SelectProjection>) -> Self {
873        self.projections = projections;
874        self
875    }
876
877    pub fn with_filter(mut self, filter: Option<SelectFilter>) -> Self {
878        self.filter = filter;
879        self
880    }
881
882    pub fn with_having(mut self, having: Option<llkv_expr::expr::Expr<'static, String>>) -> Self {
883        self.having = having;
884        self
885    }
886
887    /// Attach scalar subqueries discovered during SELECT translation.
888    pub fn with_scalar_subqueries(mut self, scalar_subqueries: Vec<ScalarSubquery>) -> Self {
889        self.scalar_subqueries = scalar_subqueries;
890        self
891    }
892
893    pub fn with_aggregates(mut self, aggregates: Vec<AggregateExpr>) -> Self {
894        self.aggregates = aggregates;
895        self
896    }
897
898    pub fn with_order_by(mut self, order_by: Vec<OrderByPlan>) -> Self {
899        self.order_by = order_by;
900        self
901    }
902
903    pub fn with_distinct(mut self, distinct: bool) -> Self {
904        self.distinct = distinct;
905        self
906    }
907
908    /// Attach join metadata describing how tables are connected.
909    ///
910    /// Each [`JoinMetadata`] entry pairs `tables[i]` with `tables[i + 1]`. The
911    /// builder should supply exactly `tables.len().saturating_sub(1)` entries
912    /// when explicit joins are required; otherwise consumers fall back to a
913    /// Cartesian product.
914    pub fn with_joins(mut self, joins: Vec<JoinMetadata>) -> Self {
915        self.joins = joins;
916        self
917    }
918
919    /// Attach a compound (set operation) plan.
920    pub fn with_compound(mut self, compound: CompoundSelectPlan) -> Self {
921        self.compound = Some(compound);
922        self
923    }
924
925    pub fn with_group_by(mut self, group_by: Vec<String>) -> Self {
926        self.group_by = group_by;
927        self
928    }
929
930    pub fn with_value_table_mode(mut self, mode: Option<ValueTableMode>) -> Self {
931        self.value_table_mode = mode;
932        self
933    }
934}
935
/// Set operation that combines two SELECT statements.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CompoundOperator {
    /// `UNION`
    Union,
    /// `INTERSECT`
    Intersect,
    /// `EXCEPT`
    Except,
}
943
/// Quantifier attached to a set operation (e.g., UNION vs UNION ALL).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CompoundQuantifier {
    /// The distinct form of the operation (e.g. plain `UNION`).
    Distinct,
    /// The `ALL` form of the operation (e.g. `UNION ALL`).
    All,
}
950
951/// Component of a compound SELECT (set operation).
952#[derive(Clone, Debug)]
953pub struct CompoundSelectComponent {
954    pub operator: CompoundOperator,
955    pub quantifier: CompoundQuantifier,
956    pub plan: SelectPlan,
957}
958
959/// Compound SELECT plan representing a tree of set operations.
960#[derive(Clone, Debug)]
961pub struct CompoundSelectPlan {
962    pub initial: Box<SelectPlan>,
963    pub operations: Vec<CompoundSelectComponent>,
964}
965
966impl CompoundSelectPlan {
967    pub fn new(initial: SelectPlan) -> Self {
968        Self {
969            initial: Box::new(initial),
970            operations: Vec::new(),
971        }
972    }
973
974    pub fn push_operation(
975        &mut self,
976        operator: CompoundOperator,
977        quantifier: CompoundQuantifier,
978        plan: SelectPlan,
979    ) {
980        self.operations.push(CompoundSelectComponent {
981            operator,
982            quantifier,
983            plan,
984        });
985    }
986}
987
/// Projection specification for SELECT.
///
/// Each value describes one entry of a SELECT list.
#[derive(Clone, Debug)]
pub enum SelectProjection {
    /// Project every column.
    // NOTE(review): presumably produced for `SELECT *` — confirm against the SQL planner.
    AllColumns,
    /// Project every column except the ones listed in `exclude`.
    AllColumnsExcept {
        /// Names of the columns to leave out.
        exclude: Vec<String>,
    },
    /// Project a single column referenced by name.
    Column {
        /// Name of the column to project.
        name: String,
        /// Optional output name; `None` when no alias was given.
        alias: Option<String>,
    },
    /// Project the result of a scalar expression.
    Computed {
        /// Expression to evaluate; column references inside it are `String`s.
        expr: llkv_expr::expr::ScalarExpr<String>,
        /// Output name for the computed value (always present for this variant).
        alias: String,
    },
}
1004
/// Value table output modes (BigQuery-style).
///
/// Stored as an `Option` on the SELECT plan via `with_value_table_mode`.
// NOTE(review): the variant names suggest the BigQuery forms `SELECT AS STRUCT`,
// `SELECT AS VALUE`, and their `DISTINCT` counterparts — confirm against the SQL planner.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ValueTableMode {
    /// Struct-valued output (presumably `SELECT AS STRUCT`).
    AsStruct,
    /// Single-value output (presumably `SELECT AS VALUE`).
    AsValue,
    /// Struct-valued output with `DISTINCT` (presumably `SELECT DISTINCT AS STRUCT`).
    DistinctAsStruct,
    /// Single-value output with `DISTINCT` (presumably `SELECT DISTINCT AS VALUE`).
    DistinctAsValue,
}
1013
1014// ============================================================================
1015// Aggregate Plans
1016// ============================================================================
1017
/// Aggregate expression in SELECT.
///
/// Values are normally built through the constructor helpers on this type rather than by
/// spelling out the variants.
#[derive(Clone, Debug)]
pub enum AggregateExpr {
    /// Count aggregate that is not bound to a column (see [`AggregateExpr::count_star`]).
    CountStar {
        /// Output name of the aggregate.
        alias: String,
        /// Whether the aggregate was requested as distinct.
        distinct: bool,
    },
    /// Aggregate evaluated over a single named column.
    Column {
        /// Name of the input column.
        column: String,
        /// Output name of the aggregate.
        alias: String,
        /// Function applied to the column.
        function: AggregateFunction,
        /// Whether the aggregate was requested as distinct.
        distinct: bool,
    },
}

/// Supported aggregate functions.
// NOTE(review): the `Int64` suffixes presumably name the input column type the executor
// expects; the executor is not visible from this section, so confirm there.
#[derive(Clone, Debug)]
pub enum AggregateFunction {
    /// Column count (used by [`AggregateExpr::count_column`]).
    Count,
    /// Sum (used by [`AggregateExpr::sum_int64`]).
    SumInt64,
    /// Total (used by [`AggregateExpr::total_int64`]).
    TotalInt64,
    /// Minimum (used by [`AggregateExpr::min_int64`]).
    MinInt64,
    /// Maximum (used by [`AggregateExpr::max_int64`]).
    MaxInt64,
    /// Null count (used by [`AggregateExpr::count_nulls`]).
    CountNulls,
    /// Group concatenation; this `impl` block defines no constructor helper for it.
    GroupConcat,
}

impl AggregateExpr {
    /// Shared builder behind every column-based constructor below.
    fn over_column(
        column: String,
        alias: String,
        function: AggregateFunction,
        distinct: bool,
    ) -> Self {
        Self::Column {
            column,
            alias,
            function,
            distinct,
        }
    }

    /// Builds a [`AggregateExpr::CountStar`] with the given output name and distinct flag.
    pub fn count_star(alias: impl Into<String>, distinct: bool) -> Self {
        let alias = alias.into();
        Self::CountStar { alias, distinct }
    }

    /// Builds a [`AggregateFunction::Count`] aggregate over `column`.
    ///
    /// This is the only column-based constructor that lets the caller choose `distinct`.
    pub fn count_column(
        column: impl Into<String>,
        alias: impl Into<String>,
        distinct: bool,
    ) -> Self {
        Self::over_column(
            column.into(),
            alias.into(),
            AggregateFunction::Count,
            distinct,
        )
    }

    /// Builds a non-distinct [`AggregateFunction::SumInt64`] aggregate over `column`.
    pub fn sum_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(
            column.into(),
            alias.into(),
            AggregateFunction::SumInt64,
            false,
        )
    }

    /// Builds a non-distinct [`AggregateFunction::TotalInt64`] aggregate over `column`.
    pub fn total_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(
            column.into(),
            alias.into(),
            AggregateFunction::TotalInt64,
            false,
        )
    }

    /// Builds a non-distinct [`AggregateFunction::MinInt64`] aggregate over `column`.
    pub fn min_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(
            column.into(),
            alias.into(),
            AggregateFunction::MinInt64,
            false,
        )
    }

    /// Builds a non-distinct [`AggregateFunction::MaxInt64`] aggregate over `column`.
    pub fn max_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(
            column.into(),
            alias.into(),
            AggregateFunction::MaxInt64,
            false,
        )
    }

    /// Builds a non-distinct [`AggregateFunction::CountNulls`] aggregate over `column`.
    pub fn count_nulls(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(
            column.into(),
            alias.into(),
            AggregateFunction::CountNulls,
            false,
        )
    }
}
1111
1112/// Helper to convert an Arrow array cell into a plan-level Value.
1113pub fn plan_value_from_array(array: &ArrayRef, index: usize) -> PlanResult<PlanValue> {
1114    if array.is_null(index) {
1115        return Ok(PlanValue::Null);
1116    }
1117    match array.data_type() {
1118        DataType::Boolean => {
1119            let values = array
1120                .as_any()
1121                .downcast_ref::<BooleanArray>()
1122                .ok_or_else(|| {
1123                    Error::InvalidArgumentError("expected Boolean array in INSERT SELECT".into())
1124                })?;
1125            Ok(PlanValue::Integer(if values.value(index) { 1 } else { 0 }))
1126        }
1127        DataType::Int64 => {
1128            let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
1129                Error::InvalidArgumentError("expected Int64 array in INSERT SELECT".into())
1130            })?;
1131            Ok(PlanValue::Integer(values.value(index)))
1132        }
1133        DataType::Float64 => {
1134            let values = array
1135                .as_any()
1136                .downcast_ref::<Float64Array>()
1137                .ok_or_else(|| {
1138                    Error::InvalidArgumentError("expected Float64 array in INSERT SELECT".into())
1139                })?;
1140            Ok(PlanValue::Float(values.value(index)))
1141        }
1142        DataType::Utf8 => {
1143            let values = array
1144                .as_any()
1145                .downcast_ref::<StringArray>()
1146                .ok_or_else(|| {
1147                    Error::InvalidArgumentError("expected Utf8 array in INSERT SELECT".into())
1148                })?;
1149            Ok(PlanValue::String(values.value(index).to_string()))
1150        }
1151        DataType::Date32 => {
1152            let values = array
1153                .as_any()
1154                .downcast_ref::<Date32Array>()
1155                .ok_or_else(|| {
1156                    Error::InvalidArgumentError("expected Date32 array in INSERT SELECT".into())
1157                })?;
1158            Ok(PlanValue::Integer(values.value(index) as i64))
1159        }
1160        other => Err(Error::InvalidArgumentError(format!(
1161            "unsupported data type in INSERT SELECT: {other:?}"
1162        ))),
1163    }
1164}
1165
1166// ============================================================================
1167// ORDER BY Plan
1168// ============================================================================
1169
/// ORDER BY specification.
///
/// Describes a single sort key: what to sort on, how values are compared, the direction,
/// and where nulls are placed.
#[derive(Clone, Debug)]
pub struct OrderByPlan {
    /// What the sort key refers to.
    pub target: OrderTarget,
    /// How values are interpreted for comparison.
    pub sort_type: OrderSortType,
    /// `true` for ascending order, `false` for descending.
    pub ascending: bool,
    /// `true` when nulls sort before non-null values.
    pub nulls_first: bool,
}
1178
/// Sort type for ORDER BY.
#[derive(Clone, Debug)]
pub enum OrderSortType {
    /// Sort without any conversion.
    // NOTE(review): presumably compares values using the column's own data type — confirm
    // in the executor.
    Native,
    /// Sort text values as integers.
    // NOTE(review): presumably the text is cast to an integer before comparison; how
    // unparsable text is treated is not visible here — confirm in the executor.
    CastTextToInteger,
}
1185
/// Target column/expression for ORDER BY.
#[derive(Clone, Debug)]
pub enum OrderTarget {
    /// Sort by the column with this name.
    Column(String),
    /// Sort by position.
    // NOTE(review): whether the index is zero- or one-based, and whether it refers to the
    // projection list, is not visible here — confirm against the planner/executor.
    Index(usize),
    /// Sort by all columns.
    // NOTE(review): presumably corresponds to `ORDER BY ALL` — confirm against the planner.
    All,
}
1193
1194// ============================================================================
1195// Operation Enum for Transaction Replay
1196// ============================================================================
1197
/// Recordable plan operation for transaction replay.
///
/// Each variant wraps the plan of one statement kind. Compared with [`PlanStatement`], this
/// enum has no transaction-control, view, index, or `ALTER TABLE` variants.
#[derive(Clone, Debug)]
pub enum PlanOperation {
    /// Table creation.
    CreateTable(CreateTablePlan),
    /// Table removal.
    DropTable(DropTablePlan),
    /// Row insertion.
    Insert(InsertPlan),
    /// Row update.
    Update(UpdatePlan),
    /// Row deletion.
    Delete(DeletePlan),
    /// Table truncation.
    Truncate(TruncatePlan),
    /// Query; the plan is boxed.
    Select(Box<SelectPlan>),
}
1209
/// Top-level plan statements that can be executed against a `Session`.
///
/// Besides the data and table operations that also appear in [`PlanOperation`], this enum
/// covers transaction control, views, indexes, and `ALTER TABLE`.
#[derive(Clone, Debug)]
pub enum PlanStatement {
    /// Begin a transaction (carries no plan payload).
    BeginTransaction,
    /// Commit the transaction (carries no plan payload).
    CommitTransaction,
    /// Roll back the transaction (carries no plan payload).
    RollbackTransaction,
    /// Table creation.
    CreateTable(CreateTablePlan),
    /// Table removal.
    DropTable(DropTablePlan),
    /// View creation.
    CreateView(CreateViewPlan),
    /// View removal.
    DropView(DropViewPlan),
    /// Index removal.
    DropIndex(DropIndexPlan),
    /// Table alteration.
    AlterTable(AlterTablePlan),
    /// Index creation.
    CreateIndex(CreateIndexPlan),
    /// Reindex request.
    Reindex(ReindexPlan),
    /// Row insertion.
    Insert(InsertPlan),
    /// Row update.
    Update(UpdatePlan),
    /// Row deletion.
    Delete(DeletePlan),
    /// Table truncation.
    Truncate(TruncatePlan),
    /// Query; the plan is boxed.
    Select(Box<SelectPlan>),
}