1pub mod aggregate_expr;
13mod error;
14pub mod knn_optimizer;
15pub mod logical_plan;
16pub mod name_resolver;
17pub mod type_checker;
18pub mod typed_expr;
19pub mod types;
20
21#[cfg(test)]
22mod planner_tests;
23
24pub use aggregate_expr::{AggregateExpr, AggregateFunction};
25pub use error::PlannerError;
26pub use knn_optimizer::{KnnPattern, SortDirection, detect_knn_pattern};
27pub use logical_plan::LogicalPlan;
28pub use name_resolver::{NameResolver, ResolvedColumn};
29pub use type_checker::TypeChecker;
30pub use typed_expr::{
31 ProjectedColumn, Projection, SortExpr, TypedAssignment, TypedExpr, TypedExprKind,
32};
33pub use types::ResolvedType;
34
35use crate::ast::ddl::{
36 ColumnConstraint, ColumnDef, CreateIndex, CreateTable, DropIndex, DropTable,
37};
38use crate::ast::dml::{Delete, Insert, LITERAL_TABLE, OrderByExpr, Select, SelectItem, Update};
39use crate::ast::expr::Literal;
40use crate::ast::{Statement, StatementKind};
41use crate::catalog::{Catalog, ColumnMetadata, IndexMetadata, TableMetadata};
42use crate::{DataSourceFormat, TableType};
43use std::collections::HashMap;
44
45pub struct Planner<'a, C: Catalog + ?Sized> {
72 catalog: &'a C,
73 name_resolver: NameResolver<'a, C>,
74 type_checker: TypeChecker<'a, C>,
75}
76
77impl<'a, C: Catalog + ?Sized> Planner<'a, C> {
78 pub fn new(catalog: &'a C) -> Self {
80 Self {
81 catalog,
82 name_resolver: NameResolver::new(catalog),
83 type_checker: TypeChecker::new(catalog),
84 }
85 }
86
87 pub fn plan(&self, stmt: &Statement) -> Result<LogicalPlan, PlannerError> {
98 match &stmt.kind {
99 StatementKind::CreateTable(ct) => self.plan_create_table(ct),
101 StatementKind::DropTable(dt) => self.plan_drop_table(dt),
102 StatementKind::CreateIndex(ci) => self.plan_create_index(ci),
103 StatementKind::DropIndex(di) => self.plan_drop_index(di),
104
105 StatementKind::Select(sel) => self.plan_select(sel),
107 StatementKind::Insert(ins) => self.plan_insert(ins),
108 StatementKind::Update(upd) => self.plan_update(upd),
109 StatementKind::Delete(del) => self.plan_delete(del),
110 }
111 }
112
113 fn plan_create_table(&self, stmt: &CreateTable) -> Result<LogicalPlan, PlannerError> {
122 if !stmt.if_not_exists && self.catalog.table_exists(&stmt.name) {
124 return Err(PlannerError::table_already_exists(&stmt.name));
125 }
126
127 let columns: Vec<ColumnMetadata> = stmt
129 .columns
130 .iter()
131 .map(|col| self.convert_column_def(col))
132 .collect();
133
134 let primary_key = Self::extract_primary_key(stmt);
136
137 let mut table = TableMetadata::new(stmt.name.clone(), columns);
140 if let Some(pk) = primary_key {
141 table = table.with_primary_key(pk);
142 }
143 table.catalog_name = "default".to_string();
144 table.namespace_name = "default".to_string();
145 table.table_type = TableType::Managed;
146 table.data_source_format = DataSourceFormat::Alopex;
147 table.properties = HashMap::new();
148
149 Ok(LogicalPlan::CreateTable {
150 table,
151 if_not_exists: stmt.if_not_exists,
152 with_options: stmt.with_options.clone(),
153 })
154 }
155
156 fn convert_column_def(&self, col: &ColumnDef) -> ColumnMetadata {
158 let data_type = ResolvedType::from_ast(&col.data_type);
159 let mut meta = ColumnMetadata::new(col.name.clone(), data_type);
160
161 for constraint in &col.constraints {
163 meta = Self::apply_column_constraint(meta, constraint);
164 }
165
166 meta
167 }
168
169 fn apply_column_constraint(
171 mut meta: ColumnMetadata,
172 constraint: &ColumnConstraint,
173 ) -> ColumnMetadata {
174 match constraint {
175 ColumnConstraint::NotNull => {
176 meta.not_null = true;
177 }
178 ColumnConstraint::Null => {
179 meta.not_null = false;
180 }
181 ColumnConstraint::PrimaryKey => {
182 meta.primary_key = true;
183 meta.not_null = true; }
185 ColumnConstraint::Unique => {
186 meta.unique = true;
187 }
188 ColumnConstraint::Default(expr) => {
189 meta.default = Some(expr.clone());
190 }
191 ColumnConstraint::WithSpan { kind, .. } => {
192 meta = Self::apply_column_constraint(meta, kind);
193 }
194 }
195 meta
196 }
197
198 fn extract_primary_key(stmt: &CreateTable) -> Option<Vec<String>> {
200 use crate::ast::ddl::TableConstraint;
201
202 if let Some(TableConstraint::PrimaryKey { columns, .. }) = stmt.constraints.first() {
206 return Some(columns.clone());
207 }
208
209 let pk_columns: Vec<String> = stmt
211 .columns
212 .iter()
213 .filter(|col| col.constraints.iter().any(Self::is_primary_key_constraint))
214 .map(|col| col.name.clone())
215 .collect();
216
217 if pk_columns.is_empty() {
218 None
219 } else {
220 Some(pk_columns)
221 }
222 }
223
224 fn is_primary_key_constraint(constraint: &ColumnConstraint) -> bool {
226 match constraint {
227 ColumnConstraint::PrimaryKey => true,
228 ColumnConstraint::WithSpan { kind, .. } => Self::is_primary_key_constraint(kind),
229 _ => false,
230 }
231 }
232
233 fn plan_drop_table(&self, stmt: &DropTable) -> Result<LogicalPlan, PlannerError> {
237 if !stmt.if_exists && !self.table_exists_in_default(&stmt.name) {
239 return Err(PlannerError::TableNotFound {
240 name: stmt.name.clone(),
241 line: stmt.span.start.line,
242 column: stmt.span.start.column,
243 });
244 }
245
246 Ok(LogicalPlan::DropTable {
247 name: stmt.name.clone(),
248 if_exists: stmt.if_exists,
249 })
250 }
251
252 fn table_exists_in_default(&self, name: &str) -> bool {
253 match self.catalog.get_table(name) {
254 Some(table) => table.catalog_name == "default" && table.namespace_name == "default",
255 None => false,
256 }
257 }
258
259 fn plan_create_index(&self, stmt: &CreateIndex) -> Result<LogicalPlan, PlannerError> {
266 if !stmt.if_not_exists && self.catalog.index_exists(&stmt.name) {
268 return Err(PlannerError::index_already_exists(&stmt.name));
269 }
270
271 let table = self.name_resolver.resolve_table(&stmt.table, stmt.span)?;
273
274 self.name_resolver
276 .resolve_column(table, &stmt.column, stmt.span)?;
277
278 let mut index = IndexMetadata::new(
282 0,
283 stmt.name.clone(),
284 stmt.table.clone(),
285 vec![stmt.column.clone()],
286 );
287
288 if let Some(method) = stmt.method {
289 index = index.with_method(method);
290 }
291
292 let options: Vec<(String, String)> = stmt
293 .options
294 .iter()
295 .map(|opt| (opt.key.clone(), opt.value.clone()))
296 .collect();
297 if !options.is_empty() {
298 index = index.with_options(options);
299 }
300
301 Ok(LogicalPlan::CreateIndex {
302 index,
303 if_not_exists: stmt.if_not_exists,
304 })
305 }
306
307 fn plan_drop_index(&self, stmt: &DropIndex) -> Result<LogicalPlan, PlannerError> {
311 if !stmt.if_exists && !self.index_exists_in_default(&stmt.name) {
313 return Err(PlannerError::index_not_found(&stmt.name));
314 }
315
316 Ok(LogicalPlan::DropIndex {
317 name: stmt.name.clone(),
318 if_exists: stmt.if_exists,
319 })
320 }
321
322 fn index_exists_in_default(&self, name: &str) -> bool {
323 match self.catalog.get_index(name) {
324 Some(index) => index.catalog_name == "default" && index.namespace_name == "default",
325 None => false,
326 }
327 }
328
329 fn plan_select(&self, stmt: &Select) -> Result<LogicalPlan, PlannerError> {
338 let literal_table;
340 let table = if stmt.from.name == LITERAL_TABLE {
341 literal_table = TableMetadata::new(LITERAL_TABLE, Vec::new());
342 &literal_table
343 } else {
344 self.name_resolver
345 .resolve_table(&stmt.from.name, stmt.from.span)?
346 };
347
348 let projection = self.build_projection(&stmt.projection, table)?;
350
351 let mut plan = LogicalPlan::Scan {
353 table: stmt.from.name.clone(),
354 projection,
355 };
356
357 if let Some(ref selection) = stmt.selection {
359 let predicate = self.type_checker.infer_type(selection, table)?;
360
361 if predicate.resolved_type != ResolvedType::Boolean {
363 return Err(PlannerError::type_mismatch(
364 "Boolean",
365 predicate.resolved_type.to_string(),
366 selection.span,
367 ));
368 }
369
370 plan = LogicalPlan::Filter {
371 input: Box::new(plan),
372 predicate,
373 };
374 }
375
376 if !stmt.order_by.is_empty() {
378 let order_by = self.build_sort_exprs(&stmt.order_by, table)?;
379 plan = LogicalPlan::Sort {
380 input: Box::new(plan),
381 order_by,
382 };
383 }
384
385 if stmt.limit.is_some() || stmt.offset.is_some() {
387 let limit = self.extract_limit_value(&stmt.limit, stmt.span)?;
388 let offset = self.extract_limit_value(&stmt.offset, stmt.span)?;
389 plan = LogicalPlan::Limit {
390 input: Box::new(plan),
391 limit,
392 offset,
393 };
394 }
395
396 Ok(plan)
397 }
398
399 fn build_projection(
403 &self,
404 items: &[SelectItem],
405 table: &TableMetadata,
406 ) -> Result<Projection, PlannerError> {
407 if items.len() == 1 && matches!(&items[0], SelectItem::Wildcard { .. }) {
409 let columns = self.name_resolver.expand_wildcard(table);
410 return Ok(Projection::All(columns));
411 }
412
413 let mut projected_columns = Vec::new();
415 for item in items {
416 match item {
417 SelectItem::Wildcard { span } => {
418 for col in &table.columns {
420 let column_index = table.get_column_index(&col.name).unwrap();
421 let typed_expr = TypedExpr::column_ref(
422 table.name.clone(),
423 col.name.clone(),
424 column_index,
425 col.data_type.clone(),
426 *span,
427 );
428 projected_columns.push(ProjectedColumn::new(typed_expr));
429 }
430 }
431 SelectItem::Expr { expr, alias, .. } => {
432 let typed_expr = self.type_checker.infer_type(expr, table)?;
433 let projected = if let Some(alias) = alias {
434 ProjectedColumn::with_alias(typed_expr, alias.clone())
435 } else {
436 ProjectedColumn::new(typed_expr)
437 };
438 projected_columns.push(projected);
439 }
440 }
441 }
442
443 Ok(Projection::Columns(projected_columns))
444 }
445
446 fn build_sort_exprs(
448 &self,
449 order_by: &[OrderByExpr],
450 table: &TableMetadata,
451 ) -> Result<Vec<SortExpr>, PlannerError> {
452 let mut sort_exprs = Vec::new();
453
454 for order_expr in order_by {
455 let typed_expr = self.type_checker.infer_type(&order_expr.expr, table)?;
456
457 let asc = order_expr.asc.unwrap_or(true);
459
460 let nulls_first = order_expr.nulls_first.unwrap_or(false);
462
463 sort_exprs.push(SortExpr::new(typed_expr, asc, nulls_first));
464 }
465
466 Ok(sort_exprs)
467 }
468
469 fn extract_limit_value(
473 &self,
474 expr: &Option<crate::ast::expr::Expr>,
475 stmt_span: crate::ast::Span,
476 ) -> Result<Option<u64>, PlannerError> {
477 match expr {
478 None => Ok(None),
479 Some(e) => {
480 if let crate::ast::expr::ExprKind::Literal(Literal::Number(s)) = &e.kind {
482 s.parse::<u64>().map(Some).map_err(|_| {
483 PlannerError::type_mismatch("unsigned integer", s.clone(), e.span)
484 })
485 } else {
486 Err(PlannerError::unsupported_feature(
487 "non-literal LIMIT/OFFSET",
488 "v0.3.0+",
489 stmt_span,
490 ))
491 }
492 }
493 }
494 }
495
496 fn plan_insert(&self, stmt: &Insert) -> Result<LogicalPlan, PlannerError> {
501 let table = self.name_resolver.resolve_table(&stmt.table, stmt.span)?;
503
504 let columns: Vec<String> = if let Some(ref cols) = stmt.columns {
506 for col in cols {
508 self.name_resolver.resolve_column(table, col, stmt.span)?;
509 }
510 cols.clone()
511 } else {
512 table.column_names().into_iter().map(String::from).collect()
514 };
515
516 let mut typed_values: Vec<Vec<TypedExpr>> = Vec::new();
518
519 for row in &stmt.values {
520 if row.len() != columns.len() {
522 return Err(PlannerError::column_value_count_mismatch(
523 columns.len(),
524 row.len(),
525 stmt.span,
526 ));
527 }
528
529 let typed_row = self.type_check_insert_values(row, &columns, table)?;
531 typed_values.push(typed_row);
532 }
533
534 Ok(LogicalPlan::Insert {
535 table: table.name.clone(),
536 columns,
537 values: typed_values,
538 })
539 }
540
541 fn type_check_insert_values(
543 &self,
544 values: &[crate::ast::expr::Expr],
545 columns: &[String],
546 table: &TableMetadata,
547 ) -> Result<Vec<TypedExpr>, PlannerError> {
548 let mut typed_values = Vec::new();
549
550 for (i, value) in values.iter().enumerate() {
551 let column_name = &columns[i];
552 let column_meta = table.get_column(column_name).ok_or_else(|| {
553 PlannerError::column_not_found(column_name, &table.name, value.span)
554 })?;
555
556 let typed_value = self.type_checker.infer_type(value, table)?;
558
559 if column_meta.not_null
561 && matches!(&typed_value.kind, TypedExprKind::Literal(Literal::Null))
562 {
563 return Err(PlannerError::null_constraint_violation(
564 column_name,
565 value.span,
566 ));
567 }
568
569 self.validate_type_assignment(&typed_value, &column_meta.data_type, value.span)?;
571
572 typed_values.push(typed_value);
573 }
574
575 Ok(typed_values)
576 }
577
578 fn validate_type_assignment(
580 &self,
581 value: &TypedExpr,
582 target_type: &ResolvedType,
583 span: crate::ast::Span,
584 ) -> Result<(), PlannerError> {
585 if value.resolved_type == ResolvedType::Null {
587 return Ok(());
588 }
589
590 if self.types_compatible(&value.resolved_type, target_type) {
592 return Ok(());
593 }
594
595 Err(PlannerError::type_mismatch(
596 target_type.to_string(),
597 value.resolved_type.to_string(),
598 span,
599 ))
600 }
601
602 fn types_compatible(&self, source: &ResolvedType, target: &ResolvedType) -> bool {
604 use ResolvedType::*;
605
606 if source == target {
608 return true;
609 }
610
611 match (source, target) {
613 (Integer, BigInt) | (Integer, Float) | (Integer, Double) => true,
615 (BigInt, Float) | (BigInt, Double) => true,
617 (Float, Double) => true,
619 (Vector { dimension: d1, .. }, Vector { dimension: d2, .. }) => d1 == d2,
621 _ => false,
622 }
623 }
624
625 fn plan_update(&self, stmt: &Update) -> Result<LogicalPlan, PlannerError> {
629 let table = self.name_resolver.resolve_table(&stmt.table, stmt.span)?;
631
632 let mut typed_assignments = Vec::new();
634
635 for assignment in &stmt.assignments {
636 let column_meta =
638 self.name_resolver
639 .resolve_column(table, &assignment.column, assignment.span)?;
640 let column_index = table.get_column_index(&assignment.column).unwrap();
641
642 let typed_value = self.type_checker.infer_type(&assignment.value, table)?;
644
645 if column_meta.not_null
647 && matches!(&typed_value.kind, TypedExprKind::Literal(Literal::Null))
648 {
649 return Err(PlannerError::null_constraint_violation(
650 &assignment.column,
651 assignment.value.span,
652 ));
653 }
654
655 self.validate_type_assignment(
657 &typed_value,
658 &column_meta.data_type,
659 assignment.value.span,
660 )?;
661
662 typed_assignments.push(TypedAssignment::new(
663 assignment.column.clone(),
664 column_index,
665 typed_value,
666 ));
667 }
668
669 let filter = if let Some(ref selection) = stmt.selection {
671 let predicate = self.type_checker.infer_type(selection, table)?;
672
673 if predicate.resolved_type != ResolvedType::Boolean {
675 return Err(PlannerError::type_mismatch(
676 "Boolean",
677 predicate.resolved_type.to_string(),
678 selection.span,
679 ));
680 }
681
682 Some(predicate)
683 } else {
684 None
685 };
686
687 Ok(LogicalPlan::Update {
688 table: table.name.clone(),
689 assignments: typed_assignments,
690 filter,
691 })
692 }
693
694 fn plan_delete(&self, stmt: &Delete) -> Result<LogicalPlan, PlannerError> {
698 let table = self.name_resolver.resolve_table(&stmt.table, stmt.span)?;
700
701 let filter = if let Some(ref selection) = stmt.selection {
703 let predicate = self.type_checker.infer_type(selection, table)?;
704
705 if predicate.resolved_type != ResolvedType::Boolean {
707 return Err(PlannerError::type_mismatch(
708 "Boolean",
709 predicate.resolved_type.to_string(),
710 selection.span,
711 ));
712 }
713
714 Some(predicate)
715 } else {
716 None
717 };
718
719 Ok(LogicalPlan::Delete {
720 table: table.name.clone(),
721 filter,
722 })
723 }
724}