llkv_plan/
plans.rs

1//! Logical query plan structures for LLKV.
2//!
3//! This module defines the plan structures that represent logical query operations
4//! before they are executed. Plans are created by SQL parsers or fluent builders and
5//! consumed by execution engines.
6
7use std::sync::Arc;
8
9use arrow::array::{ArrayRef, Date32Array, Float64Array, Int64Array, StringArray};
10use arrow::datatypes::{DataType, Schema};
11use arrow::record_batch::RecordBatch;
12use llkv_result::Error;
13
14/// Result type for plan operations.
15pub type PlanResult<T> = llkv_result::Result<T>;
16
17// ============================================================================
18// PlanValue Types
19// ============================================================================
20
21#[derive(Clone, Debug, PartialEq)]
22pub enum PlanValue {
23    Null,
24    Integer(i64),
25    Float(f64),
26    String(String),
27}
28
29impl From<&str> for PlanValue {
30    fn from(value: &str) -> Self {
31        Self::String(value.to_string())
32    }
33}
34
35impl From<String> for PlanValue {
36    fn from(value: String) -> Self {
37        Self::String(value)
38    }
39}
40
41impl From<i64> for PlanValue {
42    fn from(value: i64) -> Self {
43        Self::Integer(value)
44    }
45}
46
47impl From<f64> for PlanValue {
48    fn from(value: f64) -> Self {
49        Self::Float(value)
50    }
51}
52
53impl From<bool> for PlanValue {
54    fn from(value: bool) -> Self {
55        // Store booleans as integers for compatibility
56        if value {
57            Self::Integer(1)
58        } else {
59            Self::Integer(0)
60        }
61    }
62}
63
64impl From<i32> for PlanValue {
65    fn from(value: i32) -> Self {
66        Self::Integer(value as i64)
67    }
68}
69
70// ============================================================================
71// CREATE TABLE Plan
72// ============================================================================
73
74/// Plan for creating a table.
75#[derive(Clone, Debug)]
76pub struct CreateTablePlan {
77    pub name: String,
78    pub if_not_exists: bool,
79    pub or_replace: bool,
80    pub columns: Vec<ColumnSpec>,
81    pub source: Option<CreateTableSource>,
82}
83
84impl CreateTablePlan {
85    pub fn new(name: impl Into<String>) -> Self {
86        Self {
87            name: name.into(),
88            if_not_exists: false,
89            or_replace: false,
90            columns: Vec::new(),
91            source: None,
92        }
93    }
94}
95
96/// Column specification for CREATE TABLE.
97#[derive(Clone, Debug)]
98pub struct ColumnSpec {
99    pub name: String,
100    pub data_type: DataType,
101    pub nullable: bool,
102    pub primary_key: bool,
103}
104
105impl ColumnSpec {
106    pub fn new(name: impl Into<String>, data_type: DataType, nullable: bool) -> Self {
107        Self {
108            name: name.into(),
109            data_type,
110            nullable,
111            primary_key: false,
112        }
113    }
114
115    pub fn with_primary_key(mut self, primary_key: bool) -> Self {
116        self.primary_key = primary_key;
117        self
118    }
119}
120
121/// Trait for types that can be converted into a ColumnSpec.
122pub trait IntoColumnSpec {
123    fn into_column_spec(self) -> ColumnSpec;
124}
125
126/// Column nullability specification.
127#[derive(Clone, Copy, Debug, PartialEq, Eq)]
128pub enum ColumnNullability {
129    Nullable,
130    NotNull,
131}
132
133impl ColumnNullability {
134    pub fn is_nullable(self) -> bool {
135        matches!(self, ColumnNullability::Nullable)
136    }
137}
138
139/// Convenience constant for nullable columns.
140#[allow(non_upper_case_globals)]
141pub const Nullable: ColumnNullability = ColumnNullability::Nullable;
142
143/// Convenience constant for non-null columns.
144#[allow(non_upper_case_globals)]
145pub const NotNull: ColumnNullability = ColumnNullability::NotNull;
146
147impl IntoColumnSpec for ColumnSpec {
148    fn into_column_spec(self) -> ColumnSpec {
149        self
150    }
151}
152
153impl<T> IntoColumnSpec for &T
154where
155    T: Clone + IntoColumnSpec,
156{
157    fn into_column_spec(self) -> ColumnSpec {
158        self.clone().into_column_spec()
159    }
160}
161
162impl IntoColumnSpec for (&str, DataType) {
163    fn into_column_spec(self) -> ColumnSpec {
164        ColumnSpec::new(self.0, self.1, true)
165    }
166}
167
168impl IntoColumnSpec for (&str, DataType, bool) {
169    fn into_column_spec(self) -> ColumnSpec {
170        ColumnSpec::new(self.0, self.1, self.2)
171    }
172}
173
174impl IntoColumnSpec for (&str, DataType, ColumnNullability) {
175    fn into_column_spec(self) -> ColumnSpec {
176        ColumnSpec::new(self.0, self.1, self.2.is_nullable())
177    }
178}
179
180/// Source data for CREATE TABLE AS SELECT.
181#[derive(Clone, Debug)]
182pub enum CreateTableSource {
183    Batches {
184        schema: Arc<Schema>,
185        batches: Vec<RecordBatch>,
186    },
187    Select {
188        plan: Box<SelectPlan>,
189    },
190}
191
192// ============================================================================
193// INSERT Plan
194// ============================================================================
195
196/// Plan for inserting data into a table.
197#[derive(Clone, Debug)]
198pub struct InsertPlan {
199    pub table: String,
200    pub columns: Vec<String>,
201    pub source: InsertSource,
202}
203
204/// Source data for INSERT operations.
205#[derive(Clone, Debug)]
206pub enum InsertSource {
207    Rows(Vec<Vec<PlanValue>>),
208    Batches(Vec<RecordBatch>),
209    Select { plan: Box<SelectPlan> },
210}
211
212// ============================================================================
213// UPDATE Plan
214// ============================================================================
215
216/// Plan for updating rows in a table.
217#[derive(Clone, Debug)]
218pub struct UpdatePlan {
219    pub table: String,
220    pub assignments: Vec<ColumnAssignment>,
221    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
222}
223
224/// Value to assign in an UPDATE.
225#[derive(Clone, Debug)]
226pub enum AssignmentValue {
227    Literal(PlanValue),
228    Expression(llkv_expr::expr::ScalarExpr<String>),
229}
230
231/// Column assignment for UPDATE.
232#[derive(Clone, Debug)]
233pub struct ColumnAssignment {
234    pub column: String,
235    pub value: AssignmentValue,
236}
237
238// ============================================================================
239// DELETE Plan
240// ============================================================================
241
242/// Plan for deleting rows from a table.
243#[derive(Clone, Debug)]
244pub struct DeletePlan {
245    pub table: String,
246    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
247}
248
249// ============================================================================
250// SELECT Plan
251// ============================================================================
252
253/// Logical query plan for SELECT operations.
254#[derive(Clone, Debug)]
255pub struct SelectPlan {
256    pub table: String,
257    pub projections: Vec<SelectProjection>,
258    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
259    pub aggregates: Vec<AggregateExpr>,
260    pub order_by: Option<OrderByPlan>,
261}
262
263impl SelectPlan {
264    pub fn new(table: impl Into<String>) -> Self {
265        Self {
266            table: table.into(),
267            projections: Vec::new(),
268            filter: None,
269            aggregates: Vec::new(),
270            order_by: None,
271        }
272    }
273
274    pub fn with_projections(mut self, projections: Vec<SelectProjection>) -> Self {
275        self.projections = projections;
276        self
277    }
278
279    pub fn with_filter(mut self, filter: Option<llkv_expr::expr::Expr<'static, String>>) -> Self {
280        self.filter = filter;
281        self
282    }
283
284    pub fn with_aggregates(mut self, aggregates: Vec<AggregateExpr>) -> Self {
285        self.aggregates = aggregates;
286        self
287    }
288
289    pub fn with_order_by(mut self, order_by: Option<OrderByPlan>) -> Self {
290        self.order_by = order_by;
291        self
292    }
293}
294
295/// Projection specification for SELECT.
296#[derive(Clone, Debug)]
297pub enum SelectProjection {
298    AllColumns,
299    Column {
300        name: String,
301        alias: Option<String>,
302    },
303    Computed {
304        expr: llkv_expr::expr::ScalarExpr<String>,
305        alias: String,
306    },
307}
308
309// ============================================================================
310// Aggregate Plans
311// ============================================================================
312
313/// Aggregate expression in SELECT.
314#[derive(Clone, Debug)]
315pub enum AggregateExpr {
316    CountStar {
317        alias: String,
318    },
319    Column {
320        column: String,
321        alias: String,
322        function: AggregateFunction,
323    },
324}
325
326/// Supported aggregate functions.
327#[derive(Clone, Debug)]
328pub enum AggregateFunction {
329    Count,
330    SumInt64,
331    MinInt64,
332    MaxInt64,
333    CountNulls,
334}
335
336impl AggregateExpr {
337    pub fn count_star(alias: impl Into<String>) -> Self {
338        Self::CountStar {
339            alias: alias.into(),
340        }
341    }
342
343    pub fn count_column(column: impl Into<String>, alias: impl Into<String>) -> Self {
344        Self::Column {
345            column: column.into(),
346            alias: alias.into(),
347            function: AggregateFunction::Count,
348        }
349    }
350
351    pub fn sum_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
352        Self::Column {
353            column: column.into(),
354            alias: alias.into(),
355            function: AggregateFunction::SumInt64,
356        }
357    }
358
359    pub fn min_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
360        Self::Column {
361            column: column.into(),
362            alias: alias.into(),
363            function: AggregateFunction::MinInt64,
364        }
365    }
366
367    pub fn max_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
368        Self::Column {
369            column: column.into(),
370            alias: alias.into(),
371            function: AggregateFunction::MaxInt64,
372        }
373    }
374
375    pub fn count_nulls(column: impl Into<String>, alias: impl Into<String>) -> Self {
376        Self::Column {
377            column: column.into(),
378            alias: alias.into(),
379            function: AggregateFunction::CountNulls,
380        }
381    }
382}
383
384/// Helper to convert an Arrow array cell into a plan-level Value.
385pub fn plan_value_from_array(array: &ArrayRef, index: usize) -> PlanResult<PlanValue> {
386    if array.is_null(index) {
387        return Ok(PlanValue::Null);
388    }
389    match array.data_type() {
390        DataType::Int64 => {
391            let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
392                Error::InvalidArgumentError("expected Int64 array in INSERT SELECT".into())
393            })?;
394            Ok(PlanValue::Integer(values.value(index)))
395        }
396        DataType::Float64 => {
397            let values = array
398                .as_any()
399                .downcast_ref::<Float64Array>()
400                .ok_or_else(|| {
401                    Error::InvalidArgumentError("expected Float64 array in INSERT SELECT".into())
402                })?;
403            Ok(PlanValue::Float(values.value(index)))
404        }
405        DataType::Utf8 => {
406            let values = array
407                .as_any()
408                .downcast_ref::<StringArray>()
409                .ok_or_else(|| {
410                    Error::InvalidArgumentError("expected Utf8 array in INSERT SELECT".into())
411                })?;
412            Ok(PlanValue::String(values.value(index).to_string()))
413        }
414        DataType::Date32 => {
415            let values = array
416                .as_any()
417                .downcast_ref::<Date32Array>()
418                .ok_or_else(|| {
419                    Error::InvalidArgumentError("expected Date32 array in INSERT SELECT".into())
420                })?;
421            Ok(PlanValue::Integer(values.value(index) as i64))
422        }
423        other => Err(Error::InvalidArgumentError(format!(
424            "unsupported data type in INSERT SELECT: {other:?}"
425        ))),
426    }
427}
428
429// ============================================================================
430// ORDER BY Plan
431// ============================================================================
432
433/// ORDER BY specification.
434#[derive(Clone, Debug)]
435pub struct OrderByPlan {
436    pub target: OrderTarget,
437    pub sort_type: OrderSortType,
438    pub ascending: bool,
439    pub nulls_first: bool,
440}
441
442/// Sort type for ORDER BY.
443#[derive(Clone, Debug)]
444pub enum OrderSortType {
445    Native,
446    CastTextToInteger,
447}
448
449/// Target column/expression for ORDER BY.
450#[derive(Clone, Debug)]
451pub enum OrderTarget {
452    Column(String),
453    Index(usize),
454}
455
456// ============================================================================
457// Operation Enum for Transaction Replay
458// ============================================================================
459
460/// Recordable plan operation for transaction replay.
461#[derive(Clone, Debug)]
462pub enum PlanOperation {
463    CreateTable(CreateTablePlan),
464    Insert(InsertPlan),
465    Update(UpdatePlan),
466    Delete(DeletePlan),
467    Select(SelectPlan),
468}
469
470/// Top-level plan statements that can be executed against a `Session`.
471#[derive(Clone, Debug)]
472pub enum PlanStatement {
473    BeginTransaction,
474    CommitTransaction,
475    RollbackTransaction,
476    CreateTable(CreateTablePlan),
477    Insert(InsertPlan),
478    Update(UpdatePlan),
479    Delete(DeletePlan),
480    Select(SelectPlan),
481}