llkv_plan/
plans.rs

1//! Logical query plan structures for LLKV.
2//!
3//! This module defines the plan structures that represent logical query operations
4//! before they are executed. Plans are created by SQL parsers or fluent builders and
5//! consumed by execution engines.
6
7use std::sync::Arc;
8
9use arrow::array::{ArrayRef, BooleanArray, Date32Array, Float64Array, Int64Array, StringArray};
10use arrow::datatypes::{DataType, Schema};
11use arrow::record_batch::RecordBatch;
12use llkv_result::Error;
13
/// Result type for plan operations; an alias for [`llkv_result::Result`].
pub type PlanResult<T> = llkv_result::Result<T>;
16
17// ============================================================================
18// PlanValue Types
19// ============================================================================
20
/// Literal value carried inside a logical plan.
///
/// There is no boolean variant: the `From<bool>` conversion below encodes
/// `true` / `false` as `Integer(1)` / `Integer(0)`.
#[derive(Debug, Clone, PartialEq)]
pub enum PlanValue {
    /// Absent value.
    Null,
    /// 64-bit signed integer.
    Integer(i64),
    /// 64-bit floating point number.
    Float(f64),
    /// Owned text.
    String(String),
    /// Named fields, each holding a nested [`PlanValue`].
    Struct(std::collections::HashMap<String, PlanValue>),
}

/// Copies a borrowed string into a `PlanValue::String`.
impl From<&str> for PlanValue {
    fn from(text: &str) -> Self {
        PlanValue::String(text.to_owned())
    }
}

/// Moves an owned string into a `PlanValue::String` without copying.
impl From<String> for PlanValue {
    fn from(text: String) -> Self {
        PlanValue::String(text)
    }
}

/// Wraps a 64-bit integer as `PlanValue::Integer`.
impl From<i64> for PlanValue {
    fn from(number: i64) -> Self {
        PlanValue::Integer(number)
    }
}

/// Wraps a 64-bit float as `PlanValue::Float`.
impl From<f64> for PlanValue {
    fn from(number: f64) -> Self {
        PlanValue::Float(number)
    }
}

/// Booleans are stored as integers for compatibility: `true` is 1, `false` is 0.
impl From<bool> for PlanValue {
    fn from(flag: bool) -> Self {
        PlanValue::Integer(i64::from(flag))
    }
}

/// Widens a 32-bit integer losslessly into `PlanValue::Integer`.
impl From<i32> for PlanValue {
    fn from(number: i32) -> Self {
        PlanValue::Integer(i64::from(number))
    }
}
70
71// ============================================================================
72// CREATE TABLE Plan
73// ============================================================================
74
/// Describes a UNIQUE constraint that spans several columns.
#[derive(Debug, Clone)]
pub struct MultiColumnUniqueSpec {
    /// Constraint name, if one was given.
    pub name: Option<String>,
    /// Names of the columns that together must be unique.
    pub columns: Vec<String>,
}
83
84/// Plan for creating a table.
85#[derive(Clone, Debug)]
86pub struct CreateTablePlan {
87    pub name: String,
88    pub if_not_exists: bool,
89    pub or_replace: bool,
90    pub columns: Vec<PlanColumnSpec>,
91    pub source: Option<CreateTableSource>,
92    /// Optional storage namespace for the table.
93    pub namespace: Option<String>,
94    pub foreign_keys: Vec<ForeignKeySpec>,
95    pub multi_column_uniques: Vec<MultiColumnUniqueSpec>,
96}
97
98impl CreateTablePlan {
99    pub fn new(name: impl Into<String>) -> Self {
100        Self {
101            name: name.into(),
102            if_not_exists: false,
103            or_replace: false,
104            columns: Vec::new(),
105            source: None,
106            namespace: None,
107            foreign_keys: Vec::new(),
108            multi_column_uniques: Vec::new(),
109        }
110    }
111}
112
113// ============================================================================
114// DROP TABLE Plan
115// ============================================================================
116
/// Logical plan describing a table to be dropped.
#[derive(Debug, Clone)]
pub struct DropTablePlan {
    /// Name of the table to drop.
    pub name: String,
    /// `false` by default; presumably mirrors an `IF EXISTS` clause.
    pub if_exists: bool,
}

impl DropTablePlan {
    /// Builds a plan for table `name` with `if_exists` cleared.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            name,
            if_exists: false,
        }
    }

    /// Returns the plan with its `if_exists` flag replaced by the given value.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
137
138// ============================================================================
139// RENAME TABLE Plan
140// ============================================================================
141
/// Logical plan describing a table rename.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RenameTablePlan {
    /// Name the table has now.
    pub current_name: String,
    /// Name the table should have afterwards.
    pub new_name: String,
    /// `false` by default; presumably mirrors an `IF EXISTS` clause.
    pub if_exists: bool,
}

impl RenameTablePlan {
    /// Builds a rename plan from `current_name` to `new_name` with
    /// `if_exists` cleared.
    pub fn new(current_name: impl Into<String>, new_name: impl Into<String>) -> Self {
        let current_name = current_name.into();
        let new_name = new_name.into();
        Self {
            current_name,
            new_name,
            if_exists: false,
        }
    }

    /// Returns the plan with its `if_exists` flag replaced by the given value.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
164
/// Logical plan describing an index to be dropped.
#[derive(Debug, Clone, PartialEq)]
pub struct DropIndexPlan {
    /// Index name exactly as supplied to [`DropIndexPlan::new`].
    pub name: String,
    /// Canonical form of the name; defaults to the ASCII-lowercased `name`.
    pub canonical_name: String,
    /// `false` by default; presumably mirrors an `IF EXISTS` clause.
    pub if_exists: bool,
}

impl DropIndexPlan {
    /// Builds a plan for index `name`, deriving `canonical_name` by
    /// ASCII-lowercasing it and leaving `if_exists` cleared.
    pub fn new(name: impl Into<String>) -> Self {
        let name: String = name.into();
        let canonical_name = name.to_ascii_lowercase();
        Self {
            name,
            canonical_name,
            if_exists: false,
        }
    }

    /// Returns the plan with the derived canonical name overridden.
    pub fn with_canonical(self, canonical: impl Into<String>) -> Self {
        Self {
            canonical_name: canonical.into(),
            ..self
        }
    }

    /// Returns the plan with its `if_exists` flag replaced by the given value.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
193
194// ============================================================================
195// ALTER TABLE Plan Structures
196// ============================================================================
197
/// Logical plan for an ALTER TABLE statement.
#[derive(Debug, Clone, PartialEq)]
pub struct AlterTablePlan {
    /// Table the operation applies to.
    pub table_name: String,
    /// `false` by default; presumably mirrors a table-level `IF EXISTS` clause.
    pub if_exists: bool,
    /// The change to perform on the table.
    pub operation: AlterTableOperation,
}

/// The individual change requested by an ALTER TABLE statement.
#[derive(Debug, Clone, PartialEq)]
pub enum AlterTableOperation {
    /// `RENAME COLUMN old_name TO new_name`
    RenameColumn {
        old_column_name: String,
        new_column_name: String,
    },
    /// `ALTER COLUMN column_name SET DATA TYPE new_type`
    SetColumnDataType {
        column_name: String,
        /// SQL type string such as "INTEGER" or "VARCHAR".
        new_data_type: String,
    },
    /// `DROP COLUMN column_name`
    DropColumn {
        column_name: String,
        if_exists: bool,
        cascade: bool,
    },
}

impl AlterTablePlan {
    /// Builds a plan applying `operation` to `table_name`, with `if_exists`
    /// cleared.
    pub fn new(table_name: impl Into<String>, operation: AlterTableOperation) -> Self {
        let table_name = table_name.into();
        Self {
            table_name,
            if_exists: false,
            operation,
        }
    }

    /// Returns the plan with its `if_exists` flag replaced by the given value.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
241
242// ============================================================================
243// FOREIGN KEY Plan Structures
244// ============================================================================
245
/// Action attached to a foreign key (see the `on_delete` / `on_update`
/// fields of `ForeignKeySpec`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ForeignKeyAction {
    NoAction,
    Restrict,
}

/// `NoAction` is the default action.
impl Default for ForeignKeyAction {
    fn default() -> Self {
        ForeignKeyAction::NoAction
    }
}
257
258#[derive(Clone, Debug)]
259pub struct ForeignKeySpec {
260    pub name: Option<String>,
261    pub columns: Vec<String>,
262    pub referenced_table: String,
263    pub referenced_columns: Vec<String>,
264    pub on_delete: ForeignKeyAction,
265    pub on_update: ForeignKeyAction,
266}
267
268// ============================================================================
269// CREATE INDEX Plan
270// ============================================================================
271
/// One column entry of a CREATE INDEX statement.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexColumnPlan {
    /// Name of the indexed column.
    pub name: String,
    /// Sort direction flag; `true` by default.
    pub ascending: bool,
    /// Null placement flag; `false` by default.
    pub nulls_first: bool,
}

impl IndexColumnPlan {
    /// Builds an entry for column `name` with `ascending = true` and
    /// `nulls_first = false`.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            name,
            ascending: true,
            nulls_first: false,
        }
    }

    /// Returns the entry with both sort flags replaced.
    pub fn with_sort(self, ascending: bool, nulls_first: bool) -> Self {
        Self {
            ascending,
            nulls_first,
            ..self
        }
    }
}
295
296/// Plan for creating an index on a table.
297#[derive(Clone, Debug)]
298pub struct CreateIndexPlan {
299    pub name: Option<String>,
300    pub table: String,
301    pub unique: bool,
302    pub if_not_exists: bool,
303    pub columns: Vec<IndexColumnPlan>,
304}
305
306impl CreateIndexPlan {
307    pub fn new(table: impl Into<String>) -> Self {
308        Self {
309            name: None,
310            table: table.into(),
311            unique: false,
312            if_not_exists: false,
313            columns: Vec::new(),
314        }
315    }
316
317    pub fn with_name(mut self, name: Option<String>) -> Self {
318        self.name = name;
319        self
320    }
321
322    pub fn with_unique(mut self, unique: bool) -> Self {
323        self.unique = unique;
324        self
325    }
326
327    pub fn with_if_not_exists(mut self, if_not_exists: bool) -> Self {
328        self.if_not_exists = if_not_exists;
329        self
330    }
331
332    pub fn with_columns(mut self, columns: Vec<IndexColumnPlan>) -> Self {
333        self.columns = columns;
334        self
335    }
336}
337
338/// Column specification produced by the logical planner.
339///
340/// This struct flows from the planner into the runtime/executor so callers can
341/// reason about column metadata without duplicating field definitions.
342#[derive(Clone, Debug)]
343pub struct PlanColumnSpec {
344    pub name: String,
345    pub data_type: DataType,
346    pub nullable: bool,
347    pub primary_key: bool,
348    pub unique: bool,
349    /// Optional CHECK constraint expression (SQL string).
350    /// Example: "t.t=42" for CHECK(t.t=42)
351    pub check_expr: Option<String>,
352}
353
354impl PlanColumnSpec {
355    pub fn new(name: impl Into<String>, data_type: DataType, nullable: bool) -> Self {
356        Self {
357            name: name.into(),
358            data_type,
359            nullable,
360            primary_key: false,
361            unique: false,
362            check_expr: None,
363        }
364    }
365
366    pub fn with_primary_key(mut self, primary_key: bool) -> Self {
367        self.primary_key = primary_key;
368        if primary_key {
369            self.unique = true;
370        }
371        self
372    }
373
374    pub fn with_unique(mut self, unique: bool) -> Self {
375        if unique {
376            self.unique = true;
377        }
378        self
379    }
380
381    pub fn with_check(mut self, check_expr: Option<String>) -> Self {
382        self.check_expr = check_expr;
383        self
384    }
385}
386
387/// Trait for types that can be converted into a [`PlanColumnSpec`].
388pub trait IntoPlanColumnSpec {
389    fn into_plan_column_spec(self) -> PlanColumnSpec;
390}
391
/// Whether a column accepts NULL values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnNullability {
    Nullable,
    NotNull,
}

impl ColumnNullability {
    /// Returns `true` for `Nullable` and `false` for `NotNull`.
    pub fn is_nullable(self) -> bool {
        match self {
            ColumnNullability::Nullable => true,
            ColumnNullability::NotNull => false,
        }
    }
}

/// Shorthand for [`ColumnNullability::Nullable`].
// The constant deliberately reuses the variant's spelling, hence the lint override.
#[allow(non_upper_case_globals)]
pub const Nullable: ColumnNullability = ColumnNullability::Nullable;

/// Shorthand for [`ColumnNullability::NotNull`].
// The constant deliberately reuses the variant's spelling, hence the lint override.
#[allow(non_upper_case_globals)]
pub const NotNull: ColumnNullability = ColumnNullability::NotNull;
412
413impl IntoPlanColumnSpec for PlanColumnSpec {
414    fn into_plan_column_spec(self) -> PlanColumnSpec {
415        self
416    }
417}
418
419impl<T> IntoPlanColumnSpec for &T
420where
421    T: Clone + IntoPlanColumnSpec,
422{
423    fn into_plan_column_spec(self) -> PlanColumnSpec {
424        self.clone().into_plan_column_spec()
425    }
426}
427
428impl IntoPlanColumnSpec for (&str, DataType) {
429    fn into_plan_column_spec(self) -> PlanColumnSpec {
430        PlanColumnSpec::new(self.0, self.1, true)
431    }
432}
433
434impl IntoPlanColumnSpec for (&str, DataType, bool) {
435    fn into_plan_column_spec(self) -> PlanColumnSpec {
436        PlanColumnSpec::new(self.0, self.1, self.2)
437    }
438}
439
440impl IntoPlanColumnSpec for (&str, DataType, ColumnNullability) {
441    fn into_plan_column_spec(self) -> PlanColumnSpec {
442        PlanColumnSpec::new(self.0, self.1, self.2.is_nullable())
443    }
444}
445
446/// Source data for CREATE TABLE AS SELECT.
447#[derive(Clone, Debug)]
448pub enum CreateTableSource {
449    Batches {
450        schema: Arc<Schema>,
451        batches: Vec<RecordBatch>,
452    },
453    Select {
454        plan: Box<SelectPlan>,
455    },
456}
457
458// ============================================================================
459// INSERT Plan
460// ============================================================================
461
462/// Plan for inserting data into a table.
463#[derive(Clone, Debug)]
464pub struct InsertPlan {
465    pub table: String,
466    pub columns: Vec<String>,
467    pub source: InsertSource,
468}
469
470/// Source data for INSERT operations.
471#[derive(Clone, Debug)]
472pub enum InsertSource {
473    Rows(Vec<Vec<PlanValue>>),
474    Batches(Vec<RecordBatch>),
475    Select { plan: Box<SelectPlan> },
476}
477
478// ============================================================================
479// UPDATE Plan
480// ============================================================================
481
482/// Plan for updating rows in a table.
483#[derive(Clone, Debug)]
484pub struct UpdatePlan {
485    pub table: String,
486    pub assignments: Vec<ColumnAssignment>,
487    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
488}
489
490/// Value to assign in an UPDATE.
491#[derive(Clone, Debug)]
492pub enum AssignmentValue {
493    Literal(PlanValue),
494    Expression(llkv_expr::expr::ScalarExpr<String>),
495}
496
497/// Column assignment for UPDATE.
498#[derive(Clone, Debug)]
499pub struct ColumnAssignment {
500    pub column: String,
501    pub value: AssignmentValue,
502}
503
504// ============================================================================
505// DELETE Plan
506// ============================================================================
507
508/// Plan for deleting rows from a table.
509#[derive(Clone, Debug)]
510pub struct DeletePlan {
511    pub table: String,
512    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
513}
514
515// ============================================================================
516// SELECT Plan
517// ============================================================================
518
/// A table named in a FROM clause.
#[derive(Debug, Clone)]
pub struct TableRef {
    /// Schema part of the name; empty when the table is unqualified.
    pub schema: String,
    /// Table part of the name.
    pub table: String,
}

impl TableRef {
    /// Builds a reference from a schema name and a table name.
    pub fn new(schema: impl Into<String>, table: impl Into<String>) -> Self {
        let schema = schema.into();
        let table = table.into();
        Self { schema, table }
    }

    /// Returns "schema.table", or just the table name when the schema is empty.
    pub fn qualified_name(&self) -> String {
        match self.schema.as_str() {
            "" => self.table.clone(),
            schema => format!("{}.{}", schema, self.table),
        }
    }
}
543
544/// Logical query plan for SELECT operations.
545#[derive(Clone, Debug)]
546pub struct SelectPlan {
547    /// Tables to query. Empty vec means no FROM clause (e.g., SELECT 42).
548    /// Single element for simple queries, multiple for joins/cross products.
549    pub tables: Vec<TableRef>,
550    pub projections: Vec<SelectProjection>,
551    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
552    pub aggregates: Vec<AggregateExpr>,
553    pub order_by: Vec<OrderByPlan>,
554}
555
556impl SelectPlan {
557    /// Create a SelectPlan for a single table.
558    pub fn new(table: impl Into<String>) -> Self {
559        let table_name = table.into();
560        let tables = if table_name.is_empty() {
561            Vec::new()
562        } else {
563            // Parse "schema.table" or just "table"
564            let parts: Vec<&str> = table_name.split('.').collect();
565            if parts.len() >= 2 {
566                let table_part = parts[1..].join(".");
567                vec![TableRef::new(parts[0], table_part)]
568            } else {
569                vec![TableRef::new("", table_name)]
570            }
571        };
572
573        Self {
574            tables,
575            projections: Vec::new(),
576            filter: None,
577            aggregates: Vec::new(),
578            order_by: Vec::new(),
579        }
580    }
581
582    /// Create a SelectPlan with multiple tables for cross product/joins.
583    pub fn with_tables(tables: Vec<TableRef>) -> Self {
584        Self {
585            tables,
586            projections: Vec::new(),
587            filter: None,
588            aggregates: Vec::new(),
589            order_by: Vec::new(),
590        }
591    }
592
593    pub fn with_projections(mut self, projections: Vec<SelectProjection>) -> Self {
594        self.projections = projections;
595        self
596    }
597
598    pub fn with_filter(mut self, filter: Option<llkv_expr::expr::Expr<'static, String>>) -> Self {
599        self.filter = filter;
600        self
601    }
602
603    pub fn with_aggregates(mut self, aggregates: Vec<AggregateExpr>) -> Self {
604        self.aggregates = aggregates;
605        self
606    }
607
608    pub fn with_order_by(mut self, order_by: Vec<OrderByPlan>) -> Self {
609        self.order_by = order_by;
610        self
611    }
612}
613
614/// Projection specification for SELECT.
615#[derive(Clone, Debug)]
616pub enum SelectProjection {
617    AllColumns,
618    AllColumnsExcept {
619        exclude: Vec<String>,
620    },
621    Column {
622        name: String,
623        alias: Option<String>,
624    },
625    Computed {
626        expr: llkv_expr::expr::ScalarExpr<String>,
627        alias: String,
628    },
629}
630
631// ============================================================================
632// Aggregate Plans
633// ============================================================================
634
/// Aggregate expression appearing in a SELECT.
#[derive(Debug, Clone)]
pub enum AggregateExpr {
    /// Aggregate without a column argument; built by [`AggregateExpr::count_star`].
    CountStar { alias: String },
    /// Aggregate `function` applied to `column`, optionally over distinct values.
    Column {
        column: String,
        alias: String,
        function: AggregateFunction,
        distinct: bool,
    },
}

/// Aggregate functions that can be applied to a column.
#[derive(Debug, Clone)]
pub enum AggregateFunction {
    Count,
    SumInt64,
    MinInt64,
    MaxInt64,
    CountNulls,
}

impl AggregateExpr {
    /// Shared constructor behind the column-based helpers below.
    fn over_column(
        column: impl Into<String>,
        alias: impl Into<String>,
        function: AggregateFunction,
        distinct: bool,
    ) -> Self {
        AggregateExpr::Column {
            column: column.into(),
            alias: alias.into(),
            function,
            distinct,
        }
    }

    /// Builds a `CountStar` aggregate with the given alias.
    pub fn count_star(alias: impl Into<String>) -> Self {
        let alias = alias.into();
        AggregateExpr::CountStar { alias }
    }

    /// Builds a non-distinct `Count` over `column`.
    pub fn count_column(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::Count, false)
    }

    /// Builds a distinct `Count` over `column`.
    pub fn count_distinct_column(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::Count, true)
    }

    /// Builds a non-distinct `SumInt64` over `column`.
    pub fn sum_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::SumInt64, false)
    }

    /// Builds a non-distinct `MinInt64` over `column`.
    pub fn min_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::MinInt64, false)
    }

    /// Builds a non-distinct `MaxInt64` over `column`.
    pub fn max_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::MaxInt64, false)
    }

    /// Builds a non-distinct `CountNulls` over `column`.
    pub fn count_nulls(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::CountNulls, false)
    }
}
720
721/// Helper to convert an Arrow array cell into a plan-level Value.
722pub fn plan_value_from_array(array: &ArrayRef, index: usize) -> PlanResult<PlanValue> {
723    if array.is_null(index) {
724        return Ok(PlanValue::Null);
725    }
726    match array.data_type() {
727        DataType::Boolean => {
728            let values = array
729                .as_any()
730                .downcast_ref::<BooleanArray>()
731                .ok_or_else(|| {
732                    Error::InvalidArgumentError("expected Boolean array in INSERT SELECT".into())
733                })?;
734            Ok(PlanValue::Integer(if values.value(index) { 1 } else { 0 }))
735        }
736        DataType::Int64 => {
737            let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
738                Error::InvalidArgumentError("expected Int64 array in INSERT SELECT".into())
739            })?;
740            Ok(PlanValue::Integer(values.value(index)))
741        }
742        DataType::Float64 => {
743            let values = array
744                .as_any()
745                .downcast_ref::<Float64Array>()
746                .ok_or_else(|| {
747                    Error::InvalidArgumentError("expected Float64 array in INSERT SELECT".into())
748                })?;
749            Ok(PlanValue::Float(values.value(index)))
750        }
751        DataType::Utf8 => {
752            let values = array
753                .as_any()
754                .downcast_ref::<StringArray>()
755                .ok_or_else(|| {
756                    Error::InvalidArgumentError("expected Utf8 array in INSERT SELECT".into())
757                })?;
758            Ok(PlanValue::String(values.value(index).to_string()))
759        }
760        DataType::Date32 => {
761            let values = array
762                .as_any()
763                .downcast_ref::<Date32Array>()
764                .ok_or_else(|| {
765                    Error::InvalidArgumentError("expected Date32 array in INSERT SELECT".into())
766                })?;
767            Ok(PlanValue::Integer(values.value(index) as i64))
768        }
769        other => Err(Error::InvalidArgumentError(format!(
770            "unsupported data type in INSERT SELECT: {other:?}"
771        ))),
772    }
773}
774
775// ============================================================================
776// ORDER BY Plan
777// ============================================================================
778
/// One ORDER BY entry.
#[derive(Debug, Clone)]
pub struct OrderByPlan {
    /// What to sort on.
    pub target: OrderTarget,
    /// How values are interpreted for sorting.
    pub sort_type: OrderSortType,
    /// Sort direction flag.
    pub ascending: bool,
    /// Null placement flag.
    pub nulls_first: bool,
}

/// Sort type of an ORDER BY entry.
#[derive(Debug, Clone)]
pub enum OrderSortType {
    Native,
    CastTextToInteger,
}

/// Column/expression an ORDER BY entry refers to.
#[derive(Debug, Clone)]
pub enum OrderTarget {
    /// A column identified by name.
    Column(String),
    /// A target identified by index.
    Index(usize),
    All,
}
802
803// ============================================================================
804// Operation Enum for Transaction Replay
805// ============================================================================
806
807/// Recordable plan operation for transaction replay.
808#[derive(Clone, Debug)]
809pub enum PlanOperation {
810    CreateTable(CreateTablePlan),
811    DropTable(DropTablePlan),
812    Insert(InsertPlan),
813    Update(UpdatePlan),
814    Delete(DeletePlan),
815    Select(SelectPlan),
816}
817
818/// Top-level plan statements that can be executed against a `Session`.
819#[derive(Clone, Debug)]
820pub enum PlanStatement {
821    BeginTransaction,
822    CommitTransaction,
823    RollbackTransaction,
824    CreateTable(CreateTablePlan),
825    DropTable(DropTablePlan),
826    DropIndex(DropIndexPlan),
827    AlterTable(AlterTablePlan),
828    CreateIndex(CreateIndexPlan),
829    Insert(InsertPlan),
830    Update(UpdatePlan),
831    Delete(DeletePlan),
832    Select(SelectPlan),
833}