llkv_plan/
plans.rs

1//! Logical query plan structures for LLKV.
2//!
3//! This module defines the plan structures that represent logical query operations
4//! before they are executed. Plans are created by SQL parsers or fluent builders and
5//! consumed by execution engines.
6
7use std::sync::Arc;
8
9use arrow::array::{ArrayRef, BooleanArray, Date32Array, Float64Array, Int64Array, StringArray};
10use arrow::datatypes::{DataType, Schema};
11use arrow::record_batch::RecordBatch;
12use llkv_result::Error;
13
14/// Result type for plan operations.
15pub type PlanResult<T> = llkv_result::Result<T>;
16
17// ============================================================================
18// PlanValue Types
19// ============================================================================
20
21#[derive(Clone, Debug, PartialEq)]
22pub enum PlanValue {
23    Null,
24    Integer(i64),
25    Float(f64),
26    String(String),
27    Struct(std::collections::HashMap<String, PlanValue>),
28}
29
30impl From<&str> for PlanValue {
31    fn from(value: &str) -> Self {
32        Self::String(value.to_string())
33    }
34}
35
36impl From<String> for PlanValue {
37    fn from(value: String) -> Self {
38        Self::String(value)
39    }
40}
41
42impl From<i64> for PlanValue {
43    fn from(value: i64) -> Self {
44        Self::Integer(value)
45    }
46}
47
48impl From<f64> for PlanValue {
49    fn from(value: f64) -> Self {
50        Self::Float(value)
51    }
52}
53
54impl From<bool> for PlanValue {
55    fn from(value: bool) -> Self {
56        // Store booleans as integers for compatibility
57        if value {
58            Self::Integer(1)
59        } else {
60            Self::Integer(0)
61        }
62    }
63}
64
65impl From<i32> for PlanValue {
66    fn from(value: i32) -> Self {
67        Self::Integer(value as i64)
68    }
69}
70
71// ============================================================================
72// CREATE TABLE Plan
73// ============================================================================
74
75/// Multi-column unique constraint specification.
76#[derive(Clone, Debug)]
77pub struct MultiColumnUniqueSpec {
78    /// Optional name for the unique constraint
79    pub name: Option<String>,
80    /// Column names participating in this UNIQUE constraint
81    pub columns: Vec<String>,
82}
83
84/// Plan for creating a table.
85#[derive(Clone, Debug)]
86pub struct CreateTablePlan {
87    pub name: String,
88    pub if_not_exists: bool,
89    pub or_replace: bool,
90    pub columns: Vec<PlanColumnSpec>,
91    pub source: Option<CreateTableSource>,
92    /// Optional storage namespace for the table.
93    pub namespace: Option<String>,
94    pub foreign_keys: Vec<ForeignKeySpec>,
95    pub multi_column_uniques: Vec<MultiColumnUniqueSpec>,
96}
97
98impl CreateTablePlan {
99    pub fn new(name: impl Into<String>) -> Self {
100        Self {
101            name: name.into(),
102            if_not_exists: false,
103            or_replace: false,
104            columns: Vec::new(),
105            source: None,
106            namespace: None,
107            foreign_keys: Vec::new(),
108            multi_column_uniques: Vec::new(),
109        }
110    }
111}
112
113// ============================================================================
114// DROP TABLE Plan
115// ============================================================================
116
117/// Plan for dropping a table.
118#[derive(Clone, Debug)]
119pub struct DropTablePlan {
120    pub name: String,
121    pub if_exists: bool,
122}
123
124impl DropTablePlan {
125    pub fn new(name: impl Into<String>) -> Self {
126        Self {
127            name: name.into(),
128            if_exists: false,
129        }
130    }
131
132    pub fn if_exists(mut self, if_exists: bool) -> Self {
133        self.if_exists = if_exists;
134        self
135    }
136}
137
138// ============================================================================
139// RENAME TABLE Plan
140// ============================================================================
141
142/// Plan for renaming a table.
143#[derive(Clone, Debug, PartialEq, Eq)]
144pub struct RenameTablePlan {
145    pub current_name: String,
146    pub new_name: String,
147    pub if_exists: bool,
148}
149
150impl RenameTablePlan {
151    pub fn new(current_name: impl Into<String>, new_name: impl Into<String>) -> Self {
152        Self {
153            current_name: current_name.into(),
154            new_name: new_name.into(),
155            if_exists: false,
156        }
157    }
158
159    pub fn if_exists(mut self, if_exists: bool) -> Self {
160        self.if_exists = if_exists;
161        self
162    }
163}
164
165/// Plan for dropping an index.
166#[derive(Clone, Debug, PartialEq)]
167pub struct DropIndexPlan {
168    pub name: String,
169    pub canonical_name: String,
170    pub if_exists: bool,
171}
172
173impl DropIndexPlan {
174    pub fn new(name: impl Into<String>) -> Self {
175        let display = name.into();
176        Self {
177            canonical_name: display.to_ascii_lowercase(),
178            name: display,
179            if_exists: false,
180        }
181    }
182
183    pub fn with_canonical(mut self, canonical: impl Into<String>) -> Self {
184        self.canonical_name = canonical.into();
185        self
186    }
187
188    pub fn if_exists(mut self, if_exists: bool) -> Self {
189        self.if_exists = if_exists;
190        self
191    }
192}
193
194// ============================================================================
195// ALTER TABLE Plan Structures
196// ============================================================================
197
198/// Plan for ALTER TABLE operations.
199#[derive(Clone, Debug, PartialEq)]
200pub struct AlterTablePlan {
201    pub table_name: String,
202    pub if_exists: bool,
203    pub operation: AlterTableOperation,
204}
205
206/// Specific ALTER TABLE operation to perform.
207#[derive(Clone, Debug, PartialEq)]
208pub enum AlterTableOperation {
209    /// RENAME COLUMN old_name TO new_name
210    RenameColumn {
211        old_column_name: String,
212        new_column_name: String,
213    },
214    /// ALTER COLUMN column_name SET DATA TYPE new_type
215    SetColumnDataType {
216        column_name: String,
217        new_data_type: String, // SQL type string like "INTEGER", "VARCHAR", etc.
218    },
219    /// DROP COLUMN column_name
220    DropColumn {
221        column_name: String,
222        if_exists: bool,
223        cascade: bool,
224    },
225}
226
227impl AlterTablePlan {
228    pub fn new(table_name: impl Into<String>, operation: AlterTableOperation) -> Self {
229        Self {
230            table_name: table_name.into(),
231            if_exists: false,
232            operation,
233        }
234    }
235
236    pub fn if_exists(mut self, if_exists: bool) -> Self {
237        self.if_exists = if_exists;
238        self
239    }
240}
241
242// ============================================================================
243// FOREIGN KEY Plan Structures
244// ============================================================================
245
246#[derive(Clone, Debug, PartialEq, Eq)]
247pub enum ForeignKeyAction {
248    NoAction,
249    Restrict,
250}
251
252impl Default for ForeignKeyAction {
253    fn default() -> Self {
254        Self::NoAction
255    }
256}
257
258#[derive(Clone, Debug)]
259pub struct ForeignKeySpec {
260    pub name: Option<String>,
261    pub columns: Vec<String>,
262    pub referenced_table: String,
263    pub referenced_columns: Vec<String>,
264    pub on_delete: ForeignKeyAction,
265    pub on_update: ForeignKeyAction,
266}
267
268// ============================================================================
269// CREATE INDEX Plan
270// ============================================================================
271
272/// Column specification for CREATE INDEX statements.
273#[derive(Clone, Debug, PartialEq, Eq)]
274pub struct IndexColumnPlan {
275    pub name: String,
276    pub ascending: bool,
277    pub nulls_first: bool,
278}
279
280impl IndexColumnPlan {
281    pub fn new(name: impl Into<String>) -> Self {
282        Self {
283            name: name.into(),
284            ascending: true,
285            nulls_first: false,
286        }
287    }
288
289    pub fn with_sort(mut self, ascending: bool, nulls_first: bool) -> Self {
290        self.ascending = ascending;
291        self.nulls_first = nulls_first;
292        self
293    }
294}
295
296/// Plan for creating an index on a table.
297#[derive(Clone, Debug)]
298pub struct CreateIndexPlan {
299    pub name: Option<String>,
300    pub table: String,
301    pub unique: bool,
302    pub if_not_exists: bool,
303    pub columns: Vec<IndexColumnPlan>,
304}
305
306impl CreateIndexPlan {
307    pub fn new(table: impl Into<String>) -> Self {
308        Self {
309            name: None,
310            table: table.into(),
311            unique: false,
312            if_not_exists: false,
313            columns: Vec::new(),
314        }
315    }
316
317    pub fn with_name(mut self, name: Option<String>) -> Self {
318        self.name = name;
319        self
320    }
321
322    pub fn with_unique(mut self, unique: bool) -> Self {
323        self.unique = unique;
324        self
325    }
326
327    pub fn with_if_not_exists(mut self, if_not_exists: bool) -> Self {
328        self.if_not_exists = if_not_exists;
329        self
330    }
331
332    pub fn with_columns(mut self, columns: Vec<IndexColumnPlan>) -> Self {
333        self.columns = columns;
334        self
335    }
336}
337
338/// Column specification produced by the logical planner.
339///
340/// This struct flows from the planner into the runtime/executor so callers can
341/// reason about column metadata without duplicating field definitions.
342#[derive(Clone, Debug)]
343pub struct PlanColumnSpec {
344    pub name: String,
345    pub data_type: DataType,
346    pub nullable: bool,
347    pub primary_key: bool,
348    pub unique: bool,
349    /// Optional CHECK constraint expression (SQL string).
350    /// Example: "t.t=42" for CHECK(t.t=42)
351    pub check_expr: Option<String>,
352}
353
354impl PlanColumnSpec {
355    pub fn new(name: impl Into<String>, data_type: DataType, nullable: bool) -> Self {
356        Self {
357            name: name.into(),
358            data_type,
359            nullable,
360            primary_key: false,
361            unique: false,
362            check_expr: None,
363        }
364    }
365
366    pub fn with_primary_key(mut self, primary_key: bool) -> Self {
367        self.primary_key = primary_key;
368        if primary_key {
369            self.unique = true;
370        }
371        self
372    }
373
374    pub fn with_unique(mut self, unique: bool) -> Self {
375        if unique {
376            self.unique = true;
377        }
378        self
379    }
380
381    pub fn with_check(mut self, check_expr: Option<String>) -> Self {
382        self.check_expr = check_expr;
383        self
384    }
385}
386
387/// Trait for types that can be converted into a [`PlanColumnSpec`].
388pub trait IntoPlanColumnSpec {
389    fn into_plan_column_spec(self) -> PlanColumnSpec;
390}
391
392/// Column nullability specification.
393#[derive(Clone, Copy, Debug, PartialEq, Eq)]
394pub enum ColumnNullability {
395    Nullable,
396    NotNull,
397}
398
399impl ColumnNullability {
400    pub fn is_nullable(self) -> bool {
401        matches!(self, ColumnNullability::Nullable)
402    }
403}
404
405/// Convenience constant for nullable columns.
406#[allow(non_upper_case_globals)]
407pub const Nullable: ColumnNullability = ColumnNullability::Nullable;
408
409/// Convenience constant for non-null columns.
410#[allow(non_upper_case_globals)]
411pub const NotNull: ColumnNullability = ColumnNullability::NotNull;
412
413impl IntoPlanColumnSpec for PlanColumnSpec {
414    fn into_plan_column_spec(self) -> PlanColumnSpec {
415        self
416    }
417}
418
419impl<T> IntoPlanColumnSpec for &T
420where
421    T: Clone + IntoPlanColumnSpec,
422{
423    fn into_plan_column_spec(self) -> PlanColumnSpec {
424        self.clone().into_plan_column_spec()
425    }
426}
427
428impl IntoPlanColumnSpec for (&str, DataType) {
429    fn into_plan_column_spec(self) -> PlanColumnSpec {
430        PlanColumnSpec::new(self.0, self.1, true)
431    }
432}
433
434impl IntoPlanColumnSpec for (&str, DataType, bool) {
435    fn into_plan_column_spec(self) -> PlanColumnSpec {
436        PlanColumnSpec::new(self.0, self.1, self.2)
437    }
438}
439
440impl IntoPlanColumnSpec for (&str, DataType, ColumnNullability) {
441    fn into_plan_column_spec(self) -> PlanColumnSpec {
442        PlanColumnSpec::new(self.0, self.1, self.2.is_nullable())
443    }
444}
445
446/// Source data for CREATE TABLE AS SELECT.
447#[derive(Clone, Debug)]
448pub enum CreateTableSource {
449    Batches {
450        schema: Arc<Schema>,
451        batches: Vec<RecordBatch>,
452    },
453    Select {
454        plan: Box<SelectPlan>,
455    },
456}
457
458// ============================================================================
459// INSERT Plan
460// ============================================================================
461
462/// Plan for inserting data into a table.
463#[derive(Clone, Debug)]
464pub struct InsertPlan {
465    pub table: String,
466    pub columns: Vec<String>,
467    pub source: InsertSource,
468}
469
470/// Source data for INSERT operations.
471#[derive(Clone, Debug)]
472pub enum InsertSource {
473    Rows(Vec<Vec<PlanValue>>),
474    Batches(Vec<RecordBatch>),
475    Select { plan: Box<SelectPlan> },
476}
477
478// ============================================================================
479// UPDATE Plan
480// ============================================================================
481
482/// Plan for updating rows in a table.
483#[derive(Clone, Debug)]
484pub struct UpdatePlan {
485    pub table: String,
486    pub assignments: Vec<ColumnAssignment>,
487    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
488}
489
490/// Value to assign in an UPDATE.
491#[derive(Clone, Debug)]
492pub enum AssignmentValue {
493    Literal(PlanValue),
494    Expression(llkv_expr::expr::ScalarExpr<String>),
495}
496
497/// Column assignment for UPDATE.
498#[derive(Clone, Debug)]
499pub struct ColumnAssignment {
500    pub column: String,
501    pub value: AssignmentValue,
502}
503
504// ============================================================================
505// DELETE Plan
506// ============================================================================
507
508/// Plan for deleting rows from a table.
509#[derive(Clone, Debug)]
510pub struct DeletePlan {
511    pub table: String,
512    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
513}
514
515// ============================================================================
516// TRUNCATE Plan
517// ============================================================================
518
519/// Plan for TRUNCATE TABLE operation (removes all rows).
520#[derive(Clone, Debug)]
521pub struct TruncatePlan {
522    pub table: String,
523}
524
525// ============================================================================
526// SELECT Plan
527// ============================================================================
528
529/// Table reference in FROM clause.
530#[derive(Clone, Debug)]
531pub struct TableRef {
532    pub schema: String,
533    pub table: String,
534    pub alias: Option<String>,
535}
536
537impl TableRef {
538    pub fn new(schema: impl Into<String>, table: impl Into<String>) -> Self {
539        Self {
540            schema: schema.into(),
541            table: table.into(),
542            alias: None,
543        }
544    }
545
546    pub fn with_alias(
547        schema: impl Into<String>,
548        table: impl Into<String>,
549        alias: Option<String>,
550    ) -> Self {
551        Self {
552            schema: schema.into(),
553            table: table.into(),
554            alias,
555        }
556    }
557
558    /// Preferred display name for the table (alias if present).
559    pub fn display_name(&self) -> String {
560        self.alias
561            .as_ref()
562            .cloned()
563            .unwrap_or_else(|| self.qualified_name())
564    }
565
566    pub fn qualified_name(&self) -> String {
567        if self.schema.is_empty() {
568            self.table.clone()
569        } else {
570            format!("{}.{}", self.schema, self.table)
571        }
572    }
573}
574
575/// Logical query plan for SELECT operations.
576#[derive(Clone, Debug)]
577pub struct SelectPlan {
578    /// Tables to query. Empty vec means no FROM clause (e.g., SELECT 42).
579    /// Single element for simple queries, multiple for joins/cross products.
580    pub tables: Vec<TableRef>,
581    pub projections: Vec<SelectProjection>,
582    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
583    pub aggregates: Vec<AggregateExpr>,
584    pub order_by: Vec<OrderByPlan>,
585    pub distinct: bool,
586}
587
588impl SelectPlan {
589    /// Create a SelectPlan for a single table.
590    pub fn new(table: impl Into<String>) -> Self {
591        let table_name = table.into();
592        let tables = if table_name.is_empty() {
593            Vec::new()
594        } else {
595            // Parse "schema.table" or just "table"
596            let parts: Vec<&str> = table_name.split('.').collect();
597            if parts.len() >= 2 {
598                let table_part = parts[1..].join(".");
599                vec![TableRef::new(parts[0], table_part)]
600            } else {
601                vec![TableRef::new("", table_name)]
602            }
603        };
604
605        Self {
606            tables,
607            projections: Vec::new(),
608            filter: None,
609            aggregates: Vec::new(),
610            order_by: Vec::new(),
611            distinct: false,
612        }
613    }
614
615    /// Create a SelectPlan with multiple tables for cross product/joins.
616    pub fn with_tables(tables: Vec<TableRef>) -> Self {
617        Self {
618            tables,
619            projections: Vec::new(),
620            filter: None,
621            aggregates: Vec::new(),
622            order_by: Vec::new(),
623            distinct: false,
624        }
625    }
626
627    pub fn with_projections(mut self, projections: Vec<SelectProjection>) -> Self {
628        self.projections = projections;
629        self
630    }
631
632    pub fn with_filter(mut self, filter: Option<llkv_expr::expr::Expr<'static, String>>) -> Self {
633        self.filter = filter;
634        self
635    }
636
637    pub fn with_aggregates(mut self, aggregates: Vec<AggregateExpr>) -> Self {
638        self.aggregates = aggregates;
639        self
640    }
641
642    pub fn with_order_by(mut self, order_by: Vec<OrderByPlan>) -> Self {
643        self.order_by = order_by;
644        self
645    }
646
647    pub fn with_distinct(mut self, distinct: bool) -> Self {
648        self.distinct = distinct;
649        self
650    }
651}
652
653/// Projection specification for SELECT.
654#[derive(Clone, Debug)]
655pub enum SelectProjection {
656    AllColumns,
657    AllColumnsExcept {
658        exclude: Vec<String>,
659    },
660    Column {
661        name: String,
662        alias: Option<String>,
663    },
664    Computed {
665        expr: llkv_expr::expr::ScalarExpr<String>,
666        alias: String,
667    },
668}
669
670// ============================================================================
671// Aggregate Plans
672// ============================================================================
673
674/// Aggregate expression in SELECT.
675#[derive(Clone, Debug)]
676pub enum AggregateExpr {
677    CountStar {
678        alias: String,
679    },
680    Column {
681        column: String,
682        alias: String,
683        function: AggregateFunction,
684        distinct: bool,
685    },
686}
687
688/// Supported aggregate functions.
689#[derive(Clone, Debug)]
690pub enum AggregateFunction {
691    Count,
692    SumInt64,
693    MinInt64,
694    MaxInt64,
695    CountNulls,
696}
697
698impl AggregateExpr {
699    pub fn count_star(alias: impl Into<String>) -> Self {
700        Self::CountStar {
701            alias: alias.into(),
702        }
703    }
704
705    pub fn count_column(column: impl Into<String>, alias: impl Into<String>) -> Self {
706        Self::Column {
707            column: column.into(),
708            alias: alias.into(),
709            function: AggregateFunction::Count,
710            distinct: false,
711        }
712    }
713
714    pub fn count_distinct_column(column: impl Into<String>, alias: impl Into<String>) -> Self {
715        Self::Column {
716            column: column.into(),
717            alias: alias.into(),
718            function: AggregateFunction::Count,
719            distinct: true,
720        }
721    }
722
723    pub fn sum_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
724        Self::Column {
725            column: column.into(),
726            alias: alias.into(),
727            function: AggregateFunction::SumInt64,
728            distinct: false,
729        }
730    }
731
732    pub fn min_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
733        Self::Column {
734            column: column.into(),
735            alias: alias.into(),
736            function: AggregateFunction::MinInt64,
737            distinct: false,
738        }
739    }
740
741    pub fn max_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
742        Self::Column {
743            column: column.into(),
744            alias: alias.into(),
745            function: AggregateFunction::MaxInt64,
746            distinct: false,
747        }
748    }
749
750    pub fn count_nulls(column: impl Into<String>, alias: impl Into<String>) -> Self {
751        Self::Column {
752            column: column.into(),
753            alias: alias.into(),
754            function: AggregateFunction::CountNulls,
755            distinct: false,
756        }
757    }
758}
759
760/// Helper to convert an Arrow array cell into a plan-level Value.
761pub fn plan_value_from_array(array: &ArrayRef, index: usize) -> PlanResult<PlanValue> {
762    if array.is_null(index) {
763        return Ok(PlanValue::Null);
764    }
765    match array.data_type() {
766        DataType::Boolean => {
767            let values = array
768                .as_any()
769                .downcast_ref::<BooleanArray>()
770                .ok_or_else(|| {
771                    Error::InvalidArgumentError("expected Boolean array in INSERT SELECT".into())
772                })?;
773            Ok(PlanValue::Integer(if values.value(index) { 1 } else { 0 }))
774        }
775        DataType::Int64 => {
776            let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
777                Error::InvalidArgumentError("expected Int64 array in INSERT SELECT".into())
778            })?;
779            Ok(PlanValue::Integer(values.value(index)))
780        }
781        DataType::Float64 => {
782            let values = array
783                .as_any()
784                .downcast_ref::<Float64Array>()
785                .ok_or_else(|| {
786                    Error::InvalidArgumentError("expected Float64 array in INSERT SELECT".into())
787                })?;
788            Ok(PlanValue::Float(values.value(index)))
789        }
790        DataType::Utf8 => {
791            let values = array
792                .as_any()
793                .downcast_ref::<StringArray>()
794                .ok_or_else(|| {
795                    Error::InvalidArgumentError("expected Utf8 array in INSERT SELECT".into())
796                })?;
797            Ok(PlanValue::String(values.value(index).to_string()))
798        }
799        DataType::Date32 => {
800            let values = array
801                .as_any()
802                .downcast_ref::<Date32Array>()
803                .ok_or_else(|| {
804                    Error::InvalidArgumentError("expected Date32 array in INSERT SELECT".into())
805                })?;
806            Ok(PlanValue::Integer(values.value(index) as i64))
807        }
808        other => Err(Error::InvalidArgumentError(format!(
809            "unsupported data type in INSERT SELECT: {other:?}"
810        ))),
811    }
812}
813
814// ============================================================================
815// ORDER BY Plan
816// ============================================================================
817
818/// ORDER BY specification.
819#[derive(Clone, Debug)]
820pub struct OrderByPlan {
821    pub target: OrderTarget,
822    pub sort_type: OrderSortType,
823    pub ascending: bool,
824    pub nulls_first: bool,
825}
826
827/// Sort type for ORDER BY.
828#[derive(Clone, Debug)]
829pub enum OrderSortType {
830    Native,
831    CastTextToInteger,
832}
833
834/// Target column/expression for ORDER BY.
835#[derive(Clone, Debug)]
836pub enum OrderTarget {
837    Column(String),
838    Index(usize),
839    All,
840}
841
842// ============================================================================
843// Operation Enum for Transaction Replay
844// ============================================================================
845
846/// Recordable plan operation for transaction replay.
847#[derive(Clone, Debug)]
848pub enum PlanOperation {
849    CreateTable(CreateTablePlan),
850    DropTable(DropTablePlan),
851    Insert(InsertPlan),
852    Update(UpdatePlan),
853    Delete(DeletePlan),
854    Truncate(TruncatePlan),
855    Select(SelectPlan),
856}
857
858/// Top-level plan statements that can be executed against a `Session`.
859#[derive(Clone, Debug)]
860pub enum PlanStatement {
861    BeginTransaction,
862    CommitTransaction,
863    RollbackTransaction,
864    CreateTable(CreateTablePlan),
865    DropTable(DropTablePlan),
866    DropIndex(DropIndexPlan),
867    AlterTable(AlterTablePlan),
868    CreateIndex(CreateIndexPlan),
869    Insert(InsertPlan),
870    Update(UpdatePlan),
871    Delete(DeletePlan),
872    Truncate(TruncatePlan),
873    Select(SelectPlan),
874}