1use std::sync::Arc;
8
9use arrow::array::{ArrayRef, BooleanArray, Date32Array, Float64Array, Int64Array, StringArray};
10use arrow::datatypes::{DataType, Schema};
11use arrow::record_batch::RecordBatch;
12use llkv_result::Error;
13
14pub type PlanResult<T> = llkv_result::Result<T>;
16
17#[derive(Clone, Debug, PartialEq)]
22pub enum PlanValue {
23 Null,
24 Integer(i64),
25 Float(f64),
26 String(String),
27 Struct(std::collections::HashMap<String, PlanValue>),
28}
29
30impl From<&str> for PlanValue {
31 fn from(value: &str) -> Self {
32 Self::String(value.to_string())
33 }
34}
35
36impl From<String> for PlanValue {
37 fn from(value: String) -> Self {
38 Self::String(value)
39 }
40}
41
42impl From<i64> for PlanValue {
43 fn from(value: i64) -> Self {
44 Self::Integer(value)
45 }
46}
47
48impl From<f64> for PlanValue {
49 fn from(value: f64) -> Self {
50 Self::Float(value)
51 }
52}
53
54impl From<bool> for PlanValue {
55 fn from(value: bool) -> Self {
56 if value {
58 Self::Integer(1)
59 } else {
60 Self::Integer(0)
61 }
62 }
63}
64
65impl From<i32> for PlanValue {
66 fn from(value: i32) -> Self {
67 Self::Integer(value as i64)
68 }
69}
70
71#[derive(Clone, Debug)]
77pub struct MultiColumnUniqueSpec {
78 pub name: Option<String>,
80 pub columns: Vec<String>,
82}
83
84#[derive(Clone, Debug)]
86pub struct CreateTablePlan {
87 pub name: String,
88 pub if_not_exists: bool,
89 pub or_replace: bool,
90 pub columns: Vec<PlanColumnSpec>,
91 pub source: Option<CreateTableSource>,
92 pub namespace: Option<String>,
94 pub foreign_keys: Vec<ForeignKeySpec>,
95 pub multi_column_uniques: Vec<MultiColumnUniqueSpec>,
96}
97
98impl CreateTablePlan {
99 pub fn new(name: impl Into<String>) -> Self {
100 Self {
101 name: name.into(),
102 if_not_exists: false,
103 or_replace: false,
104 columns: Vec::new(),
105 source: None,
106 namespace: None,
107 foreign_keys: Vec::new(),
108 multi_column_uniques: Vec::new(),
109 }
110 }
111}
112
113#[derive(Clone, Debug)]
119pub struct DropTablePlan {
120 pub name: String,
121 pub if_exists: bool,
122}
123
124impl DropTablePlan {
125 pub fn new(name: impl Into<String>) -> Self {
126 Self {
127 name: name.into(),
128 if_exists: false,
129 }
130 }
131
132 pub fn if_exists(mut self, if_exists: bool) -> Self {
133 self.if_exists = if_exists;
134 self
135 }
136}
137
138#[derive(Clone, Debug, PartialEq, Eq)]
144pub struct RenameTablePlan {
145 pub current_name: String,
146 pub new_name: String,
147 pub if_exists: bool,
148}
149
150impl RenameTablePlan {
151 pub fn new(current_name: impl Into<String>, new_name: impl Into<String>) -> Self {
152 Self {
153 current_name: current_name.into(),
154 new_name: new_name.into(),
155 if_exists: false,
156 }
157 }
158
159 pub fn if_exists(mut self, if_exists: bool) -> Self {
160 self.if_exists = if_exists;
161 self
162 }
163}
164
165#[derive(Clone, Debug, PartialEq)]
167pub struct DropIndexPlan {
168 pub name: String,
169 pub canonical_name: String,
170 pub if_exists: bool,
171}
172
173impl DropIndexPlan {
174 pub fn new(name: impl Into<String>) -> Self {
175 let display = name.into();
176 Self {
177 canonical_name: display.to_ascii_lowercase(),
178 name: display,
179 if_exists: false,
180 }
181 }
182
183 pub fn with_canonical(mut self, canonical: impl Into<String>) -> Self {
184 self.canonical_name = canonical.into();
185 self
186 }
187
188 pub fn if_exists(mut self, if_exists: bool) -> Self {
189 self.if_exists = if_exists;
190 self
191 }
192}
193
194#[derive(Clone, Debug, PartialEq)]
200pub struct AlterTablePlan {
201 pub table_name: String,
202 pub if_exists: bool,
203 pub operation: AlterTableOperation,
204}
205
206#[derive(Clone, Debug, PartialEq)]
208pub enum AlterTableOperation {
209 RenameColumn {
211 old_column_name: String,
212 new_column_name: String,
213 },
214 SetColumnDataType {
216 column_name: String,
217 new_data_type: String, },
219 DropColumn {
221 column_name: String,
222 if_exists: bool,
223 cascade: bool,
224 },
225}
226
227impl AlterTablePlan {
228 pub fn new(table_name: impl Into<String>, operation: AlterTableOperation) -> Self {
229 Self {
230 table_name: table_name.into(),
231 if_exists: false,
232 operation,
233 }
234 }
235
236 pub fn if_exists(mut self, if_exists: bool) -> Self {
237 self.if_exists = if_exists;
238 self
239 }
240}
241
242#[derive(Clone, Debug, PartialEq, Eq)]
247pub enum ForeignKeyAction {
248 NoAction,
249 Restrict,
250}
251
252impl Default for ForeignKeyAction {
253 fn default() -> Self {
254 Self::NoAction
255 }
256}
257
258#[derive(Clone, Debug)]
259pub struct ForeignKeySpec {
260 pub name: Option<String>,
261 pub columns: Vec<String>,
262 pub referenced_table: String,
263 pub referenced_columns: Vec<String>,
264 pub on_delete: ForeignKeyAction,
265 pub on_update: ForeignKeyAction,
266}
267
268#[derive(Clone, Debug, PartialEq, Eq)]
274pub struct IndexColumnPlan {
275 pub name: String,
276 pub ascending: bool,
277 pub nulls_first: bool,
278}
279
280impl IndexColumnPlan {
281 pub fn new(name: impl Into<String>) -> Self {
282 Self {
283 name: name.into(),
284 ascending: true,
285 nulls_first: false,
286 }
287 }
288
289 pub fn with_sort(mut self, ascending: bool, nulls_first: bool) -> Self {
290 self.ascending = ascending;
291 self.nulls_first = nulls_first;
292 self
293 }
294}
295
296#[derive(Clone, Debug)]
298pub struct CreateIndexPlan {
299 pub name: Option<String>,
300 pub table: String,
301 pub unique: bool,
302 pub if_not_exists: bool,
303 pub columns: Vec<IndexColumnPlan>,
304}
305
306impl CreateIndexPlan {
307 pub fn new(table: impl Into<String>) -> Self {
308 Self {
309 name: None,
310 table: table.into(),
311 unique: false,
312 if_not_exists: false,
313 columns: Vec::new(),
314 }
315 }
316
317 pub fn with_name(mut self, name: Option<String>) -> Self {
318 self.name = name;
319 self
320 }
321
322 pub fn with_unique(mut self, unique: bool) -> Self {
323 self.unique = unique;
324 self
325 }
326
327 pub fn with_if_not_exists(mut self, if_not_exists: bool) -> Self {
328 self.if_not_exists = if_not_exists;
329 self
330 }
331
332 pub fn with_columns(mut self, columns: Vec<IndexColumnPlan>) -> Self {
333 self.columns = columns;
334 self
335 }
336}
337
338#[derive(Clone, Debug)]
343pub struct PlanColumnSpec {
344 pub name: String,
345 pub data_type: DataType,
346 pub nullable: bool,
347 pub primary_key: bool,
348 pub unique: bool,
349 pub check_expr: Option<String>,
352}
353
354impl PlanColumnSpec {
355 pub fn new(name: impl Into<String>, data_type: DataType, nullable: bool) -> Self {
356 Self {
357 name: name.into(),
358 data_type,
359 nullable,
360 primary_key: false,
361 unique: false,
362 check_expr: None,
363 }
364 }
365
366 pub fn with_primary_key(mut self, primary_key: bool) -> Self {
367 self.primary_key = primary_key;
368 if primary_key {
369 self.unique = true;
370 }
371 self
372 }
373
374 pub fn with_unique(mut self, unique: bool) -> Self {
375 if unique {
376 self.unique = true;
377 }
378 self
379 }
380
381 pub fn with_check(mut self, check_expr: Option<String>) -> Self {
382 self.check_expr = check_expr;
383 self
384 }
385}
386
387pub trait IntoPlanColumnSpec {
389 fn into_plan_column_spec(self) -> PlanColumnSpec;
390}
391
392#[derive(Clone, Copy, Debug, PartialEq, Eq)]
394pub enum ColumnNullability {
395 Nullable,
396 NotNull,
397}
398
399impl ColumnNullability {
400 pub fn is_nullable(self) -> bool {
401 matches!(self, ColumnNullability::Nullable)
402 }
403}
404
405#[allow(non_upper_case_globals)]
407pub const Nullable: ColumnNullability = ColumnNullability::Nullable;
408
409#[allow(non_upper_case_globals)]
411pub const NotNull: ColumnNullability = ColumnNullability::NotNull;
412
413impl IntoPlanColumnSpec for PlanColumnSpec {
414 fn into_plan_column_spec(self) -> PlanColumnSpec {
415 self
416 }
417}
418
419impl<T> IntoPlanColumnSpec for &T
420where
421 T: Clone + IntoPlanColumnSpec,
422{
423 fn into_plan_column_spec(self) -> PlanColumnSpec {
424 self.clone().into_plan_column_spec()
425 }
426}
427
428impl IntoPlanColumnSpec for (&str, DataType) {
429 fn into_plan_column_spec(self) -> PlanColumnSpec {
430 PlanColumnSpec::new(self.0, self.1, true)
431 }
432}
433
434impl IntoPlanColumnSpec for (&str, DataType, bool) {
435 fn into_plan_column_spec(self) -> PlanColumnSpec {
436 PlanColumnSpec::new(self.0, self.1, self.2)
437 }
438}
439
440impl IntoPlanColumnSpec for (&str, DataType, ColumnNullability) {
441 fn into_plan_column_spec(self) -> PlanColumnSpec {
442 PlanColumnSpec::new(self.0, self.1, self.2.is_nullable())
443 }
444}
445
446#[derive(Clone, Debug)]
448pub enum CreateTableSource {
449 Batches {
450 schema: Arc<Schema>,
451 batches: Vec<RecordBatch>,
452 },
453 Select {
454 plan: Box<SelectPlan>,
455 },
456}
457
458#[derive(Clone, Debug)]
464pub struct InsertPlan {
465 pub table: String,
466 pub columns: Vec<String>,
467 pub source: InsertSource,
468}
469
470#[derive(Clone, Debug)]
472pub enum InsertSource {
473 Rows(Vec<Vec<PlanValue>>),
474 Batches(Vec<RecordBatch>),
475 Select { plan: Box<SelectPlan> },
476}
477
478#[derive(Clone, Debug)]
484pub struct UpdatePlan {
485 pub table: String,
486 pub assignments: Vec<ColumnAssignment>,
487 pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
488}
489
490#[derive(Clone, Debug)]
492pub enum AssignmentValue {
493 Literal(PlanValue),
494 Expression(llkv_expr::expr::ScalarExpr<String>),
495}
496
497#[derive(Clone, Debug)]
499pub struct ColumnAssignment {
500 pub column: String,
501 pub value: AssignmentValue,
502}
503
504#[derive(Clone, Debug)]
510pub struct DeletePlan {
511 pub table: String,
512 pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
513}
514
515#[derive(Clone, Debug)]
521pub struct TableRef {
522 pub schema: String,
523 pub table: String,
524}
525
526impl TableRef {
527 pub fn new(schema: impl Into<String>, table: impl Into<String>) -> Self {
528 Self {
529 schema: schema.into(),
530 table: table.into(),
531 }
532 }
533
534 pub fn qualified_name(&self) -> String {
536 if self.schema.is_empty() {
537 self.table.clone()
538 } else {
539 format!("{}.{}", self.schema, self.table)
540 }
541 }
542}
543
544#[derive(Clone, Debug)]
546pub struct SelectPlan {
547 pub tables: Vec<TableRef>,
550 pub projections: Vec<SelectProjection>,
551 pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
552 pub aggregates: Vec<AggregateExpr>,
553 pub order_by: Vec<OrderByPlan>,
554}
555
556impl SelectPlan {
557 pub fn new(table: impl Into<String>) -> Self {
559 let table_name = table.into();
560 let tables = if table_name.is_empty() {
561 Vec::new()
562 } else {
563 let parts: Vec<&str> = table_name.split('.').collect();
565 if parts.len() >= 2 {
566 let table_part = parts[1..].join(".");
567 vec![TableRef::new(parts[0], table_part)]
568 } else {
569 vec![TableRef::new("", table_name)]
570 }
571 };
572
573 Self {
574 tables,
575 projections: Vec::new(),
576 filter: None,
577 aggregates: Vec::new(),
578 order_by: Vec::new(),
579 }
580 }
581
582 pub fn with_tables(tables: Vec<TableRef>) -> Self {
584 Self {
585 tables,
586 projections: Vec::new(),
587 filter: None,
588 aggregates: Vec::new(),
589 order_by: Vec::new(),
590 }
591 }
592
593 pub fn with_projections(mut self, projections: Vec<SelectProjection>) -> Self {
594 self.projections = projections;
595 self
596 }
597
598 pub fn with_filter(mut self, filter: Option<llkv_expr::expr::Expr<'static, String>>) -> Self {
599 self.filter = filter;
600 self
601 }
602
603 pub fn with_aggregates(mut self, aggregates: Vec<AggregateExpr>) -> Self {
604 self.aggregates = aggregates;
605 self
606 }
607
608 pub fn with_order_by(mut self, order_by: Vec<OrderByPlan>) -> Self {
609 self.order_by = order_by;
610 self
611 }
612}
613
614#[derive(Clone, Debug)]
616pub enum SelectProjection {
617 AllColumns,
618 AllColumnsExcept {
619 exclude: Vec<String>,
620 },
621 Column {
622 name: String,
623 alias: Option<String>,
624 },
625 Computed {
626 expr: llkv_expr::expr::ScalarExpr<String>,
627 alias: String,
628 },
629}
630
631#[derive(Clone, Debug)]
637pub enum AggregateExpr {
638 CountStar {
639 alias: String,
640 },
641 Column {
642 column: String,
643 alias: String,
644 function: AggregateFunction,
645 distinct: bool,
646 },
647}
648
649#[derive(Clone, Debug)]
651pub enum AggregateFunction {
652 Count,
653 SumInt64,
654 MinInt64,
655 MaxInt64,
656 CountNulls,
657}
658
659impl AggregateExpr {
660 pub fn count_star(alias: impl Into<String>) -> Self {
661 Self::CountStar {
662 alias: alias.into(),
663 }
664 }
665
666 pub fn count_column(column: impl Into<String>, alias: impl Into<String>) -> Self {
667 Self::Column {
668 column: column.into(),
669 alias: alias.into(),
670 function: AggregateFunction::Count,
671 distinct: false,
672 }
673 }
674
675 pub fn count_distinct_column(column: impl Into<String>, alias: impl Into<String>) -> Self {
676 Self::Column {
677 column: column.into(),
678 alias: alias.into(),
679 function: AggregateFunction::Count,
680 distinct: true,
681 }
682 }
683
684 pub fn sum_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
685 Self::Column {
686 column: column.into(),
687 alias: alias.into(),
688 function: AggregateFunction::SumInt64,
689 distinct: false,
690 }
691 }
692
693 pub fn min_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
694 Self::Column {
695 column: column.into(),
696 alias: alias.into(),
697 function: AggregateFunction::MinInt64,
698 distinct: false,
699 }
700 }
701
702 pub fn max_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
703 Self::Column {
704 column: column.into(),
705 alias: alias.into(),
706 function: AggregateFunction::MaxInt64,
707 distinct: false,
708 }
709 }
710
711 pub fn count_nulls(column: impl Into<String>, alias: impl Into<String>) -> Self {
712 Self::Column {
713 column: column.into(),
714 alias: alias.into(),
715 function: AggregateFunction::CountNulls,
716 distinct: false,
717 }
718 }
719}
720
721pub fn plan_value_from_array(array: &ArrayRef, index: usize) -> PlanResult<PlanValue> {
723 if array.is_null(index) {
724 return Ok(PlanValue::Null);
725 }
726 match array.data_type() {
727 DataType::Boolean => {
728 let values = array
729 .as_any()
730 .downcast_ref::<BooleanArray>()
731 .ok_or_else(|| {
732 Error::InvalidArgumentError("expected Boolean array in INSERT SELECT".into())
733 })?;
734 Ok(PlanValue::Integer(if values.value(index) { 1 } else { 0 }))
735 }
736 DataType::Int64 => {
737 let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
738 Error::InvalidArgumentError("expected Int64 array in INSERT SELECT".into())
739 })?;
740 Ok(PlanValue::Integer(values.value(index)))
741 }
742 DataType::Float64 => {
743 let values = array
744 .as_any()
745 .downcast_ref::<Float64Array>()
746 .ok_or_else(|| {
747 Error::InvalidArgumentError("expected Float64 array in INSERT SELECT".into())
748 })?;
749 Ok(PlanValue::Float(values.value(index)))
750 }
751 DataType::Utf8 => {
752 let values = array
753 .as_any()
754 .downcast_ref::<StringArray>()
755 .ok_or_else(|| {
756 Error::InvalidArgumentError("expected Utf8 array in INSERT SELECT".into())
757 })?;
758 Ok(PlanValue::String(values.value(index).to_string()))
759 }
760 DataType::Date32 => {
761 let values = array
762 .as_any()
763 .downcast_ref::<Date32Array>()
764 .ok_or_else(|| {
765 Error::InvalidArgumentError("expected Date32 array in INSERT SELECT".into())
766 })?;
767 Ok(PlanValue::Integer(values.value(index) as i64))
768 }
769 other => Err(Error::InvalidArgumentError(format!(
770 "unsupported data type in INSERT SELECT: {other:?}"
771 ))),
772 }
773}
774
775#[derive(Clone, Debug)]
781pub struct OrderByPlan {
782 pub target: OrderTarget,
783 pub sort_type: OrderSortType,
784 pub ascending: bool,
785 pub nulls_first: bool,
786}
787
788#[derive(Clone, Debug)]
790pub enum OrderSortType {
791 Native,
792 CastTextToInteger,
793}
794
795#[derive(Clone, Debug)]
797pub enum OrderTarget {
798 Column(String),
799 Index(usize),
800 All,
801}
802
803#[derive(Clone, Debug)]
809pub enum PlanOperation {
810 CreateTable(CreateTablePlan),
811 DropTable(DropTablePlan),
812 Insert(InsertPlan),
813 Update(UpdatePlan),
814 Delete(DeletePlan),
815 Select(SelectPlan),
816}
817
818#[derive(Clone, Debug)]
820pub enum PlanStatement {
821 BeginTransaction,
822 CommitTransaction,
823 RollbackTransaction,
824 CreateTable(CreateTablePlan),
825 DropTable(DropTablePlan),
826 DropIndex(DropIndexPlan),
827 AlterTable(AlterTablePlan),
828 CreateIndex(CreateIndexPlan),
829 Insert(InsertPlan),
830 Update(UpdatePlan),
831 Delete(DeletePlan),
832 Select(SelectPlan),
833}