llkv_plan/
plans.rs

1//! Logical query plan structures for LLKV.
2//!
3//! This module defines the plan structures that represent logical query operations
4//! before they are executed. Plans are created by SQL parsers or fluent builders and
5//! consumed by execution engines.
6
7use std::sync::Arc;
8
9use arrow::array::{ArrayRef, BooleanArray, Date32Array, Float64Array, Int64Array, StringArray};
10use arrow::datatypes::{DataType, Schema};
11use arrow::record_batch::RecordBatch;
12use llkv_expr::expr::SubqueryId;
13use llkv_result::Error;
14
15/// Result type for plan operations.
16pub type PlanResult<T> = llkv_result::Result<T>;
17
18// ============================================================================
19// Filter Metadata
20// ============================================================================
21
/// Boolean predicate plus correlated subquery metadata attached to a [`SelectPlan`].
///
/// Stored in [`SelectPlan::filter`], which holds the WHERE predicate of a
/// SELECT together with the subqueries that predicate depends on.
#[derive(Clone, Debug)]
pub struct SelectFilter {
    /// Predicate applied to rows before projections execute.
    pub predicate: llkv_expr::expr::Expr<'static, String>,
    /// Correlated subqueries required to evaluate the predicate.
    pub subqueries: Vec<FilterSubquery>,
}
30
/// Correlated subquery invoked from within a filter predicate.
///
/// Bundles the subquery's own plan with the information needed to bind its
/// placeholder columns to columns of the outer query.
#[derive(Clone, Debug)]
pub struct FilterSubquery {
    /// Identifier referenced by [`llkv_expr::expr::Expr::Exists`].
    pub id: SubqueryId,
    /// Logical plan for the subquery (boxed because plans nest recursively).
    pub plan: Box<SelectPlan>,
    /// Mappings for correlated column placeholders to real outer columns.
    pub correlated_columns: Vec<CorrelatedColumn>,
}
41
/// Correlated subquery invoked from within a scalar projection expression.
///
/// Has the same shape as [`FilterSubquery`]; instances are collected in
/// [`SelectPlan::scalar_subqueries`].
#[derive(Clone, Debug)]
pub struct ScalarSubquery {
    /// Identifier referenced by [`llkv_expr::expr::ScalarExpr::ScalarSubquery`].
    pub id: SubqueryId,
    /// Logical plan for the subquery (boxed because plans nest recursively).
    pub plan: Box<SelectPlan>,
    /// Mappings for correlated column placeholders to real outer columns.
    pub correlated_columns: Vec<CorrelatedColumn>,
}
52
/// Description of a correlated column captured by a subquery.
///
/// Used by both [`FilterSubquery`] and [`ScalarSubquery`] to map a placeholder
/// inside the subquery back to a column of the outer query.
#[derive(Clone, Debug)]
pub struct CorrelatedColumn {
    /// Placeholder column name injected into the subquery expression tree.
    pub placeholder: String,
    /// Canonical outer column name.
    pub column: String,
    /// Optional nested field path for struct lookups.
    /// NOTE(review): presumably empty when the column is referenced directly — confirm.
    pub field_path: Vec<String>,
}
63
64// ============================================================================
65// PlanValue Types
66// ============================================================================
67
/// Literal value carried by logical plans.
///
/// There is no dedicated boolean variant: the `From<bool>` conversion below
/// encodes booleans as `Integer(1)` / `Integer(0)`.
#[derive(Clone, Debug, PartialEq)]
pub enum PlanValue {
    /// Absent value.
    Null,
    /// Signed 64-bit integer.
    Integer(i64),
    /// 64-bit floating point number.
    Float(f64),
    /// Owned text.
    String(String),
    /// Named fields, each holding a nested value.
    Struct(std::collections::HashMap<String, PlanValue>),
}

/// Borrowed text is copied into an owned `String` variant.
impl From<&str> for PlanValue {
    fn from(value: &str) -> Self {
        PlanValue::String(value.to_owned())
    }
}

/// Owned text is moved into the `String` variant without copying.
impl From<String> for PlanValue {
    fn from(value: String) -> Self {
        PlanValue::String(value)
    }
}

/// `i64` maps directly onto the `Integer` variant.
impl From<i64> for PlanValue {
    fn from(value: i64) -> Self {
        PlanValue::Integer(value)
    }
}

/// `f64` maps directly onto the `Float` variant.
impl From<f64> for PlanValue {
    fn from(value: f64) -> Self {
        PlanValue::Float(value)
    }
}

/// Booleans are stored as integers for compatibility: `true` -> 1, `false` -> 0.
impl From<bool> for PlanValue {
    fn from(value: bool) -> Self {
        PlanValue::Integer(i64::from(value))
    }
}

/// `i32` is widened losslessly to `i64`.
impl From<i32> for PlanValue {
    fn from(value: i32) -> Self {
        PlanValue::Integer(i64::from(value))
    }
}
117
118/// Convert a `Literal` from llkv-expr into a `PlanValue`.
119///
120/// This is useful for evaluating predicates that contain literal values,
121/// such as in HAVING clauses or filter expressions.
122pub fn plan_value_from_literal(literal: &llkv_expr::Literal) -> PlanResult<PlanValue> {
123    use llkv_expr::Literal;
124
125    match literal {
126        Literal::Null => Ok(PlanValue::Null),
127        Literal::Integer(i) => {
128            // Convert i128 to i64, checking for overflow
129            if *i > i64::MAX as i128 || *i < i64::MIN as i128 {
130                Err(Error::InvalidArgumentError(format!(
131                    "Integer literal {} out of range for i64",
132                    i
133                )))
134            } else {
135                Ok(PlanValue::Integer(*i as i64))
136            }
137        }
138        Literal::Float(f) => Ok(PlanValue::Float(*f)),
139        Literal::String(s) => Ok(PlanValue::String(s.clone())),
140        Literal::Boolean(b) => Ok(PlanValue::from(*b)),
141        Literal::Struct(fields) => {
142            let mut map = std::collections::HashMap::new();
143            for (name, value) in fields {
144                let plan_value = plan_value_from_literal(value)?;
145                map.insert(name.clone(), plan_value);
146            }
147            Ok(PlanValue::Struct(map))
148        }
149    }
150}
151
152// ============================================================================
153// CREATE TABLE Plan
154// ============================================================================
155
/// Multi-column unique constraint specification.
///
/// Instances are collected in [`CreateTablePlan::multi_column_uniques`].
#[derive(Clone, Debug)]
pub struct MultiColumnUniqueSpec {
    /// Optional name for the unique constraint
    pub name: Option<String>,
    /// Column names participating in this UNIQUE constraint
    pub columns: Vec<String>,
}
164
/// Plan for creating a table.
///
/// [`CreateTablePlan::new`] yields a plan with every flag cleared and every
/// collection empty; callers fill in the public fields directly.
#[derive(Clone, Debug)]
pub struct CreateTablePlan {
    /// Name of the table to create.
    pub name: String,
    /// `IF NOT EXISTS` flag; `false` by default.
    pub if_not_exists: bool,
    /// `OR REPLACE` flag; `false` by default.
    pub or_replace: bool,
    /// Column specifications for the new table.
    pub columns: Vec<PlanColumnSpec>,
    /// Optional data used to populate the table (see [`CreateTableSource`]).
    pub source: Option<CreateTableSource>,
    /// Optional storage namespace for the table.
    pub namespace: Option<String>,
    /// Foreign key constraints declared on the table.
    pub foreign_keys: Vec<ForeignKeySpec>,
    /// UNIQUE constraints spanning more than one column.
    pub multi_column_uniques: Vec<MultiColumnUniqueSpec>,
}
178
179impl CreateTablePlan {
180    pub fn new(name: impl Into<String>) -> Self {
181        Self {
182            name: name.into(),
183            if_not_exists: false,
184            or_replace: false,
185            columns: Vec::new(),
186            source: None,
187            namespace: None,
188            foreign_keys: Vec::new(),
189            multi_column_uniques: Vec::new(),
190        }
191    }
192}
193
194// ============================================================================
195// DROP TABLE Plan
196// ============================================================================
197
/// Logical plan for dropping a table.
#[derive(Clone, Debug)]
pub struct DropTablePlan {
    /// Name of the table to drop.
    pub name: String,
    /// `IF EXISTS` flag; `false` unless set through [`DropTablePlan::if_exists`].
    pub if_exists: bool,
}

impl DropTablePlan {
    /// Create a plan for dropping `name` with the `IF EXISTS` flag cleared.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            name,
            if_exists: false,
        }
    }

    /// Builder-style setter for the `IF EXISTS` flag.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
218
219// ============================================================================
220// RENAME TABLE Plan
221// ============================================================================
222
/// Logical plan for renaming a table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RenameTablePlan {
    /// Name the table has now.
    pub current_name: String,
    /// Name the table should have afterwards.
    pub new_name: String,
    /// `IF EXISTS` flag; `false` unless set through [`RenameTablePlan::if_exists`].
    pub if_exists: bool,
}

impl RenameTablePlan {
    /// Create a rename plan with the `IF EXISTS` flag cleared.
    pub fn new(current_name: impl Into<String>, new_name: impl Into<String>) -> Self {
        let current_name = current_name.into();
        let new_name = new_name.into();
        Self {
            current_name,
            new_name,
            if_exists: false,
        }
    }

    /// Builder-style setter for the `IF EXISTS` flag.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
245
/// Logical plan for dropping an index.
#[derive(Clone, Debug, PartialEq)]
pub struct DropIndexPlan {
    /// Index name exactly as supplied by the caller.
    pub name: String,
    /// Canonical form of the name; defaults to the ASCII-lowercased `name`.
    pub canonical_name: String,
    /// `IF EXISTS` flag; `false` unless set through [`DropIndexPlan::if_exists`].
    pub if_exists: bool,
}

impl DropIndexPlan {
    /// Create a plan for dropping `name`.
    ///
    /// The canonical name is derived by ASCII-lowercasing the supplied name;
    /// use [`DropIndexPlan::with_canonical`] to override it.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        let canonical_name = name.to_ascii_lowercase();
        Self {
            name,
            canonical_name,
            if_exists: false,
        }
    }

    /// Replace the derived canonical name with an explicit one.
    pub fn with_canonical(self, canonical: impl Into<String>) -> Self {
        Self {
            canonical_name: canonical.into(),
            ..self
        }
    }

    /// Builder-style setter for the `IF EXISTS` flag.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
274
275// ============================================================================
276// ALTER TABLE Plan Structures
277// ============================================================================
278
/// Logical plan for a single `ALTER TABLE` operation.
#[derive(Clone, Debug, PartialEq)]
pub struct AlterTablePlan {
    /// Table being altered.
    pub table_name: String,
    /// Table-level `IF EXISTS` flag; `false` unless set through
    /// [`AlterTablePlan::if_exists`].
    pub if_exists: bool,
    /// The change to apply.
    pub operation: AlterTableOperation,
}

/// The concrete change requested by an [`AlterTablePlan`].
#[derive(Clone, Debug, PartialEq)]
pub enum AlterTableOperation {
    /// `RENAME COLUMN old_name TO new_name`
    RenameColumn {
        old_column_name: String,
        new_column_name: String,
    },
    /// `ALTER COLUMN column_name SET DATA TYPE new_type`
    SetColumnDataType {
        column_name: String,
        /// SQL type string like "INTEGER", "VARCHAR", etc.
        new_data_type: String,
    },
    /// `DROP COLUMN column_name`
    DropColumn {
        column_name: String,
        if_exists: bool,
        cascade: bool,
    },
}

impl AlterTablePlan {
    /// Create a plan applying `operation` to `table_name`, with the
    /// table-level `IF EXISTS` flag cleared.
    pub fn new(table_name: impl Into<String>, operation: AlterTableOperation) -> Self {
        let table_name = table_name.into();
        Self {
            table_name,
            if_exists: false,
            operation,
        }
    }

    /// Builder-style setter for the table-level `IF EXISTS` flag.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
322
323// ============================================================================
324// FOREIGN KEY Plan Structures
325// ============================================================================
326
/// Referential action attached to a foreign key's `on_delete` / `on_update`
/// slots in [`ForeignKeySpec`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum ForeignKeyAction {
    /// Default action (selected by `Default::default()`).
    #[default]
    NoAction,
    /// `RESTRICT` action.
    Restrict,
}
333
/// Foreign key constraint specification carried by [`CreateTablePlan`].
#[derive(Clone, Debug)]
pub struct ForeignKeySpec {
    /// Optional constraint name.
    pub name: Option<String>,
    /// Columns of the declaring table that make up the key.
    pub columns: Vec<String>,
    /// Table the key points at.
    pub referenced_table: String,
    /// Columns of `referenced_table` the key points at.
    /// NOTE(review): presumably matched positionally with `columns` — confirm.
    pub referenced_columns: Vec<String>,
    /// Action associated with deletes.
    pub on_delete: ForeignKeyAction,
    /// Action associated with updates.
    pub on_update: ForeignKeyAction,
}
343
344// ============================================================================
345// CREATE INDEX Plan
346// ============================================================================
347
/// One column of a `CREATE INDEX` statement, with its sort options.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IndexColumnPlan {
    /// Column name.
    pub name: String,
    /// Sort direction flag; `true` by default.
    pub ascending: bool,
    /// NULL placement flag; `false` by default.
    pub nulls_first: bool,
}

impl IndexColumnPlan {
    /// Create an index column with the default ordering: ascending, with
    /// `nulls_first` cleared.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            name,
            ascending: true,
            nulls_first: false,
        }
    }

    /// Override both sort options at once.
    pub fn with_sort(self, ascending: bool, nulls_first: bool) -> Self {
        Self {
            ascending,
            nulls_first,
            ..self
        }
    }
}
371
/// Plan for creating an index on a table.
///
/// Built with [`CreateIndexPlan::new`] and refined through the `with_*`
/// builder methods.
#[derive(Clone, Debug)]
pub struct CreateIndexPlan {
    /// Optional index name; `None` by default.
    pub name: Option<String>,
    /// Table the index is created on.
    pub table: String,
    /// `UNIQUE` flag; `false` by default.
    pub unique: bool,
    /// `IF NOT EXISTS` flag; `false` by default.
    pub if_not_exists: bool,
    /// Indexed columns with their sort options; empty by default.
    pub columns: Vec<IndexColumnPlan>,
}
381
382impl CreateIndexPlan {
383    pub fn new(table: impl Into<String>) -> Self {
384        Self {
385            name: None,
386            table: table.into(),
387            unique: false,
388            if_not_exists: false,
389            columns: Vec::new(),
390        }
391    }
392
393    pub fn with_name(mut self, name: Option<String>) -> Self {
394        self.name = name;
395        self
396    }
397
398    pub fn with_unique(mut self, unique: bool) -> Self {
399        self.unique = unique;
400        self
401    }
402
403    pub fn with_if_not_exists(mut self, if_not_exists: bool) -> Self {
404        self.if_not_exists = if_not_exists;
405        self
406    }
407
408    pub fn with_columns(mut self, columns: Vec<IndexColumnPlan>) -> Self {
409        self.columns = columns;
410        self
411    }
412}
413
/// Column specification produced by the logical planner.
///
/// This struct flows from the planner into the runtime/executor so callers can
/// reason about column metadata without duplicating field definitions.
#[derive(Clone, Debug)]
pub struct PlanColumnSpec {
    /// Column name.
    pub name: String,
    /// Arrow data type of the column.
    pub data_type: DataType,
    /// Whether the column is declared nullable.
    pub nullable: bool,
    /// Primary-key flag; setting it through
    /// [`PlanColumnSpec::with_primary_key`] also sets `unique`.
    pub primary_key: bool,
    /// Uniqueness flag.
    pub unique: bool,
    /// Optional CHECK constraint expression (SQL string).
    /// Example: "t.t=42" for CHECK(t.t=42)
    pub check_expr: Option<String>,
}
429
430impl PlanColumnSpec {
431    pub fn new(name: impl Into<String>, data_type: DataType, nullable: bool) -> Self {
432        Self {
433            name: name.into(),
434            data_type,
435            nullable,
436            primary_key: false,
437            unique: false,
438            check_expr: None,
439        }
440    }
441
442    pub fn with_primary_key(mut self, primary_key: bool) -> Self {
443        self.primary_key = primary_key;
444        if primary_key {
445            self.unique = true;
446        }
447        self
448    }
449
450    pub fn with_unique(mut self, unique: bool) -> Self {
451        if unique {
452            self.unique = true;
453        }
454        self
455    }
456
457    pub fn with_check(mut self, check_expr: Option<String>) -> Self {
458        self.check_expr = check_expr;
459        self
460    }
461}
462
/// Trait for types that can be converted into a [`PlanColumnSpec`].
///
/// Implemented in this module for `PlanColumnSpec` itself, for references to
/// cloneable implementors, and for `(&str, DataType[, bool | ColumnNullability])`
/// tuples.
pub trait IntoPlanColumnSpec {
    /// Consume `self` and produce the column specification.
    fn into_plan_column_spec(self) -> PlanColumnSpec;
}
467
/// Whether a column accepts NULL values.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ColumnNullability {
    /// The column is nullable.
    Nullable,
    /// The column is declared NOT NULL.
    NotNull,
}

impl ColumnNullability {
    /// `true` for [`ColumnNullability::Nullable`], `false` for
    /// [`ColumnNullability::NotNull`].
    pub fn is_nullable(self) -> bool {
        match self {
            Self::Nullable => true,
            Self::NotNull => false,
        }
    }
}
480
/// Convenience constant for nullable columns.
///
/// Equal to [`ColumnNullability::Nullable`].
// The constant deliberately reuses the variant's spelling, so the
// upper-case-globals lint is silenced.
#[allow(non_upper_case_globals)]
pub const Nullable: ColumnNullability = ColumnNullability::Nullable;

/// Convenience constant for non-null columns.
///
/// Equal to [`ColumnNullability::NotNull`].
// Same naming rationale as `Nullable`.
#[allow(non_upper_case_globals)]
pub const NotNull: ColumnNullability = ColumnNullability::NotNull;
488
489impl IntoPlanColumnSpec for PlanColumnSpec {
490    fn into_plan_column_spec(self) -> PlanColumnSpec {
491        self
492    }
493}
494
495impl<T> IntoPlanColumnSpec for &T
496where
497    T: Clone + IntoPlanColumnSpec,
498{
499    fn into_plan_column_spec(self) -> PlanColumnSpec {
500        self.clone().into_plan_column_spec()
501    }
502}
503
504impl IntoPlanColumnSpec for (&str, DataType) {
505    fn into_plan_column_spec(self) -> PlanColumnSpec {
506        PlanColumnSpec::new(self.0, self.1, true)
507    }
508}
509
510impl IntoPlanColumnSpec for (&str, DataType, bool) {
511    fn into_plan_column_spec(self) -> PlanColumnSpec {
512        PlanColumnSpec::new(self.0, self.1, self.2)
513    }
514}
515
516impl IntoPlanColumnSpec for (&str, DataType, ColumnNullability) {
517    fn into_plan_column_spec(self) -> PlanColumnSpec {
518        PlanColumnSpec::new(self.0, self.1, self.2.is_nullable())
519    }
520}
521
/// Source data for populating a newly created table
/// (e.g. CREATE TABLE AS SELECT).
#[derive(Clone, Debug)]
pub enum CreateTableSource {
    /// Already materialised Arrow record batches together with their schema.
    Batches {
        schema: Arc<Schema>,
        batches: Vec<RecordBatch>,
    },
    /// Rows described by a SELECT plan.
    Select {
        plan: Box<SelectPlan>,
    },
}
533
534// ============================================================================
535// INSERT Plan
536// ============================================================================
537
/// Plan for inserting data into a table.
#[derive(Clone, Debug)]
pub struct InsertPlan {
    /// Target table name.
    pub table: String,
    /// Target column names.
    /// NOTE(review): presumably an empty list means "all columns in table order" — confirm.
    pub columns: Vec<String>,
    /// Where the inserted data comes from.
    pub source: InsertSource,
}
545
/// Source data for INSERT operations.
#[derive(Clone, Debug)]
pub enum InsertSource {
    /// Literal rows; each inner vector holds the values of one row.
    Rows(Vec<Vec<PlanValue>>),
    /// Already materialised Arrow record batches.
    Batches(Vec<RecordBatch>),
    /// Rows described by a SELECT plan.
    Select { plan: Box<SelectPlan> },
}
553
554// ============================================================================
555// UPDATE Plan
556// ============================================================================
557
/// Plan for updating rows in a table.
#[derive(Clone, Debug)]
pub struct UpdatePlan {
    /// Target table name.
    pub table: String,
    /// Column assignments to apply.
    pub assignments: Vec<ColumnAssignment>,
    /// Optional predicate selecting the rows to update.
    /// NOTE(review): presumably every row is targeted when `None` — confirm with the executor.
    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
}
565
/// Value to assign in an UPDATE.
#[derive(Clone, Debug)]
pub enum AssignmentValue {
    /// A constant plan-level value.
    Literal(PlanValue),
    /// A scalar expression over column names.
    Expression(llkv_expr::expr::ScalarExpr<String>),
}
572
/// Column assignment for UPDATE: pairs a column with the value to store in it.
#[derive(Clone, Debug)]
pub struct ColumnAssignment {
    /// Name of the column being assigned.
    pub column: String,
    /// Literal or expression assigned to the column.
    pub value: AssignmentValue,
}
579
580// ============================================================================
581// DELETE Plan
582// ============================================================================
583
/// Plan for deleting rows from a table.
#[derive(Clone, Debug)]
pub struct DeletePlan {
    /// Target table name.
    pub table: String,
    /// Optional predicate selecting the rows to delete.
    /// NOTE(review): presumably every row is targeted when `None` — confirm with the executor.
    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
}
590
591// ============================================================================
592// TRUNCATE Plan
593// ============================================================================
594
/// Plan for TRUNCATE TABLE operation (removes all rows).
///
/// Carries only the table name; unlike [`DeletePlan`] there is no filter.
#[derive(Clone, Debug)]
pub struct TruncatePlan {
    /// Target table name.
    pub table: String,
}
600
601// ============================================================================
602// SELECT Plan
603// ============================================================================
604
/// Reference to a table named in a FROM clause.
#[derive(Clone, Debug)]
pub struct TableRef {
    /// Schema name; an empty string means the table is unqualified.
    pub schema: String,
    /// Table name.
    pub table: String,
    /// Optional alias given to the table.
    pub alias: Option<String>,
}

impl TableRef {
    /// Create an un-aliased table reference.
    pub fn new(schema: impl Into<String>, table: impl Into<String>) -> Self {
        Self::with_alias(schema, table, None)
    }

    /// Create a table reference carrying an optional alias.
    pub fn with_alias(
        schema: impl Into<String>,
        table: impl Into<String>,
        alias: Option<String>,
    ) -> Self {
        let schema = schema.into();
        let table = table.into();
        Self {
            schema,
            table,
            alias,
        }
    }

    /// Preferred display name for the table: the alias when one is set,
    /// otherwise the qualified name.
    pub fn display_name(&self) -> String {
        match &self.alias {
            Some(alias) => alias.clone(),
            None => self.qualified_name(),
        }
    }

    /// `schema.table`, or just `table` when the schema is empty.
    pub fn qualified_name(&self) -> String {
        match self.schema.as_str() {
            "" => self.table.clone(),
            schema => format!("{}.{}", schema, self.table),
        }
    }
}
650
651// ============================================================================
652// Join Metadata
653// ============================================================================
654
/// Type of join operation for query planning.
///
/// This is a plan-layer type that mirrors `llkv_join::JoinType` but exists
/// separately to avoid circular dependencies (llkv-join depends on llkv-table
/// which depends on llkv-plan). The executor converts `JoinPlan` to `llkv_join::JoinType`.
///
/// The type is `Copy`, so it can be passed around by value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JoinPlan {
    /// Emit only matching row pairs.
    Inner,
    /// Emit all left rows; unmatched left rows have NULL right columns.
    Left,
    /// Emit all right rows; unmatched right rows have NULL left columns.
    Right,
    /// Emit all rows from both sides; unmatched rows have NULLs.
    Full,
}
671
/// Metadata describing a join between consecutive tables in the FROM clause.
///
/// Tracks the join type and optional ON condition filter for each join.
/// The join connects the table at index `left_table_index` with the table at
/// `left_table_index + 1`.
/// Replaces the older `join_types`/`join_filters` vectors so executors can
/// inspect a single compact structure when coordinating join evaluation.
#[derive(Clone, Debug)]
pub struct JoinMetadata {
    /// Index of the left table in the `SelectPlan.tables` vector.
    pub left_table_index: usize,
    /// Type of join (INNER, LEFT, RIGHT, or FULL).
    pub join_type: JoinPlan,
    /// Optional ON condition filter expression. Translators also thread this
    /// predicate through [`SelectPlan::filter`] so the optimizer can merge it
    /// with other WHERE clauses, but keeping it here enables join-specific
    /// rewrites (e.g., push-down or hash join pruning).
    pub on_condition: Option<llkv_expr::expr::Expr<'static, String>>,
}
690
/// Logical query plan for SELECT operations.
///
/// The `tables` collection preserves the FROM clause order while [`Self::joins`]
/// captures how adjacent tables are connected via [`JoinMetadata`]. This keeps
/// join semantics alongside table references instead of parallel vectors and
/// mirrors what the executor expects when materialising join pipelines.
///
/// Plans are created with [`Self::new`] or [`Self::with_tables`] and refined
/// through the consuming `with_*` builder methods.
#[derive(Clone, Debug)]
pub struct SelectPlan {
    /// Tables to query. Empty vec means no FROM clause (e.g., SELECT 42).
    /// Single element for simple queries, multiple for joins/cross products.
    pub tables: Vec<TableRef>,
    /// Join metadata describing how tables are joined.
    /// If empty, all tables are implicitly cross-joined (Cartesian product).
    /// Each entry describes a join between `tables[i]` and `tables[i + 1]`.
    pub joins: Vec<JoinMetadata>,
    /// Projection specifications for the SELECT; empty by default.
    pub projections: Vec<SelectProjection>,
    /// Optional WHERE predicate plus dependent correlated subqueries.
    pub filter: Option<SelectFilter>,
    /// Optional HAVING predicate applied after grouping.
    pub having: Option<llkv_expr::expr::Expr<'static, String>>,
    /// Scalar subqueries referenced by projections, keyed by `SubqueryId`.
    pub scalar_subqueries: Vec<ScalarSubquery>,
    /// Aggregate expressions of the SELECT; empty by default.
    pub aggregates: Vec<AggregateExpr>,
    /// ORDER BY entries; empty by default.
    pub order_by: Vec<OrderByPlan>,
    /// DISTINCT flag; `false` by default.
    pub distinct: bool,
    /// Optional compound (set-operation) plan.
    pub compound: Option<CompoundSelectPlan>,
    /// Columns used in GROUP BY clauses (canonical names).
    pub group_by: Vec<String>,
    /// Optional value table output mode (BigQuery style).
    pub value_table_mode: Option<ValueTableMode>,
}
723
724impl SelectPlan {
725    /// Create a SelectPlan for a single table.
726    pub fn new(table: impl Into<String>) -> Self {
727        let table_name = table.into();
728        let tables = if table_name.is_empty() {
729            Vec::new()
730        } else {
731            // Parse "schema.table" or just "table"
732            let parts: Vec<&str> = table_name.split('.').collect();
733            if parts.len() >= 2 {
734                let table_part = parts[1..].join(".");
735                vec![TableRef::new(parts[0], table_part)]
736            } else {
737                vec![TableRef::new("", table_name)]
738            }
739        };
740
741        Self {
742            tables,
743            joins: Vec::new(),
744            projections: Vec::new(),
745            filter: None,
746            having: None,
747            scalar_subqueries: Vec::new(),
748            aggregates: Vec::new(),
749            order_by: Vec::new(),
750            distinct: false,
751            compound: None,
752            group_by: Vec::new(),
753            value_table_mode: None,
754        }
755    }
756
757    /// Create a SelectPlan with multiple tables for cross product/joins.
758    ///
759    /// The returned plan leaves [`Self::joins`] empty, which means any
760    /// evaluation engine should treat the tables as a Cartesian product until
761    /// [`Self::with_joins`] populates concrete join relationships.
762    pub fn with_tables(tables: Vec<TableRef>) -> Self {
763        Self {
764            tables,
765            joins: Vec::new(),
766            projections: Vec::new(),
767            filter: None,
768            having: None,
769            scalar_subqueries: Vec::new(),
770            aggregates: Vec::new(),
771            order_by: Vec::new(),
772            distinct: false,
773            compound: None,
774            group_by: Vec::new(),
775            value_table_mode: None,
776        }
777    }
778
779    pub fn with_projections(mut self, projections: Vec<SelectProjection>) -> Self {
780        self.projections = projections;
781        self
782    }
783
784    pub fn with_filter(mut self, filter: Option<SelectFilter>) -> Self {
785        self.filter = filter;
786        self
787    }
788
789    pub fn with_having(mut self, having: Option<llkv_expr::expr::Expr<'static, String>>) -> Self {
790        self.having = having;
791        self
792    }
793
794    /// Attach scalar subqueries discovered during SELECT translation.
795    pub fn with_scalar_subqueries(mut self, scalar_subqueries: Vec<ScalarSubquery>) -> Self {
796        self.scalar_subqueries = scalar_subqueries;
797        self
798    }
799
800    pub fn with_aggregates(mut self, aggregates: Vec<AggregateExpr>) -> Self {
801        self.aggregates = aggregates;
802        self
803    }
804
805    pub fn with_order_by(mut self, order_by: Vec<OrderByPlan>) -> Self {
806        self.order_by = order_by;
807        self
808    }
809
810    pub fn with_distinct(mut self, distinct: bool) -> Self {
811        self.distinct = distinct;
812        self
813    }
814
815    /// Attach join metadata describing how tables are connected.
816    ///
817    /// Each [`JoinMetadata`] entry pairs `tables[i]` with `tables[i + 1]`. The
818    /// builder should supply exactly `tables.len().saturating_sub(1)` entries
819    /// when explicit joins are required; otherwise consumers fall back to a
820    /// Cartesian product.
821    pub fn with_joins(mut self, joins: Vec<JoinMetadata>) -> Self {
822        self.joins = joins;
823        self
824    }
825
826    /// Attach a compound (set operation) plan.
827    pub fn with_compound(mut self, compound: CompoundSelectPlan) -> Self {
828        self.compound = Some(compound);
829        self
830    }
831
832    pub fn with_group_by(mut self, group_by: Vec<String>) -> Self {
833        self.group_by = group_by;
834        self
835    }
836
837    pub fn with_value_table_mode(mut self, mode: Option<ValueTableMode>) -> Self {
838        self.value_table_mode = mode;
839        self
840    }
841}
842
/// Set operation applied between SELECT statements.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CompoundOperator {
    /// `UNION`
    Union,
    /// `INTERSECT`
    Intersect,
    /// `EXCEPT`
    Except,
}
850
/// Quantifier associated with set operations (e.g., UNION vs UNION ALL).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CompoundQuantifier {
    /// The plain form of the operation (e.g. `UNION`).
    Distinct,
    /// The `ALL` form of the operation (e.g. `UNION ALL`).
    All,
}
857
/// Component of a compound SELECT (set operation).
///
/// One entry of [`CompoundSelectPlan::operations`]: an operator, its
/// quantifier, and the SELECT plan that goes with them.
#[derive(Clone, Debug)]
pub struct CompoundSelectComponent {
    /// Set operation to apply.
    pub operator: CompoundOperator,
    /// Quantifier (distinct vs all) for the operation.
    pub quantifier: CompoundQuantifier,
    /// SELECT plan paired with the operator.
    pub plan: SelectPlan,
}
865
/// Compound SELECT plan representing a tree of set operations.
///
/// Holds the first SELECT plus the operations appended through
/// [`CompoundSelectPlan::push_operation`], kept in insertion order.
#[derive(Clone, Debug)]
pub struct CompoundSelectPlan {
    /// The first SELECT of the compound statement.
    pub initial: Box<SelectPlan>,
    /// Operations appended after `initial`, in the order they were pushed.
    pub operations: Vec<CompoundSelectComponent>,
}
872
873impl CompoundSelectPlan {
874    pub fn new(initial: SelectPlan) -> Self {
875        Self {
876            initial: Box::new(initial),
877            operations: Vec::new(),
878        }
879    }
880
881    pub fn push_operation(
882        &mut self,
883        operator: CompoundOperator,
884        quantifier: CompoundQuantifier,
885        plan: SelectPlan,
886    ) {
887        self.operations.push(CompoundSelectComponent {
888            operator,
889            quantifier,
890            plan,
891        });
892    }
893}
894
/// Projection specification for SELECT.
#[derive(Clone, Debug)]
pub enum SelectProjection {
    /// Every column.
    AllColumns,
    /// Every column apart from the names listed in `exclude`.
    AllColumnsExcept {
        exclude: Vec<String>,
    },
    /// A single named column with an optional output alias.
    Column {
        name: String,
        alias: Option<String>,
    },
    /// A computed scalar expression; the alias is mandatory.
    Computed {
        expr: llkv_expr::expr::ScalarExpr<String>,
        alias: String,
    },
}
911
/// Value table output modes (BigQuery-style).
///
/// Stored in [`SelectPlan::value_table_mode`].
/// NOTE(review): variants presumably correspond to `SELECT [DISTINCT] AS STRUCT`
/// and `SELECT [DISTINCT] AS VALUE` — confirm against the SQL translator.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ValueTableMode {
    AsStruct,
    AsValue,
    DistinctAsStruct,
    DistinctAsValue,
}
920
921// ============================================================================
922// Aggregate Plans
923// ============================================================================
924
/// Aggregate expression appearing in a SELECT.
#[derive(Clone, Debug)]
pub enum AggregateExpr {
    /// `COUNT(*)` under the given output alias.
    CountStar {
        alias: String,
    },
    /// An aggregate function applied to a single column.
    Column {
        column: String,
        alias: String,
        function: AggregateFunction,
        distinct: bool,
    },
}

/// Aggregate functions available to [`AggregateExpr::Column`].
#[derive(Clone, Debug)]
pub enum AggregateFunction {
    Count,
    SumInt64,
    MinInt64,
    MaxInt64,
    CountNulls,
}

impl AggregateExpr {
    /// `COUNT(*)` reported under `alias`.
    pub fn count_star(alias: impl Into<String>) -> Self {
        let alias = alias.into();
        Self::CountStar { alias }
    }

    /// Non-distinct [`AggregateFunction::Count`] over `column`.
    pub fn count_column(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::Count, false)
    }

    /// [`AggregateFunction::Count`] over `column` with the distinct flag set.
    pub fn count_distinct_column(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::Count, true)
    }

    /// Non-distinct [`AggregateFunction::SumInt64`] over `column`.
    pub fn sum_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::SumInt64, false)
    }

    /// Non-distinct [`AggregateFunction::MinInt64`] over `column`.
    pub fn min_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::MinInt64, false)
    }

    /// Non-distinct [`AggregateFunction::MaxInt64`] over `column`.
    pub fn max_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::MaxInt64, false)
    }

    /// Non-distinct [`AggregateFunction::CountNulls`] over `column`.
    pub fn count_nulls(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::CountNulls, false)
    }

    /// Shared constructor behind the public column-aggregate helpers.
    fn over_column(
        column: impl Into<String>,
        alias: impl Into<String>,
        function: AggregateFunction,
        distinct: bool,
    ) -> Self {
        Self::Column {
            column: column.into(),
            alias: alias.into(),
            function,
            distinct,
        }
    }
}
1010
1011/// Helper to convert an Arrow array cell into a plan-level Value.
1012pub fn plan_value_from_array(array: &ArrayRef, index: usize) -> PlanResult<PlanValue> {
1013    if array.is_null(index) {
1014        return Ok(PlanValue::Null);
1015    }
1016    match array.data_type() {
1017        DataType::Boolean => {
1018            let values = array
1019                .as_any()
1020                .downcast_ref::<BooleanArray>()
1021                .ok_or_else(|| {
1022                    Error::InvalidArgumentError("expected Boolean array in INSERT SELECT".into())
1023                })?;
1024            Ok(PlanValue::Integer(if values.value(index) { 1 } else { 0 }))
1025        }
1026        DataType::Int64 => {
1027            let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
1028                Error::InvalidArgumentError("expected Int64 array in INSERT SELECT".into())
1029            })?;
1030            Ok(PlanValue::Integer(values.value(index)))
1031        }
1032        DataType::Float64 => {
1033            let values = array
1034                .as_any()
1035                .downcast_ref::<Float64Array>()
1036                .ok_or_else(|| {
1037                    Error::InvalidArgumentError("expected Float64 array in INSERT SELECT".into())
1038                })?;
1039            Ok(PlanValue::Float(values.value(index)))
1040        }
1041        DataType::Utf8 => {
1042            let values = array
1043                .as_any()
1044                .downcast_ref::<StringArray>()
1045                .ok_or_else(|| {
1046                    Error::InvalidArgumentError("expected Utf8 array in INSERT SELECT".into())
1047                })?;
1048            Ok(PlanValue::String(values.value(index).to_string()))
1049        }
1050        DataType::Date32 => {
1051            let values = array
1052                .as_any()
1053                .downcast_ref::<Date32Array>()
1054                .ok_or_else(|| {
1055                    Error::InvalidArgumentError("expected Date32 array in INSERT SELECT".into())
1056                })?;
1057            Ok(PlanValue::Integer(values.value(index) as i64))
1058        }
1059        other => Err(Error::InvalidArgumentError(format!(
1060            "unsupported data type in INSERT SELECT: {other:?}"
1061        ))),
1062    }
1063}
1064
1065// ============================================================================
1066// ORDER BY Plan
1067// ============================================================================
1068
1069/// ORDER BY specification.
1070#[derive(Clone, Debug)]
1071pub struct OrderByPlan {
1072    pub target: OrderTarget,
1073    pub sort_type: OrderSortType,
1074    pub ascending: bool,
1075    pub nulls_first: bool,
1076}
1077
/// Comparison mode used by an [`OrderByPlan`].
#[derive(Clone, Debug)]
pub enum OrderSortType {
    /// Presumably sorts values using the column's own type; confirm against the executor.
    Native,
    /// The name indicates text values are cast to integers before comparison; the cast
    /// rules are not defined in this module.
    CastTextToInteger,
}
1084
/// What an [`OrderByPlan`] sorts by.
#[derive(Clone, Debug)]
pub enum OrderTarget {
    /// A target identified by a string, presumably a column name.
    Column(String),
    /// A target identified by position. Whether the index is zero- or one-based is not
    /// visible here; verify against the planner/executor.
    Index(usize),
    /// No specific target is named; presumably orders by every output column.
    All,
}
1092
1093// ============================================================================
1094// Operation Enum for Transaction Replay
1095// ============================================================================
1096
1097/// Recordable plan operation for transaction replay.
1098#[derive(Clone, Debug)]
1099pub enum PlanOperation {
1100    CreateTable(CreateTablePlan),
1101    DropTable(DropTablePlan),
1102    Insert(InsertPlan),
1103    Update(UpdatePlan),
1104    Delete(DeletePlan),
1105    Truncate(TruncatePlan),
1106    Select(Box<SelectPlan>),
1107}
1108
1109/// Top-level plan statements that can be executed against a `Session`.
1110#[derive(Clone, Debug)]
1111pub enum PlanStatement {
1112    BeginTransaction,
1113    CommitTransaction,
1114    RollbackTransaction,
1115    CreateTable(CreateTablePlan),
1116    DropTable(DropTablePlan),
1117    DropIndex(DropIndexPlan),
1118    AlterTable(AlterTablePlan),
1119    CreateIndex(CreateIndexPlan),
1120    Insert(InsertPlan),
1121    Update(UpdatePlan),
1122    Delete(DeletePlan),
1123    Truncate(TruncatePlan),
1124    Select(Box<SelectPlan>),
1125}