1use std::sync::Arc;
8
9use arrow::array::{ArrayRef, BooleanArray, Date32Array, Float64Array, Int64Array, StringArray};
10use arrow::datatypes::{DataType, Schema};
11use arrow::record_batch::RecordBatch;
12use llkv_result::Error;
13
/// Result alias used by the plan types in this file; it is exactly
/// `llkv_result::Result<T>` under a shorter, plan-specific name.
pub type PlanResult<T> = llkv_result::Result<T>;
16
/// Value carried inside a plan: a null, a 64-bit integer, a 64-bit float, a
/// string, or a struct whose members are keyed by name.
#[derive(Clone, Debug, PartialEq)]
pub enum PlanValue {
    /// Absence of a value.
    Null,
    /// Signed 64-bit integer. The `bool` and `i32` conversions also land here.
    Integer(i64),
    /// 64-bit floating point number.
    Float(f64),
    /// Owned string.
    String(String),
    /// Nested values addressed by field name.
    Struct(std::collections::HashMap<String, PlanValue>),
}

/// Copies the borrowed text into a [`PlanValue::String`].
impl From<&str> for PlanValue {
    fn from(value: &str) -> Self {
        PlanValue::String(String::from(value))
    }
}

/// Moves the owned text into a [`PlanValue::String`] without copying it.
impl From<String> for PlanValue {
    fn from(value: String) -> Self {
        PlanValue::String(value)
    }
}

/// Wraps the integer in [`PlanValue::Integer`].
impl From<i64> for PlanValue {
    fn from(value: i64) -> Self {
        PlanValue::Integer(value)
    }
}

/// Wraps the float in [`PlanValue::Float`].
impl From<f64> for PlanValue {
    fn from(value: f64) -> Self {
        PlanValue::Float(value)
    }
}

/// Encodes a boolean as [`PlanValue::Integer`]: `true` becomes `1`, `false` becomes `0`.
impl From<bool> for PlanValue {
    fn from(value: bool) -> Self {
        // `i64::from(bool)` yields exactly 1 for `true` and 0 for `false`.
        PlanValue::Integer(i64::from(value))
    }
}

/// Widens the 32-bit integer to 64 bits and wraps it in [`PlanValue::Integer`].
impl From<i32> for PlanValue {
    fn from(value: i32) -> Self {
        PlanValue::Integer(i64::from(value))
    }
}
70
/// Plan for creating a table.
///
/// [`CreateTablePlan::new`] fills in only `name`; every other field starts out
/// `false`, empty, or `None` and is set directly by the caller.
#[derive(Clone, Debug)]
pub struct CreateTablePlan {
    /// Name of the table to create.
    pub name: String,
    /// NOTE(review): presumably mirrors SQL `IF NOT EXISTS` — confirm with the executor.
    pub if_not_exists: bool,
    /// NOTE(review): presumably mirrors SQL `OR REPLACE` — confirm with the executor.
    pub or_replace: bool,
    /// Column definitions for the table.
    pub columns: Vec<ColumnSpec>,
    /// Optional data source (record batches or a `SELECT` plan) attached to the plan.
    pub source: Option<CreateTableSource>,
    /// Optional namespace name; how it is resolved is not visible in this file.
    pub namespace: Option<String>,
    /// Foreign key definitions declared for the table.
    pub foreign_keys: Vec<ForeignKeySpec>,
}
87
88impl CreateTablePlan {
89 pub fn new(name: impl Into<String>) -> Self {
90 Self {
91 name: name.into(),
92 if_not_exists: false,
93 or_replace: false,
94 columns: Vec::new(),
95 source: None,
96 namespace: None,
97 foreign_keys: Vec::new(),
98 }
99 }
100}
101
/// Action recorded for a foreign key's `on_delete` / `on_update` slots.
///
/// The default value is [`ForeignKeyAction::NoAction`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum ForeignKeyAction {
    /// Default action.
    #[default]
    NoAction,
    /// NOTE(review): presumably SQL `RESTRICT` — enforcement is not visible in this file.
    Restrict,
}
117
/// Description of one foreign key constraint.
#[derive(Clone, Debug)]
pub struct ForeignKeySpec {
    /// Optional constraint name.
    pub name: Option<String>,
    /// Names of the columns that make up the key.
    pub columns: Vec<String>,
    /// Name of the table the key points at.
    pub referenced_table: String,
    /// Names of the referenced columns.
    /// NOTE(review): presumably positionally paired with `columns` — confirm with the consumer.
    pub referenced_columns: Vec<String>,
    /// Action associated with deletes.
    pub on_delete: ForeignKeyAction,
    /// Action associated with updates.
    pub on_update: ForeignKeyAction,
}
127
/// One column of an index together with its sort settings.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IndexColumnPlan {
    /// Column name.
    pub name: String,
    /// `true` for ascending order; this is what [`IndexColumnPlan::new`] sets.
    pub ascending: bool,
    /// Whether nulls come first; [`IndexColumnPlan::new`] sets `false`.
    pub nulls_first: bool,
}

impl IndexColumnPlan {
    /// Creates an entry for `name` that sorts ascending with `nulls_first` off.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            name,
            ascending: true,
            nulls_first: false,
        }
    }

    /// Returns the entry with both sort settings replaced; the name is kept.
    pub fn with_sort(self, ascending: bool, nulls_first: bool) -> Self {
        Self {
            ascending,
            nulls_first,
            ..self
        }
    }
}
155
/// Plan for creating an index on a table.
#[derive(Clone, Debug)]
pub struct CreateIndexPlan {
    /// Optional index name; `None` when no name was supplied.
    pub name: Option<String>,
    /// Name of the table the index belongs to.
    pub table: String,
    /// Uniqueness flag; `CreateIndexPlan::new` leaves it `false`.
    pub unique: bool,
    /// NOTE(review): presumably mirrors SQL `IF NOT EXISTS` — confirm with the executor.
    pub if_not_exists: bool,
    /// Indexed columns with their sort settings.
    pub columns: Vec<IndexColumnPlan>,
}
165
166impl CreateIndexPlan {
167 pub fn new(table: impl Into<String>) -> Self {
168 Self {
169 name: None,
170 table: table.into(),
171 unique: false,
172 if_not_exists: false,
173 columns: Vec::new(),
174 }
175 }
176
177 pub fn with_name(mut self, name: Option<String>) -> Self {
178 self.name = name;
179 self
180 }
181
182 pub fn with_unique(mut self, unique: bool) -> Self {
183 self.unique = unique;
184 self
185 }
186
187 pub fn with_if_not_exists(mut self, if_not_exists: bool) -> Self {
188 self.if_not_exists = if_not_exists;
189 self
190 }
191
192 pub fn with_columns(mut self, columns: Vec<IndexColumnPlan>) -> Self {
193 self.columns = columns;
194 self
195 }
196}
197
/// Definition of a single table column.
#[derive(Clone, Debug)]
pub struct ColumnSpec {
    /// Column name.
    pub name: String,
    /// Arrow data type of the column.
    pub data_type: DataType,
    /// Whether the column is declared nullable.
    pub nullable: bool,
    /// Primary-key flag; `with_primary_key(true)` also turns on `unique`.
    pub primary_key: bool,
    /// Uniqueness flag; the builder methods only ever turn it on.
    pub unique: bool,
    /// Optional check expression kept as text; it is not parsed in this file.
    pub check_expr: Option<String>,
}
210
211impl ColumnSpec {
212 pub fn new(name: impl Into<String>, data_type: DataType, nullable: bool) -> Self {
213 Self {
214 name: name.into(),
215 data_type,
216 nullable,
217 primary_key: false,
218 unique: false,
219 check_expr: None,
220 }
221 }
222
223 pub fn with_primary_key(mut self, primary_key: bool) -> Self {
224 self.primary_key = primary_key;
225 if primary_key {
226 self.unique = true;
227 }
228 self
229 }
230
231 pub fn with_unique(mut self, unique: bool) -> Self {
232 if unique {
233 self.unique = true;
234 }
235 self
236 }
237
238 pub fn with_check(mut self, check_expr: Option<String>) -> Self {
239 self.check_expr = check_expr;
240 self
241 }
242}
243
/// Conversion into a [`ColumnSpec`].
///
/// Implemented in this file for `ColumnSpec` itself, for references to any
/// cloneable implementor, and for `(&str, DataType)`-based tuples.
pub trait IntoColumnSpec {
    /// Consumes `self` and produces the column definition.
    fn into_column_spec(self) -> ColumnSpec;
}
248
/// Two-state marker saying whether a column accepts nulls.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ColumnNullability {
    /// The column may hold nulls.
    Nullable,
    /// The column may not hold nulls.
    NotNull,
}

impl ColumnNullability {
    /// Returns `true` for [`ColumnNullability::Nullable`] and `false` for
    /// [`ColumnNullability::NotNull`].
    pub fn is_nullable(self) -> bool {
        match self {
            ColumnNullability::Nullable => true,
            ColumnNullability::NotNull => false,
        }
    }
}
261
/// Shorthand constant equal to `ColumnNullability::Nullable`.
///
/// NOTE(review): presumably exists so call sites can write
/// `("col", data_type, Nullable)` — confirm against callers.
// The constant is deliberately named like the enum variant, so the
// upper-case naming lint is silenced for it.
#[allow(non_upper_case_globals)]
pub const Nullable: ColumnNullability = ColumnNullability::Nullable;
265
/// Shorthand constant equal to `ColumnNullability::NotNull`.
///
/// NOTE(review): presumably exists so call sites can write
/// `("col", data_type, NotNull)` — confirm against callers.
// The constant is deliberately named like the enum variant, so the
// upper-case naming lint is silenced for it.
#[allow(non_upper_case_globals)]
pub const NotNull: ColumnNullability = ColumnNullability::NotNull;
269
/// Identity conversion: a `ColumnSpec` is already a column definition.
impl IntoColumnSpec for ColumnSpec {
    /// Returns `self` unchanged.
    fn into_column_spec(self) -> ColumnSpec {
        self
    }
}
275
276impl<T> IntoColumnSpec for &T
277where
278 T: Clone + IntoColumnSpec,
279{
280 fn into_column_spec(self) -> ColumnSpec {
281 self.clone().into_column_spec()
282 }
283}
284
285impl IntoColumnSpec for (&str, DataType) {
286 fn into_column_spec(self) -> ColumnSpec {
287 ColumnSpec::new(self.0, self.1, true)
288 }
289}
290
291impl IntoColumnSpec for (&str, DataType, bool) {
292 fn into_column_spec(self) -> ColumnSpec {
293 ColumnSpec::new(self.0, self.1, self.2)
294 }
295}
296
297impl IntoColumnSpec for (&str, DataType, ColumnNullability) {
298 fn into_column_spec(self) -> ColumnSpec {
299 ColumnSpec::new(self.0, self.1, self.2.is_nullable())
300 }
301}
302
/// Data source that can be attached to a [`CreateTablePlan`].
#[derive(Clone, Debug)]
pub enum CreateTableSource {
    /// Arrow record batches together with a schema.
    /// NOTE(review): presumably every batch conforms to `schema` — not checked here.
    Batches {
        schema: Arc<Schema>,
        batches: Vec<RecordBatch>,
    },
    /// A `SELECT` plan; boxed so the enum holds a pointer rather than the plan inline.
    Select {
        plan: Box<SelectPlan>,
    },
}
314
/// Plan for inserting data into a table.
#[derive(Clone, Debug)]
pub struct InsertPlan {
    /// Name of the target table.
    pub table: String,
    /// Target column names.
    /// NOTE(review): presumably an empty list means "all columns" — confirm with the executor.
    pub columns: Vec<String>,
    /// Where the inserted data comes from.
    pub source: InsertSource,
}
326
/// Origin of the data for an [`InsertPlan`].
#[derive(Clone, Debug)]
pub enum InsertSource {
    /// Literal rows: the outer vector holds rows, each inner vector one row's values.
    Rows(Vec<Vec<PlanValue>>),
    /// Arrow record batches.
    Batches(Vec<RecordBatch>),
    /// A `SELECT` plan; boxed so the enum holds a pointer rather than the plan inline.
    Select { plan: Box<SelectPlan> },
}
334
/// Plan for updating rows of a table.
#[derive(Clone, Debug)]
pub struct UpdatePlan {
    /// Name of the target table.
    pub table: String,
    /// Column assignments to apply.
    pub assignments: Vec<ColumnAssignment>,
    /// Optional filter expression.
    /// NOTE(review): presumably `None` means every row is updated — confirm with the executor.
    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
}
346
/// Right-hand side of a [`ColumnAssignment`].
#[derive(Clone, Debug)]
pub enum AssignmentValue {
    /// A literal plan value.
    Literal(PlanValue),
    /// A scalar expression whose column references are `String`s.
    Expression(llkv_expr::expr::ScalarExpr<String>),
}
353
/// A single `column = value` assignment used by [`UpdatePlan`].
#[derive(Clone, Debug)]
pub struct ColumnAssignment {
    /// Name of the column being assigned.
    pub column: String,
    /// Value (literal or expression) assigned to the column.
    pub value: AssignmentValue,
}
360
/// Plan for deleting rows from a table.
#[derive(Clone, Debug)]
pub struct DeletePlan {
    /// Name of the target table.
    pub table: String,
    /// Optional filter expression.
    /// NOTE(review): presumably `None` means every row is deleted — confirm with the executor.
    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
}
371
/// Reference to a table by schema name and table name.
///
/// An empty `schema` marks an unqualified reference.
#[derive(Clone, Debug)]
pub struct TableRef {
    /// Schema name; the empty string when the reference is unqualified.
    pub schema: String,
    /// Table name.
    pub table: String,
}

impl TableRef {
    /// Builds a reference from anything convertible into the two owned names.
    pub fn new(schema: impl Into<String>, table: impl Into<String>) -> Self {
        let schema = schema.into();
        let table = table.into();
        Self { schema, table }
    }

    /// Returns `schema.table`, or just `table` when the schema is empty.
    pub fn qualified_name(&self) -> String {
        match self.schema.as_str() {
            "" => self.table.clone(),
            schema => [schema, self.table.as_str()].join("."),
        }
    }
}
400
/// Plan for a `SELECT` query.
#[derive(Clone, Debug)]
pub struct SelectPlan {
    /// Tables the query reads from; may be empty (`SelectPlan::new("")` produces none).
    pub tables: Vec<TableRef>,
    /// Output projections.
    pub projections: Vec<SelectProjection>,
    /// Optional filter expression.
    pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
    /// Aggregate expressions.
    pub aggregates: Vec<AggregateExpr>,
    /// Ordering entries.
    /// NOTE(review): presumably applied in list order — confirm with the executor.
    pub order_by: Vec<OrderByPlan>,
}
412
413impl SelectPlan {
414 pub fn new(table: impl Into<String>) -> Self {
416 let table_name = table.into();
417 let tables = if table_name.is_empty() {
418 Vec::new()
419 } else {
420 let parts: Vec<&str> = table_name.split('.').collect();
422 if parts.len() >= 2 {
423 let table_part = parts[1..].join(".");
424 vec![TableRef::new(parts[0], table_part)]
425 } else {
426 vec![TableRef::new("", table_name)]
427 }
428 };
429
430 Self {
431 tables,
432 projections: Vec::new(),
433 filter: None,
434 aggregates: Vec::new(),
435 order_by: Vec::new(),
436 }
437 }
438
439 pub fn with_tables(tables: Vec<TableRef>) -> Self {
441 Self {
442 tables,
443 projections: Vec::new(),
444 filter: None,
445 aggregates: Vec::new(),
446 order_by: Vec::new(),
447 }
448 }
449
450 pub fn with_projections(mut self, projections: Vec<SelectProjection>) -> Self {
451 self.projections = projections;
452 self
453 }
454
455 pub fn with_filter(mut self, filter: Option<llkv_expr::expr::Expr<'static, String>>) -> Self {
456 self.filter = filter;
457 self
458 }
459
460 pub fn with_aggregates(mut self, aggregates: Vec<AggregateExpr>) -> Self {
461 self.aggregates = aggregates;
462 self
463 }
464
465 pub fn with_order_by(mut self, order_by: Vec<OrderByPlan>) -> Self {
466 self.order_by = order_by;
467 self
468 }
469}
470
/// One entry of a `SELECT` projection list.
#[derive(Clone, Debug)]
pub enum SelectProjection {
    /// NOTE(review): presumably `SELECT *` — confirm with the executor.
    AllColumns,
    /// Like `AllColumns`, but carries a list of column names to exclude.
    AllColumnsExcept {
        exclude: Vec<String>,
    },
    /// A column referenced by name, with an optional alias.
    Column {
        name: String,
        alias: Option<String>,
    },
    /// A scalar expression; unlike `Column`, the alias is mandatory.
    Computed {
        expr: llkv_expr::expr::ScalarExpr<String>,
        alias: String,
    },
}
487
/// Aggregate expression in a `SELECT` plan.
#[derive(Clone, Debug)]
pub enum AggregateExpr {
    /// Aggregate without a column argument; carries only the output alias.
    CountStar {
        alias: String,
    },
    /// Aggregate `function` over `column`, published under `alias`.
    Column {
        column: String,
        alias: String,
        function: AggregateFunction,
        distinct: bool,
    },
}

/// Function applied by [`AggregateExpr::Column`].
#[derive(Clone, Debug)]
pub enum AggregateFunction {
    Count,
    SumInt64,
    MinInt64,
    MaxInt64,
    CountNulls,
}

impl AggregateExpr {
    /// Shared constructor behind every column-based aggregate below.
    fn over_column(
        column: impl Into<String>,
        alias: impl Into<String>,
        function: AggregateFunction,
        distinct: bool,
    ) -> Self {
        Self::Column {
            column: column.into(),
            alias: alias.into(),
            function,
            distinct,
        }
    }

    /// Builds [`AggregateExpr::CountStar`] with the given alias.
    pub fn count_star(alias: impl Into<String>) -> Self {
        let alias = alias.into();
        Self::CountStar { alias }
    }

    /// Builds a non-distinct [`AggregateFunction::Count`] over `column`.
    pub fn count_column(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::Count, false)
    }

    /// Builds a distinct [`AggregateFunction::Count`] over `column`.
    pub fn count_distinct_column(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::Count, true)
    }

    /// Builds a non-distinct [`AggregateFunction::SumInt64`] over `column`.
    pub fn sum_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::SumInt64, false)
    }

    /// Builds a non-distinct [`AggregateFunction::MinInt64`] over `column`.
    pub fn min_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::MinInt64, false)
    }

    /// Builds a non-distinct [`AggregateFunction::MaxInt64`] over `column`.
    pub fn max_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::MaxInt64, false)
    }

    /// Builds a non-distinct [`AggregateFunction::CountNulls`] over `column`.
    pub fn count_nulls(column: impl Into<String>, alias: impl Into<String>) -> Self {
        Self::over_column(column, alias, AggregateFunction::CountNulls, false)
    }
}
577
578pub fn plan_value_from_array(array: &ArrayRef, index: usize) -> PlanResult<PlanValue> {
580 if array.is_null(index) {
581 return Ok(PlanValue::Null);
582 }
583 match array.data_type() {
584 DataType::Boolean => {
585 let values = array
586 .as_any()
587 .downcast_ref::<BooleanArray>()
588 .ok_or_else(|| {
589 Error::InvalidArgumentError("expected Boolean array in INSERT SELECT".into())
590 })?;
591 Ok(PlanValue::Integer(if values.value(index) { 1 } else { 0 }))
592 }
593 DataType::Int64 => {
594 let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
595 Error::InvalidArgumentError("expected Int64 array in INSERT SELECT".into())
596 })?;
597 Ok(PlanValue::Integer(values.value(index)))
598 }
599 DataType::Float64 => {
600 let values = array
601 .as_any()
602 .downcast_ref::<Float64Array>()
603 .ok_or_else(|| {
604 Error::InvalidArgumentError("expected Float64 array in INSERT SELECT".into())
605 })?;
606 Ok(PlanValue::Float(values.value(index)))
607 }
608 DataType::Utf8 => {
609 let values = array
610 .as_any()
611 .downcast_ref::<StringArray>()
612 .ok_or_else(|| {
613 Error::InvalidArgumentError("expected Utf8 array in INSERT SELECT".into())
614 })?;
615 Ok(PlanValue::String(values.value(index).to_string()))
616 }
617 DataType::Date32 => {
618 let values = array
619 .as_any()
620 .downcast_ref::<Date32Array>()
621 .ok_or_else(|| {
622 Error::InvalidArgumentError("expected Date32 array in INSERT SELECT".into())
623 })?;
624 Ok(PlanValue::Integer(values.value(index) as i64))
625 }
626 other => Err(Error::InvalidArgumentError(format!(
627 "unsupported data type in INSERT SELECT: {other:?}"
628 ))),
629 }
630}
631
/// One ordering entry of a `SELECT` plan.
#[derive(Clone, Debug)]
pub struct OrderByPlan {
    /// What the entry sorts on.
    pub target: OrderTarget,
    /// How values are compared.
    pub sort_type: OrderSortType,
    /// `true` for ascending order.
    pub ascending: bool,
    /// Whether nulls are placed first.
    pub nulls_first: bool,
}
644
/// Comparison mode of an [`OrderByPlan`] entry.
#[derive(Clone, Debug)]
pub enum OrderSortType {
    /// NOTE(review): presumably compares values in their own data type — confirm with the executor.
    Native,
    /// NOTE(review): the name suggests text values are cast to integers before
    /// comparison — confirm with the executor.
    CastTextToInteger,
}
651
/// What an [`OrderByPlan`] entry sorts on.
#[derive(Clone, Debug)]
pub enum OrderTarget {
    /// A column identified by name.
    Column(String),
    /// A position.
    /// NOTE(review): whether it is 0- or 1-based, and what it indexes into, is not
    /// visible in this file — confirm with the executor.
    Index(usize),
    /// NOTE(review): presumably "order by all output columns" — confirm with the executor.
    All,
}
659
/// A table-level operation, wrapping one of the plan structs defined in this file.
///
/// Compared with `PlanStatement`, this enum has no transaction-control or
/// `CreateIndex` variants.
#[derive(Clone, Debug)]
pub enum PlanOperation {
    /// Create a table.
    CreateTable(CreateTablePlan),
    /// Insert rows.
    Insert(InsertPlan),
    /// Update rows.
    Update(UpdatePlan),
    /// Delete rows.
    Delete(DeletePlan),
    /// Run a query.
    Select(SelectPlan),
}
673
/// A statement: transaction control, index creation, or one of the table-level
/// plans defined in this file.
#[derive(Clone, Debug)]
pub enum PlanStatement {
    /// Begin a transaction.
    BeginTransaction,
    /// Commit the transaction.
    CommitTransaction,
    /// Roll back the transaction.
    RollbackTransaction,
    /// Create a table.
    CreateTable(CreateTablePlan),
    /// Create an index.
    CreateIndex(CreateIndexPlan),
    /// Insert rows.
    Insert(InsertPlan),
    /// Update rows.
    Update(UpdatePlan),
    /// Delete rows.
    Delete(DeletePlan),
    /// Run a query.
    Select(SelectPlan),
}