1use std::sync::Arc;
8
9use arrow::array::{ArrayRef, BooleanArray, Date32Array, Float64Array, Int64Array, StringArray};
10use arrow::datatypes::{DataType, Schema};
11use arrow::record_batch::RecordBatch;
12use llkv_result::Error;
13
/// Convenience alias for [`llkv_result::Result`], used as the return type of
/// fallible helpers in this file such as [`plan_value_from_array`].
pub type PlanResult<T> = llkv_result::Result<T>;
16
/// A literal value carried inside a plan.
///
/// The `From` conversions below allow building a value directly from common
/// Rust primitives.
#[derive(Clone, Debug, PartialEq)]
pub enum PlanValue {
    /// Absent value.
    Null,
    /// Signed 64-bit integer. `bool` and `i32` inputs are also stored in this
    /// variant by their `From` conversions.
    Integer(i64),
    /// 64-bit floating point number.
    Float(f64),
    /// Owned string.
    String(String),
    /// Named fields, each holding a nested value.
    Struct(std::collections::HashMap<String, PlanValue>),
}

/// Copies a string slice into [`PlanValue::String`].
impl From<&str> for PlanValue {
    fn from(value: &str) -> Self {
        PlanValue::String(value.to_owned())
    }
}

/// Moves an owned string into [`PlanValue::String`] without reallocating.
impl From<String> for PlanValue {
    fn from(value: String) -> Self {
        PlanValue::String(value)
    }
}

/// Wraps an `i64` in [`PlanValue::Integer`].
impl From<i64> for PlanValue {
    fn from(value: i64) -> Self {
        PlanValue::Integer(value)
    }
}

/// Wraps an `f64` in [`PlanValue::Float`].
impl From<f64> for PlanValue {
    fn from(value: f64) -> Self {
        PlanValue::Float(value)
    }
}

/// Encodes a boolean as [`PlanValue::Integer`]: `true` is 1, `false` is 0.
impl From<bool> for PlanValue {
    fn from(value: bool) -> Self {
        PlanValue::Integer(i64::from(value))
    }
}

/// Widens an `i32` losslessly into [`PlanValue::Integer`].
impl From<i32> for PlanValue {
    fn from(value: i32) -> Self {
        PlanValue::Integer(i64::from(value))
    }
}
70
/// Describes a uniqueness constraint that spans the listed columns.
#[derive(Clone, Debug)]
pub struct MultiColumnUniqueSpec {
    /// Optional name of the constraint.
    pub name: Option<String>,
    /// Names of the columns the constraint covers.
    pub columns: Vec<String>,
}
83
84#[derive(Clone, Debug)]
86pub struct CreateTablePlan {
87 pub name: String,
88 pub if_not_exists: bool,
89 pub or_replace: bool,
90 pub columns: Vec<PlanColumnSpec>,
91 pub source: Option<CreateTableSource>,
92 pub namespace: Option<String>,
94 pub foreign_keys: Vec<ForeignKeySpec>,
95 pub multi_column_uniques: Vec<MultiColumnUniqueSpec>,
96}
97
98impl CreateTablePlan {
99 pub fn new(name: impl Into<String>) -> Self {
100 Self {
101 name: name.into(),
102 if_not_exists: false,
103 or_replace: false,
104 columns: Vec::new(),
105 source: None,
106 namespace: None,
107 foreign_keys: Vec::new(),
108 multi_column_uniques: Vec::new(),
109 }
110 }
111}
112
/// Plan describing a table to drop.
#[derive(Clone, Debug)]
pub struct DropTablePlan {
    /// Name of the table to drop.
    pub name: String,
    /// `IF EXISTS` flag.
    pub if_exists: bool,
}

impl DropTablePlan {
    /// Builds a plan for `name` with `if_exists` cleared.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            name,
            if_exists: false,
        }
    }

    /// Returns the plan with its `if_exists` flag replaced.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
137
/// Plan describing a table rename.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RenameTablePlan {
    /// Name the table currently has.
    pub current_name: String,
    /// Name the table should receive.
    pub new_name: String,
    /// `IF EXISTS` flag.
    pub if_exists: bool,
}

impl RenameTablePlan {
    /// Builds a rename plan from `current_name` to `new_name` with
    /// `if_exists` cleared.
    pub fn new(current_name: impl Into<String>, new_name: impl Into<String>) -> Self {
        let current_name = current_name.into();
        let new_name = new_name.into();
        Self {
            current_name,
            new_name,
            if_exists: false,
        }
    }

    /// Returns the plan with its `if_exists` flag replaced.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
164
/// Plan describing an index to drop.
#[derive(Clone, Debug, PartialEq)]
pub struct DropIndexPlan {
    /// Index name exactly as supplied to [`DropIndexPlan::new`].
    pub name: String,
    /// Canonical form of the name; defaults to the ASCII-lowercased `name`.
    pub canonical_name: String,
    /// `IF EXISTS` flag.
    pub if_exists: bool,
}

impl DropIndexPlan {
    /// Builds a plan for `name`, deriving `canonical_name` by ASCII-lowercasing
    /// it and clearing `if_exists`.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        let canonical_name = name.to_ascii_lowercase();
        Self {
            name,
            canonical_name,
            if_exists: false,
        }
    }

    /// Returns the plan with `canonical_name` overridden by `canonical`.
    pub fn with_canonical(self, canonical: impl Into<String>) -> Self {
        Self {
            canonical_name: canonical.into(),
            ..self
        }
    }

    /// Returns the plan with its `if_exists` flag replaced.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
193
/// Plan describing a single alteration of a table.
#[derive(Clone, Debug, PartialEq)]
pub struct AlterTablePlan {
    /// Name of the table to alter.
    pub table_name: String,
    /// `IF EXISTS` flag for the table.
    pub if_exists: bool,
    /// The alteration to perform.
    pub operation: AlterTableOperation,
}

/// The alterations an [`AlterTablePlan`] can carry.
#[derive(Clone, Debug, PartialEq)]
pub enum AlterTableOperation {
    /// Rename a column.
    RenameColumn {
        /// Current column name.
        old_column_name: String,
        /// Replacement column name.
        new_column_name: String,
    },
    /// Change the data type of a column.
    SetColumnDataType {
        /// Column whose type changes.
        column_name: String,
        /// New data type, kept as text.
        new_data_type: String,
    },
    /// Remove a column.
    DropColumn {
        /// Column to remove.
        column_name: String,
        /// `IF EXISTS` flag for the column.
        if_exists: bool,
        /// `CASCADE` flag.
        cascade: bool,
    },
}

impl AlterTablePlan {
    /// Builds a plan applying `operation` to `table_name` with `if_exists`
    /// cleared.
    pub fn new(table_name: impl Into<String>, operation: AlterTableOperation) -> Self {
        let table_name = table_name.into();
        Self {
            table_name,
            if_exists: false,
            operation,
        }
    }

    /// Returns the plan with its `if_exists` flag replaced.
    pub fn if_exists(self, if_exists: bool) -> Self {
        Self { if_exists, ..self }
    }
}
241
/// Action attached to a foreign key's `on_delete` / `on_update` slots.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ForeignKeyAction {
    /// `NO ACTION`; this is the default.
    NoAction,
    /// `RESTRICT`.
    Restrict,
}

impl Default for ForeignKeyAction {
    /// Yields [`ForeignKeyAction::NoAction`].
    fn default() -> Self {
        ForeignKeyAction::NoAction
    }
}
257
/// Describes a foreign key from local columns to columns of another table.
#[derive(Clone, Debug)]
pub struct ForeignKeySpec {
    /// Optional name of the constraint.
    pub name: Option<String>,
    /// Names of the referencing columns.
    pub columns: Vec<String>,
    /// Name of the referenced table.
    pub referenced_table: String,
    /// Names of the referenced columns.
    // NOTE(review): presumably positionally paired with `columns` — confirm.
    pub referenced_columns: Vec<String>,
    /// Action stored for deletes.
    pub on_delete: ForeignKeyAction,
    /// Action stored for updates.
    pub on_update: ForeignKeyAction,
}
267
/// One column of an index together with its sort settings.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IndexColumnPlan {
    /// Column name.
    pub name: String,
    /// Ascending-order flag; `true` by default.
    pub ascending: bool,
    /// Nulls-first flag; `false` by default.
    pub nulls_first: bool,
}

impl IndexColumnPlan {
    /// Builds an entry for `name` that is ascending with nulls not first.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            name,
            ascending: true,
            nulls_first: false,
        }
    }

    /// Returns the entry with both sort flags replaced.
    pub fn with_sort(self, ascending: bool, nulls_first: bool) -> Self {
        Self {
            ascending,
            nulls_first,
            ..self
        }
    }
}
295
296#[derive(Clone, Debug)]
298pub struct CreateIndexPlan {
299 pub name: Option<String>,
300 pub table: String,
301 pub unique: bool,
302 pub if_not_exists: bool,
303 pub columns: Vec<IndexColumnPlan>,
304}
305
306impl CreateIndexPlan {
307 pub fn new(table: impl Into<String>) -> Self {
308 Self {
309 name: None,
310 table: table.into(),
311 unique: false,
312 if_not_exists: false,
313 columns: Vec::new(),
314 }
315 }
316
317 pub fn with_name(mut self, name: Option<String>) -> Self {
318 self.name = name;
319 self
320 }
321
322 pub fn with_unique(mut self, unique: bool) -> Self {
323 self.unique = unique;
324 self
325 }
326
327 pub fn with_if_not_exists(mut self, if_not_exists: bool) -> Self {
328 self.if_not_exists = if_not_exists;
329 self
330 }
331
332 pub fn with_columns(mut self, columns: Vec<IndexColumnPlan>) -> Self {
333 self.columns = columns;
334 self
335 }
336}
337
338#[derive(Clone, Debug)]
343pub struct PlanColumnSpec {
344 pub name: String,
345 pub data_type: DataType,
346 pub nullable: bool,
347 pub primary_key: bool,
348 pub unique: bool,
349 pub check_expr: Option<String>,
352}
353
354impl PlanColumnSpec {
355 pub fn new(name: impl Into<String>, data_type: DataType, nullable: bool) -> Self {
356 Self {
357 name: name.into(),
358 data_type,
359 nullable,
360 primary_key: false,
361 unique: false,
362 check_expr: None,
363 }
364 }
365
366 pub fn with_primary_key(mut self, primary_key: bool) -> Self {
367 self.primary_key = primary_key;
368 if primary_key {
369 self.unique = true;
370 }
371 self
372 }
373
374 pub fn with_unique(mut self, unique: bool) -> Self {
375 if unique {
376 self.unique = true;
377 }
378 self
379 }
380
381 pub fn with_check(mut self, check_expr: Option<String>) -> Self {
382 self.check_expr = check_expr;
383 self
384 }
385}
386
/// Conversion of a value into a [`PlanColumnSpec`].
pub trait IntoPlanColumnSpec {
    /// Consumes `self` and returns the corresponding column specification.
    fn into_plan_column_spec(self) -> PlanColumnSpec;
}
391
/// Two-state marker saying whether a column accepts nulls.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ColumnNullability {
    /// Nulls are accepted.
    Nullable,
    /// Nulls are not accepted.
    NotNull,
}

impl ColumnNullability {
    /// Returns `true` for [`ColumnNullability::Nullable`] and `false` for
    /// [`ColumnNullability::NotNull`].
    pub fn is_nullable(self) -> bool {
        match self {
            ColumnNullability::Nullable => true,
            ColumnNullability::NotNull => false,
        }
    }
}
404
/// Shorthand constant equal to [`ColumnNullability::Nullable`].
// The constant shares the variant's CamelCase name, hence the lint allowance.
#[allow(non_upper_case_globals)]
pub const Nullable: ColumnNullability = ColumnNullability::Nullable;

/// Shorthand constant equal to [`ColumnNullability::NotNull`].
// The constant shares the variant's CamelCase name, hence the lint allowance.
#[allow(non_upper_case_globals)]
pub const NotNull: ColumnNullability = ColumnNullability::NotNull;
412
/// Identity conversion: a [`PlanColumnSpec`] is returned unchanged.
impl IntoPlanColumnSpec for PlanColumnSpec {
    fn into_plan_column_spec(self) -> PlanColumnSpec {
        self
    }
}
418
419impl<T> IntoPlanColumnSpec for &T
420where
421 T: Clone + IntoPlanColumnSpec,
422{
423 fn into_plan_column_spec(self) -> PlanColumnSpec {
424 self.clone().into_plan_column_spec()
425 }
426}
427
428impl IntoPlanColumnSpec for (&str, DataType) {
429 fn into_plan_column_spec(self) -> PlanColumnSpec {
430 PlanColumnSpec::new(self.0, self.1, true)
431 }
432}
433
434impl IntoPlanColumnSpec for (&str, DataType, bool) {
435 fn into_plan_column_spec(self) -> PlanColumnSpec {
436 PlanColumnSpec::new(self.0, self.1, self.2)
437 }
438}
439
440impl IntoPlanColumnSpec for (&str, DataType, ColumnNullability) {
441 fn into_plan_column_spec(self) -> PlanColumnSpec {
442 PlanColumnSpec::new(self.0, self.1, self.2.is_nullable())
443 }
444}
445
/// Data source attached to a [`CreateTablePlan`].
#[derive(Clone, Debug)]
pub enum CreateTableSource {
    /// Arrow record batches together with a schema.
    Batches {
        /// Shared Arrow schema.
        // NOTE(review): presumably the schema of every batch in `batches` — confirm.
        schema: Arc<Schema>,
        /// The record batches.
        batches: Vec<RecordBatch>,
    },
    /// A boxed select plan.
    Select {
        /// The select plan supplying the data.
        plan: Box<SelectPlan>,
    },
}
457
/// Plan describing an insert into a table.
#[derive(Clone, Debug)]
pub struct InsertPlan {
    /// Name of the target table.
    pub table: String,
    /// Names of the target columns.
    // NOTE(review): meaning of an empty list is not visible here — confirm with the executor.
    pub columns: Vec<String>,
    /// Where the inserted data comes from.
    pub source: InsertSource,
}
469
/// Data source of an [`InsertPlan`].
#[derive(Clone, Debug)]
pub enum InsertSource {
    /// Literal rows; each inner vector holds the values of one row.
    Rows(Vec<Vec<PlanValue>>),
    /// Arrow record batches.
    Batches(Vec<RecordBatch>),
    /// A boxed select plan.
    Select { plan: Box<SelectPlan> },
}
477
/// Plan describing an update of a table.
#[derive(Clone, Debug)]
pub struct UpdatePlan {
    /// Name of the target table.
    pub table: String,
    /// Column assignments to apply.
    pub assignments: Vec<ColumnAssignment>,
    /// Optional filter expression.
    // NOTE(review): `None` presumably means every row is updated — confirm with the executor.
    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
}
489
/// Right-hand side of a [`ColumnAssignment`].
#[derive(Clone, Debug)]
pub enum AssignmentValue {
    /// A literal plan value.
    Literal(PlanValue),
    /// A scalar expression.
    Expression(llkv_expr::expr::ScalarExpr<String>),
}
496
/// Pairs a column name with the value to assign to it.
#[derive(Clone, Debug)]
pub struct ColumnAssignment {
    /// Name of the column being assigned.
    pub column: String,
    /// Value assigned to the column.
    pub value: AssignmentValue,
}
503
/// Plan describing a delete from a table.
#[derive(Clone, Debug)]
pub struct DeletePlan {
    /// Name of the target table.
    pub table: String,
    /// Optional filter expression.
    // NOTE(review): `None` presumably means every row is deleted — confirm with the executor.
    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
}
514
/// Plan describing a table to truncate.
#[derive(Clone, Debug)]
pub struct TruncatePlan {
    /// Name of the target table.
    pub table: String,
}
524
/// Reference to a table by schema and name, with an optional alias.
#[derive(Clone, Debug)]
pub struct TableRef {
    /// Schema name; an empty string means the reference is unqualified.
    pub schema: String,
    /// Table name.
    pub table: String,
    /// Optional alias.
    pub alias: Option<String>,
}

impl TableRef {
    /// Builds a reference without an alias.
    pub fn new(schema: impl Into<String>, table: impl Into<String>) -> Self {
        let schema = schema.into();
        let table = table.into();
        Self {
            schema,
            table,
            alias: None,
        }
    }

    /// Builds a reference carrying the given (possibly absent) alias.
    pub fn with_alias(
        schema: impl Into<String>,
        table: impl Into<String>,
        alias: Option<String>,
    ) -> Self {
        Self {
            alias,
            ..Self::new(schema, table)
        }
    }

    /// Returns the alias when one is set, otherwise the qualified name.
    pub fn display_name(&self) -> String {
        match &self.alias {
            Some(alias) => alias.clone(),
            None => self.qualified_name(),
        }
    }

    /// Returns `schema.table`, or just `table` when the schema is empty.
    pub fn qualified_name(&self) -> String {
        if self.schema.is_empty() {
            return self.table.clone();
        }
        [self.schema.as_str(), self.table.as_str()].join(".")
    }
}
574
575#[derive(Clone, Debug)]
577pub struct SelectPlan {
578 pub tables: Vec<TableRef>,
581 pub projections: Vec<SelectProjection>,
582 pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
583 pub aggregates: Vec<AggregateExpr>,
584 pub order_by: Vec<OrderByPlan>,
585 pub distinct: bool,
586}
587
588impl SelectPlan {
589 pub fn new(table: impl Into<String>) -> Self {
591 let table_name = table.into();
592 let tables = if table_name.is_empty() {
593 Vec::new()
594 } else {
595 let parts: Vec<&str> = table_name.split('.').collect();
597 if parts.len() >= 2 {
598 let table_part = parts[1..].join(".");
599 vec![TableRef::new(parts[0], table_part)]
600 } else {
601 vec![TableRef::new("", table_name)]
602 }
603 };
604
605 Self {
606 tables,
607 projections: Vec::new(),
608 filter: None,
609 aggregates: Vec::new(),
610 order_by: Vec::new(),
611 distinct: false,
612 }
613 }
614
615 pub fn with_tables(tables: Vec<TableRef>) -> Self {
617 Self {
618 tables,
619 projections: Vec::new(),
620 filter: None,
621 aggregates: Vec::new(),
622 order_by: Vec::new(),
623 distinct: false,
624 }
625 }
626
627 pub fn with_projections(mut self, projections: Vec<SelectProjection>) -> Self {
628 self.projections = projections;
629 self
630 }
631
632 pub fn with_filter(mut self, filter: Option<llkv_expr::expr::Expr<'static, String>>) -> Self {
633 self.filter = filter;
634 self
635 }
636
637 pub fn with_aggregates(mut self, aggregates: Vec<AggregateExpr>) -> Self {
638 self.aggregates = aggregates;
639 self
640 }
641
642 pub fn with_order_by(mut self, order_by: Vec<OrderByPlan>) -> Self {
643 self.order_by = order_by;
644 self
645 }
646
647 pub fn with_distinct(mut self, distinct: bool) -> Self {
648 self.distinct = distinct;
649 self
650 }
651}
652
/// One projection entry of a [`SelectPlan`].
#[derive(Clone, Debug)]
pub enum SelectProjection {
    /// Every column.
    AllColumns,
    /// Every column except the listed ones.
    AllColumnsExcept {
        /// Names of the columns to leave out.
        exclude: Vec<String>,
    },
    /// A single named column.
    Column {
        /// Column name.
        name: String,
        /// Optional output alias.
        alias: Option<String>,
    },
    /// A computed scalar expression.
    Computed {
        /// The expression to evaluate.
        expr: llkv_expr::expr::ScalarExpr<String>,
        /// Output alias.
        alias: String,
    },
}
669
/// One aggregate entry of a select plan.
#[derive(Clone, Debug)]
pub enum AggregateExpr {
    /// Row count that is not tied to a column.
    CountStar {
        /// Output alias.
        alias: String,
    },
    /// Aggregate function applied to one column.
    Column {
        /// Input column name.
        column: String,
        /// Output alias.
        alias: String,
        /// Function to apply.
        function: AggregateFunction,
        /// `DISTINCT` flag.
        distinct: bool,
    },
}

/// Functions usable in [`AggregateExpr::Column`].
#[derive(Clone, Debug)]
pub enum AggregateFunction {
    /// Count.
    Count,
    /// Sum, Int64 flavour.
    SumInt64,
    /// Minimum, Int64 flavour.
    MinInt64,
    /// Maximum, Int64 flavour.
    MaxInt64,
    /// Count of nulls.
    CountNulls,
}

impl AggregateExpr {
    /// Shared constructor for every [`AggregateExpr::Column`] shorthand below.
    fn column_aggregate(
        column: impl Into<String>,
        alias: impl Into<String>,
        function: AggregateFunction,
        distinct: bool,
    ) -> Self {
        AggregateExpr::Column {
            column: column.into(),
            alias: alias.into(),
            function,
            distinct,
        }
    }

    /// Builds [`AggregateExpr::CountStar`] with the given alias.
    pub fn count_star(alias: impl Into<String>) -> Self {
        let alias = alias.into();
        AggregateExpr::CountStar { alias }
    }

    /// Builds a non-distinct [`AggregateFunction::Count`] over `column`.
    pub fn count_column(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::column_aggregate(column, alias, AggregateFunction::Count, false)
    }

    /// Builds a distinct [`AggregateFunction::Count`] over `column`.
    pub fn count_distinct_column(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::column_aggregate(column, alias, AggregateFunction::Count, true)
    }

    /// Builds a non-distinct [`AggregateFunction::SumInt64`] over `column`.
    pub fn sum_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::column_aggregate(column, alias, AggregateFunction::SumInt64, false)
    }

    /// Builds a non-distinct [`AggregateFunction::MinInt64`] over `column`.
    pub fn min_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::column_aggregate(column, alias, AggregateFunction::MinInt64, false)
    }

    /// Builds a non-distinct [`AggregateFunction::MaxInt64`] over `column`.
    pub fn max_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::column_aggregate(column, alias, AggregateFunction::MaxInt64, false)
    }

    /// Builds a non-distinct [`AggregateFunction::CountNulls`] over `column`.
    pub fn count_nulls(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::column_aggregate(column, alias, AggregateFunction::CountNulls, false)
    }
}
759
760pub fn plan_value_from_array(array: &ArrayRef, index: usize) -> PlanResult<PlanValue> {
762 if array.is_null(index) {
763 return Ok(PlanValue::Null);
764 }
765 match array.data_type() {
766 DataType::Boolean => {
767 let values = array
768 .as_any()
769 .downcast_ref::<BooleanArray>()
770 .ok_or_else(|| {
771 Error::InvalidArgumentError("expected Boolean array in INSERT SELECT".into())
772 })?;
773 Ok(PlanValue::Integer(if values.value(index) { 1 } else { 0 }))
774 }
775 DataType::Int64 => {
776 let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
777 Error::InvalidArgumentError("expected Int64 array in INSERT SELECT".into())
778 })?;
779 Ok(PlanValue::Integer(values.value(index)))
780 }
781 DataType::Float64 => {
782 let values = array
783 .as_any()
784 .downcast_ref::<Float64Array>()
785 .ok_or_else(|| {
786 Error::InvalidArgumentError("expected Float64 array in INSERT SELECT".into())
787 })?;
788 Ok(PlanValue::Float(values.value(index)))
789 }
790 DataType::Utf8 => {
791 let values = array
792 .as_any()
793 .downcast_ref::<StringArray>()
794 .ok_or_else(|| {
795 Error::InvalidArgumentError("expected Utf8 array in INSERT SELECT".into())
796 })?;
797 Ok(PlanValue::String(values.value(index).to_string()))
798 }
799 DataType::Date32 => {
800 let values = array
801 .as_any()
802 .downcast_ref::<Date32Array>()
803 .ok_or_else(|| {
804 Error::InvalidArgumentError("expected Date32 array in INSERT SELECT".into())
805 })?;
806 Ok(PlanValue::Integer(values.value(index) as i64))
807 }
808 other => Err(Error::InvalidArgumentError(format!(
809 "unsupported data type in INSERT SELECT: {other:?}"
810 ))),
811 }
812}
813
/// One ordering entry of a select plan.
#[derive(Clone, Debug)]
pub struct OrderByPlan {
    /// What to order by.
    pub target: OrderTarget,
    /// How values are compared.
    pub sort_type: OrderSortType,
    /// Ascending-order flag.
    pub ascending: bool,
    /// Nulls-first flag.
    pub nulls_first: bool,
}
826
/// Comparison mode of an [`OrderByPlan`].
#[derive(Clone, Debug)]
pub enum OrderSortType {
    /// Native ordering.
    // NOTE(review): presumably the column type's own ordering — confirm with the executor.
    Native,
    /// Cast text to integer.
    // NOTE(review): presumably applied to text values before comparing — confirm with the executor.
    CastTextToInteger,
}
833
/// Target of an [`OrderByPlan`].
#[derive(Clone, Debug)]
pub enum OrderTarget {
    /// Order by the named column.
    Column(String),
    /// Order by position.
    // NOTE(review): whether the position is 0- or 1-based is not visible here — confirm.
    Index(usize),
    /// Order by all columns.
    // NOTE(review): presumably every projected column — confirm with the executor.
    All,
}
841
/// A single planned operation, wrapping the plan struct of its kind.
#[derive(Clone, Debug)]
pub enum PlanOperation {
    /// Create a table.
    CreateTable(CreateTablePlan),
    /// Drop a table.
    DropTable(DropTablePlan),
    /// Insert rows.
    Insert(InsertPlan),
    /// Update rows.
    Update(UpdatePlan),
    /// Delete rows.
    Delete(DeletePlan),
    /// Truncate a table.
    Truncate(TruncatePlan),
    /// Run a select.
    Select(SelectPlan),
}
857
/// A planned statement: transaction-control markers plus the table, index and
/// data plans defined in this file.
#[derive(Clone, Debug)]
pub enum PlanStatement {
    /// Begin a transaction.
    BeginTransaction,
    /// Commit the transaction.
    CommitTransaction,
    /// Roll the transaction back.
    RollbackTransaction,
    /// Create a table.
    CreateTable(CreateTablePlan),
    /// Drop a table.
    DropTable(DropTablePlan),
    /// Drop an index.
    DropIndex(DropIndexPlan),
    /// Alter a table.
    AlterTable(AlterTablePlan),
    /// Create an index.
    CreateIndex(CreateIndexPlan),
    /// Insert rows.
    Insert(InsertPlan),
    /// Update rows.
    Update(UpdatePlan),
    /// Delete rows.
    Delete(DeletePlan),
    /// Truncate a table.
    Truncate(TruncatePlan),
    /// Run a select.
    Select(SelectPlan),
}