llkv_plan/
plans.rs

1//! Logical query plan structures for LLKV.
2//!
3//! This module defines the plan structures that represent logical query operations
4//! before they are executed. Plans are created by SQL parsers or fluent builders and
5//! consumed by execution engines.
6
7use std::sync::Arc;
8
9use arrow::array::{ArrayRef, BooleanArray, Date32Array, Float64Array, Int64Array, StringArray};
10use arrow::datatypes::{DataType, Schema};
11use arrow::record_batch::RecordBatch;
12use llkv_result::Error;
13
/// Result type for plan operations.
///
/// Alias for [`llkv_result::Result`]; fallible helpers in this module (for example
/// [`plan_value_from_array`]) return it.
pub type PlanResult<T> = llkv_result::Result<T>;
16
17// ============================================================================
18// PlanValue Types
19// ============================================================================
20
/// Literal value carried by a logical plan.
///
/// There is no boolean variant: the `From<bool>` conversion below encodes
/// `true` / `false` as `Integer(1)` / `Integer(0)`.
#[derive(Clone, Debug, PartialEq)]
pub enum PlanValue {
    /// Absent value.
    Null,
    /// 64-bit signed integer.
    Integer(i64),
    /// 64-bit floating point number.
    Float(f64),
    /// Owned text.
    String(String),
    /// Named fields, each holding a nested `PlanValue`.
    Struct(std::collections::HashMap<String, PlanValue>),
}

/// Copies the borrowed text into an owned `PlanValue::String`.
impl From<&str> for PlanValue {
    fn from(value: &str) -> Self {
        PlanValue::String(value.to_owned())
    }
}

/// Wraps an owned string without copying it.
impl From<String> for PlanValue {
    fn from(value: String) -> Self {
        PlanValue::String(value)
    }
}

impl From<i64> for PlanValue {
    fn from(value: i64) -> Self {
        PlanValue::Integer(value)
    }
}

impl From<f64> for PlanValue {
    fn from(value: f64) -> Self {
        PlanValue::Float(value)
    }
}

/// Booleans are stored as integers for compatibility: `true` -> 1, `false` -> 0.
impl From<bool> for PlanValue {
    fn from(value: bool) -> Self {
        PlanValue::Integer(i64::from(value))
    }
}

/// Widens the 32-bit integer losslessly to `i64`.
impl From<i32> for PlanValue {
    fn from(value: i32) -> Self {
        PlanValue::Integer(i64::from(value))
    }
}
70
71// ============================================================================
72// CREATE TABLE Plan
73// ============================================================================
74
75/// Plan for creating a table.
76#[derive(Clone, Debug)]
77pub struct CreateTablePlan {
78    pub name: String,
79    pub if_not_exists: bool,
80    pub or_replace: bool,
81    pub columns: Vec<ColumnSpec>,
82    pub source: Option<CreateTableSource>,
83    /// Optional storage namespace for the table.
84    pub namespace: Option<String>,
85    pub foreign_keys: Vec<ForeignKeySpec>,
86}
87
88impl CreateTablePlan {
89    pub fn new(name: impl Into<String>) -> Self {
90        Self {
91            name: name.into(),
92            if_not_exists: false,
93            or_replace: false,
94            columns: Vec::new(),
95            source: None,
96            namespace: None,
97            foreign_keys: Vec::new(),
98        }
99    }
100}
101
102// ============================================================================
103// FOREIGN KEY Plan Structures
104// ============================================================================
105
/// Action recorded for a foreign key's `on_delete` / `on_update` slots.
///
/// `NoAction` is the default.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum ForeignKeyAction {
    #[default]
    NoAction,
    Restrict,
}
117
/// Foreign key constraint description, carried in `CreateTablePlan::foreign_keys`.
#[derive(Clone, Debug)]
pub struct ForeignKeySpec {
    /// Optional name; presumably the constraint name — confirm against the SQL layer.
    pub name: Option<String>,
    /// Column names on the referencing side.
    pub columns: Vec<String>,
    /// Name of the table being referenced.
    pub referenced_table: String,
    /// Column names on the referenced table.
    pub referenced_columns: Vec<String>,
    /// Action recorded for deletes.
    pub on_delete: ForeignKeyAction,
    /// Action recorded for updates.
    pub on_update: ForeignKeyAction,
}
127
128// ============================================================================
129// CREATE INDEX Plan
130// ============================================================================
131
/// Column specification for CREATE INDEX statements.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IndexColumnPlan {
    /// Name of the indexed column.
    pub name: String,
    /// `true` for ascending order.
    pub ascending: bool,
    /// `true` when NULLs sort before other values.
    pub nulls_first: bool,
}

impl IndexColumnPlan {
    /// Creates an entry for `name` that sorts ascending with NULLs last.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            name,
            ascending: true,
            nulls_first: false,
        }
    }

    /// Returns the entry with both sort flags replaced.
    pub fn with_sort(self, ascending: bool, nulls_first: bool) -> Self {
        Self {
            ascending,
            nulls_first,
            ..self
        }
    }
}
155
156/// Plan for creating an index on a table.
157#[derive(Clone, Debug)]
158pub struct CreateIndexPlan {
159    pub name: Option<String>,
160    pub table: String,
161    pub unique: bool,
162    pub if_not_exists: bool,
163    pub columns: Vec<IndexColumnPlan>,
164}
165
166impl CreateIndexPlan {
167    pub fn new(table: impl Into<String>) -> Self {
168        Self {
169            name: None,
170            table: table.into(),
171            unique: false,
172            if_not_exists: false,
173            columns: Vec::new(),
174        }
175    }
176
177    pub fn with_name(mut self, name: Option<String>) -> Self {
178        self.name = name;
179        self
180    }
181
182    pub fn with_unique(mut self, unique: bool) -> Self {
183        self.unique = unique;
184        self
185    }
186
187    pub fn with_if_not_exists(mut self, if_not_exists: bool) -> Self {
188        self.if_not_exists = if_not_exists;
189        self
190    }
191
192    pub fn with_columns(mut self, columns: Vec<IndexColumnPlan>) -> Self {
193        self.columns = columns;
194        self
195    }
196}
197
198/// Column specification for CREATE TABLE.
199#[derive(Clone, Debug)]
200pub struct ColumnSpec {
201    pub name: String,
202    pub data_type: DataType,
203    pub nullable: bool,
204    pub primary_key: bool,
205    pub unique: bool,
206    /// Optional CHECK constraint expression (SQL string).
207    /// Example: "t.t=42" for CHECK(t.t=42)
208    pub check_expr: Option<String>,
209}
210
211impl ColumnSpec {
212    pub fn new(name: impl Into<String>, data_type: DataType, nullable: bool) -> Self {
213        Self {
214            name: name.into(),
215            data_type,
216            nullable,
217            primary_key: false,
218            unique: false,
219            check_expr: None,
220        }
221    }
222
223    pub fn with_primary_key(mut self, primary_key: bool) -> Self {
224        self.primary_key = primary_key;
225        if primary_key {
226            self.unique = true;
227        }
228        self
229    }
230
231    pub fn with_unique(mut self, unique: bool) -> Self {
232        if unique {
233            self.unique = true;
234        }
235        self
236    }
237
238    pub fn with_check(mut self, check_expr: Option<String>) -> Self {
239        self.check_expr = check_expr;
240        self
241    }
242}
243
/// Trait for types that can be converted into a ColumnSpec.
///
/// This file implements it for `ColumnSpec` itself, for references to cloneable
/// implementors, and for `(&str, DataType[, bool | ColumnNullability])` tuples.
pub trait IntoColumnSpec {
    /// Consumes `self` and produces the equivalent [`ColumnSpec`].
    fn into_column_spec(self) -> ColumnSpec;
}
248
/// Column nullability specification.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ColumnNullability {
    Nullable,
    NotNull,
}

impl ColumnNullability {
    /// Returns `true` for `Nullable` and `false` for `NotNull`.
    pub fn is_nullable(self) -> bool {
        match self {
            Self::Nullable => true,
            Self::NotNull => false,
        }
    }
}

/// Convenience constant for nullable columns.
// Named like the enum variant rather than in SCREAMING_SNAKE_CASE, hence the lint allowance.
#[allow(non_upper_case_globals)]
pub const Nullable: ColumnNullability = ColumnNullability::Nullable;

/// Convenience constant for non-null columns.
// Named like the enum variant rather than in SCREAMING_SNAKE_CASE, hence the lint allowance.
#[allow(non_upper_case_globals)]
pub const NotNull: ColumnNullability = ColumnNullability::NotNull;
269
270impl IntoColumnSpec for ColumnSpec {
271    fn into_column_spec(self) -> ColumnSpec {
272        self
273    }
274}
275
276impl<T> IntoColumnSpec for &T
277where
278    T: Clone + IntoColumnSpec,
279{
280    fn into_column_spec(self) -> ColumnSpec {
281        self.clone().into_column_spec()
282    }
283}
284
285impl IntoColumnSpec for (&str, DataType) {
286    fn into_column_spec(self) -> ColumnSpec {
287        ColumnSpec::new(self.0, self.1, true)
288    }
289}
290
291impl IntoColumnSpec for (&str, DataType, bool) {
292    fn into_column_spec(self) -> ColumnSpec {
293        ColumnSpec::new(self.0, self.1, self.2)
294    }
295}
296
297impl IntoColumnSpec for (&str, DataType, ColumnNullability) {
298    fn into_column_spec(self) -> ColumnSpec {
299        ColumnSpec::new(self.0, self.1, self.2.is_nullable())
300    }
301}
302
/// Source data for CREATE TABLE AS SELECT.
#[derive(Clone, Debug)]
pub enum CreateTableSource {
    /// Arrow record batches together with a shared schema.
    Batches {
        schema: Arc<Schema>,
        batches: Vec<RecordBatch>,
    },
    /// A boxed SELECT plan; presumably executed to produce the rows — confirm against the executor.
    Select {
        plan: Box<SelectPlan>,
    },
}
314
315// ============================================================================
316// INSERT Plan
317// ============================================================================
318
/// Plan for inserting data into a table.
#[derive(Clone, Debug)]
pub struct InsertPlan {
    /// Name of the target table.
    pub table: String,
    /// Target column names.
    /// NOTE(review): the meaning of an empty list is not visible here — confirm with the executor.
    pub columns: Vec<String>,
    /// Where the inserted data comes from.
    pub source: InsertSource,
}
326
/// Source data for INSERT operations.
#[derive(Clone, Debug)]
pub enum InsertSource {
    /// Literal rows; each inner vector holds the values of one row.
    Rows(Vec<Vec<PlanValue>>),
    /// Arrow record batches.
    Batches(Vec<RecordBatch>),
    /// A boxed SELECT plan (the INSERT ... SELECT form).
    Select { plan: Box<SelectPlan> },
}
334
335// ============================================================================
336// UPDATE Plan
337// ============================================================================
338
/// Plan for updating rows in a table.
#[derive(Clone, Debug)]
pub struct UpdatePlan {
    /// Name of the target table.
    pub table: String,
    /// Column assignments to apply.
    pub assignments: Vec<ColumnAssignment>,
    /// Optional filter expression.
    /// NOTE(review): `None` presumably means every row is updated — confirm.
    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
}
346
/// Value to assign in an UPDATE.
#[derive(Clone, Debug)]
pub enum AssignmentValue {
    /// A constant plan value.
    Literal(PlanValue),
    /// A scalar expression.
    Expression(llkv_expr::expr::ScalarExpr<String>),
}
353
/// Column assignment for UPDATE.
#[derive(Clone, Debug)]
pub struct ColumnAssignment {
    /// Name of the column being assigned.
    pub column: String,
    /// Value (literal or expression) to assign.
    pub value: AssignmentValue,
}
360
361// ============================================================================
362// DELETE Plan
363// ============================================================================
364
/// Plan for deleting rows from a table.
#[derive(Clone, Debug)]
pub struct DeletePlan {
    /// Name of the target table.
    pub table: String,
    /// Optional filter expression.
    /// NOTE(review): `None` presumably means every row is deleted — confirm.
    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
}
371
372// ============================================================================
373// SELECT Plan
374// ============================================================================
375
/// Table reference in FROM clause.
#[derive(Clone, Debug)]
pub struct TableRef {
    /// Schema name; an empty string means the reference is unqualified.
    pub schema: String,
    /// Table name.
    pub table: String,
}

impl TableRef {
    /// Builds a reference from its schema and table parts.
    pub fn new(schema: impl Into<String>, table: impl Into<String>) -> Self {
        let schema = schema.into();
        let table = table.into();
        Self { schema, table }
    }

    /// Get fully qualified name as "schema.table"
    ///
    /// When the schema is empty, only the table name is returned.
    pub fn qualified_name(&self) -> String {
        if self.schema.is_empty() {
            return self.table.clone();
        }
        [self.schema.as_str(), self.table.as_str()].join(".")
    }
}
400
401/// Logical query plan for SELECT operations.
402#[derive(Clone, Debug)]
403pub struct SelectPlan {
404    /// Tables to query. Empty vec means no FROM clause (e.g., SELECT 42).
405    /// Single element for simple queries, multiple for joins/cross products.
406    pub tables: Vec<TableRef>,
407    pub projections: Vec<SelectProjection>,
408    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
409    pub aggregates: Vec<AggregateExpr>,
410    pub order_by: Vec<OrderByPlan>,
411}
412
413impl SelectPlan {
414    /// Create a SelectPlan for a single table.
415    pub fn new(table: impl Into<String>) -> Self {
416        let table_name = table.into();
417        let tables = if table_name.is_empty() {
418            Vec::new()
419        } else {
420            // Parse "schema.table" or just "table"
421            let parts: Vec<&str> = table_name.split('.').collect();
422            if parts.len() >= 2 {
423                let table_part = parts[1..].join(".");
424                vec![TableRef::new(parts[0], table_part)]
425            } else {
426                vec![TableRef::new("", table_name)]
427            }
428        };
429
430        Self {
431            tables,
432            projections: Vec::new(),
433            filter: None,
434            aggregates: Vec::new(),
435            order_by: Vec::new(),
436        }
437    }
438
439    /// Create a SelectPlan with multiple tables for cross product/joins.
440    pub fn with_tables(tables: Vec<TableRef>) -> Self {
441        Self {
442            tables,
443            projections: Vec::new(),
444            filter: None,
445            aggregates: Vec::new(),
446            order_by: Vec::new(),
447        }
448    }
449
450    pub fn with_projections(mut self, projections: Vec<SelectProjection>) -> Self {
451        self.projections = projections;
452        self
453    }
454
455    pub fn with_filter(mut self, filter: Option<llkv_expr::expr::Expr<'static, String>>) -> Self {
456        self.filter = filter;
457        self
458    }
459
460    pub fn with_aggregates(mut self, aggregates: Vec<AggregateExpr>) -> Self {
461        self.aggregates = aggregates;
462        self
463    }
464
465    pub fn with_order_by(mut self, order_by: Vec<OrderByPlan>) -> Self {
466        self.order_by = order_by;
467        self
468    }
469}
470
/// Projection specification for SELECT.
#[derive(Clone, Debug)]
pub enum SelectProjection {
    /// Every column.
    AllColumns,
    /// Every column except those named in `exclude`.
    AllColumnsExcept {
        exclude: Vec<String>,
    },
    /// A single column by name, with an optional alias.
    Column {
        name: String,
        alias: Option<String>,
    },
    /// A scalar expression; the alias is mandatory here.
    Computed {
        expr: llkv_expr::expr::ScalarExpr<String>,
        alias: String,
    },
}
487
488// ============================================================================
489// Aggregate Plans
490// ============================================================================
491
/// Aggregate expression in SELECT.
#[derive(Clone, Debug)]
pub enum AggregateExpr {
    /// Aggregate that names no column, only an output alias.
    CountStar {
        alias: String,
    },
    /// An [`AggregateFunction`] applied to a single column.
    Column {
        column: String,
        alias: String,
        function: AggregateFunction,
        distinct: bool,
    },
}

/// Supported aggregate functions.
#[derive(Clone, Debug)]
pub enum AggregateFunction {
    Count,
    SumInt64,
    MinInt64,
    MaxInt64,
    CountNulls,
}

impl AggregateExpr {
    /// Shared constructor behind every single-column aggregate below.
    fn column_aggregate(
        column: impl Into<String>,
        alias: impl Into<String>,
        function: AggregateFunction,
        distinct: bool,
    ) -> Self {
        Self::Column {
            column: column.into(),
            alias: alias.into(),
            function,
            distinct,
        }
    }

    /// Builds the `CountStar` aggregate with the given output alias.
    pub fn count_star(alias: impl Into<String>) -> Self {
        let alias = alias.into();
        Self::CountStar { alias }
    }

    /// Non-distinct `Count` over `column`.
    pub fn count_column(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::column_aggregate(column, alias, AggregateFunction::Count, false)
    }

    /// `Count` over `column` with the `distinct` flag set.
    pub fn count_distinct_column(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::column_aggregate(column, alias, AggregateFunction::Count, true)
    }

    /// Non-distinct `SumInt64` over `column`.
    pub fn sum_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::column_aggregate(column, alias, AggregateFunction::SumInt64, false)
    }

    /// Non-distinct `MinInt64` over `column`.
    pub fn min_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::column_aggregate(column, alias, AggregateFunction::MinInt64, false)
    }

    /// Non-distinct `MaxInt64` over `column`.
    pub fn max_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::column_aggregate(column, alias, AggregateFunction::MaxInt64, false)
    }

    /// Non-distinct `CountNulls` over `column`.
    pub fn count_nulls(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::column_aggregate(column, alias, AggregateFunction::CountNulls, false)
    }
}
577
578/// Helper to convert an Arrow array cell into a plan-level Value.
579pub fn plan_value_from_array(array: &ArrayRef, index: usize) -> PlanResult<PlanValue> {
580    if array.is_null(index) {
581        return Ok(PlanValue::Null);
582    }
583    match array.data_type() {
584        DataType::Boolean => {
585            let values = array
586                .as_any()
587                .downcast_ref::<BooleanArray>()
588                .ok_or_else(|| {
589                    Error::InvalidArgumentError("expected Boolean array in INSERT SELECT".into())
590                })?;
591            Ok(PlanValue::Integer(if values.value(index) { 1 } else { 0 }))
592        }
593        DataType::Int64 => {
594            let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
595                Error::InvalidArgumentError("expected Int64 array in INSERT SELECT".into())
596            })?;
597            Ok(PlanValue::Integer(values.value(index)))
598        }
599        DataType::Float64 => {
600            let values = array
601                .as_any()
602                .downcast_ref::<Float64Array>()
603                .ok_or_else(|| {
604                    Error::InvalidArgumentError("expected Float64 array in INSERT SELECT".into())
605                })?;
606            Ok(PlanValue::Float(values.value(index)))
607        }
608        DataType::Utf8 => {
609            let values = array
610                .as_any()
611                .downcast_ref::<StringArray>()
612                .ok_or_else(|| {
613                    Error::InvalidArgumentError("expected Utf8 array in INSERT SELECT".into())
614                })?;
615            Ok(PlanValue::String(values.value(index).to_string()))
616        }
617        DataType::Date32 => {
618            let values = array
619                .as_any()
620                .downcast_ref::<Date32Array>()
621                .ok_or_else(|| {
622                    Error::InvalidArgumentError("expected Date32 array in INSERT SELECT".into())
623                })?;
624            Ok(PlanValue::Integer(values.value(index) as i64))
625        }
626        other => Err(Error::InvalidArgumentError(format!(
627            "unsupported data type in INSERT SELECT: {other:?}"
628        ))),
629    }
630}
631
632// ============================================================================
633// ORDER BY Plan
634// ============================================================================
635
/// ORDER BY specification.
#[derive(Clone, Debug)]
pub struct OrderByPlan {
    /// What to sort by.
    pub target: OrderTarget,
    /// How values are compared (see [`OrderSortType`]).
    pub sort_type: OrderSortType,
    /// `true` for ascending order.
    pub ascending: bool,
    /// `true` when NULLs sort before other values.
    pub nulls_first: bool,
}
644
/// Sort type for ORDER BY.
#[derive(Clone, Debug)]
pub enum OrderSortType {
    /// NOTE(review): presumably compares values in the column's own type — confirm.
    Native,
    /// NOTE(review): presumably casts text values to integers before comparing — confirm.
    CastTextToInteger,
}
651
/// Target column/expression for ORDER BY.
#[derive(Clone, Debug)]
pub enum OrderTarget {
    /// A column identified by name.
    Column(String),
    /// A positional reference.
    /// NOTE(review): whether this is 0- or 1-based is not visible here — confirm.
    Index(usize),
    /// NOTE(review): presumably orders by every output column — confirm.
    All,
}
659
660// ============================================================================
661// Operation Enum for Transaction Replay
662// ============================================================================
663
/// Recordable plan operation for transaction replay.
///
/// Each variant wraps the corresponding plan struct. Unlike `PlanStatement`,
/// there are no variants for transaction control or CREATE INDEX.
#[derive(Clone, Debug)]
pub enum PlanOperation {
    CreateTable(CreateTablePlan),
    Insert(InsertPlan),
    Update(UpdatePlan),
    Delete(DeletePlan),
    Select(SelectPlan),
}
673
/// Top-level plan statements that can be executed against a `Session`.
///
/// The first three variants are transaction control statements and carry no
/// payload; the remaining variants wrap the corresponding plan struct.
#[derive(Clone, Debug)]
pub enum PlanStatement {
    BeginTransaction,
    CommitTransaction,
    RollbackTransaction,
    CreateTable(CreateTablePlan),
    CreateIndex(CreateIndexPlan),
    Insert(InsertPlan),
    Update(UpdatePlan),
    Delete(DeletePlan),
    Select(SelectPlan),
}