1pub mod aggregate_expr;
13mod error;
14pub mod knn_optimizer;
15pub mod logical_plan;
16pub mod name_resolver;
17pub mod type_checker;
18pub mod typed_expr;
19pub mod types;
20
21#[cfg(test)]
22mod planner_tests;
23
24pub use aggregate_expr::{AggregateExpr, AggregateFunction};
25pub use error::PlannerError;
26pub use knn_optimizer::{KnnPattern, SortDirection, detect_knn_pattern};
27pub use logical_plan::LogicalPlan;
28pub use name_resolver::{NameResolver, ResolvedColumn};
29pub use type_checker::TypeChecker;
30pub use typed_expr::{
31 ProjectedColumn, Projection, SortExpr, TypedAssignment, TypedExpr, TypedExprKind,
32};
33pub use types::ResolvedType;
34
35use crate::ast::ddl::{
36 ColumnConstraint, ColumnDef, CreateIndex, CreateTable, DropIndex, DropTable,
37};
38use crate::ast::dml::{Delete, Insert, OrderByExpr, Select, SelectItem, Update};
39use crate::ast::expr::Literal;
40use crate::ast::{Statement, StatementKind};
41use crate::catalog::{Catalog, ColumnMetadata, IndexMetadata, TableMetadata};
42use crate::{DataSourceFormat, TableType};
43use std::collections::HashMap;
44
45pub struct Planner<'a, C: Catalog> {
72 catalog: &'a C,
73 name_resolver: NameResolver<'a, C>,
74 type_checker: TypeChecker<'a, C>,
75}
76
77impl<'a, C: Catalog> Planner<'a, C> {
78 pub fn new(catalog: &'a C) -> Self {
80 Self {
81 catalog,
82 name_resolver: NameResolver::new(catalog),
83 type_checker: TypeChecker::new(catalog),
84 }
85 }
86
87 pub fn plan(&self, stmt: &Statement) -> Result<LogicalPlan, PlannerError> {
98 match &stmt.kind {
99 StatementKind::CreateTable(ct) => self.plan_create_table(ct),
101 StatementKind::DropTable(dt) => self.plan_drop_table(dt),
102 StatementKind::CreateIndex(ci) => self.plan_create_index(ci),
103 StatementKind::DropIndex(di) => self.plan_drop_index(di),
104
105 StatementKind::Select(sel) => self.plan_select(sel),
107 StatementKind::Insert(ins) => self.plan_insert(ins),
108 StatementKind::Update(upd) => self.plan_update(upd),
109 StatementKind::Delete(del) => self.plan_delete(del),
110 }
111 }
112
113 fn plan_create_table(&self, stmt: &CreateTable) -> Result<LogicalPlan, PlannerError> {
122 if !stmt.if_not_exists && self.catalog.table_exists(&stmt.name) {
124 return Err(PlannerError::table_already_exists(&stmt.name));
125 }
126
127 let columns: Vec<ColumnMetadata> = stmt
129 .columns
130 .iter()
131 .map(|col| self.convert_column_def(col))
132 .collect();
133
134 let primary_key = Self::extract_primary_key(stmt);
136
137 let mut table = TableMetadata::new(stmt.name.clone(), columns);
140 if let Some(pk) = primary_key {
141 table = table.with_primary_key(pk);
142 }
143 table.catalog_name = "default".to_string();
144 table.namespace_name = "default".to_string();
145 table.table_type = TableType::Managed;
146 table.data_source_format = DataSourceFormat::Alopex;
147 table.properties = HashMap::new();
148
149 Ok(LogicalPlan::CreateTable {
150 table,
151 if_not_exists: stmt.if_not_exists,
152 with_options: stmt.with_options.clone(),
153 })
154 }
155
156 fn convert_column_def(&self, col: &ColumnDef) -> ColumnMetadata {
158 let data_type = ResolvedType::from_ast(&col.data_type);
159 let mut meta = ColumnMetadata::new(col.name.clone(), data_type);
160
161 for constraint in &col.constraints {
163 meta = Self::apply_column_constraint(meta, constraint);
164 }
165
166 meta
167 }
168
169 fn apply_column_constraint(
171 mut meta: ColumnMetadata,
172 constraint: &ColumnConstraint,
173 ) -> ColumnMetadata {
174 match constraint {
175 ColumnConstraint::NotNull => {
176 meta.not_null = true;
177 }
178 ColumnConstraint::Null => {
179 meta.not_null = false;
180 }
181 ColumnConstraint::PrimaryKey => {
182 meta.primary_key = true;
183 meta.not_null = true; }
185 ColumnConstraint::Unique => {
186 meta.unique = true;
187 }
188 ColumnConstraint::Default(expr) => {
189 meta.default = Some(expr.clone());
190 }
191 ColumnConstraint::WithSpan { kind, .. } => {
192 meta = Self::apply_column_constraint(meta, kind);
193 }
194 }
195 meta
196 }
197
198 fn extract_primary_key(stmt: &CreateTable) -> Option<Vec<String>> {
200 use crate::ast::ddl::TableConstraint;
201
202 if let Some(TableConstraint::PrimaryKey { columns, .. }) = stmt.constraints.first() {
206 return Some(columns.clone());
207 }
208
209 let pk_columns: Vec<String> = stmt
211 .columns
212 .iter()
213 .filter(|col| col.constraints.iter().any(Self::is_primary_key_constraint))
214 .map(|col| col.name.clone())
215 .collect();
216
217 if pk_columns.is_empty() {
218 None
219 } else {
220 Some(pk_columns)
221 }
222 }
223
224 fn is_primary_key_constraint(constraint: &ColumnConstraint) -> bool {
226 match constraint {
227 ColumnConstraint::PrimaryKey => true,
228 ColumnConstraint::WithSpan { kind, .. } => Self::is_primary_key_constraint(kind),
229 _ => false,
230 }
231 }
232
233 fn plan_drop_table(&self, stmt: &DropTable) -> Result<LogicalPlan, PlannerError> {
237 if !stmt.if_exists && !self.table_exists_in_default(&stmt.name) {
239 return Err(PlannerError::TableNotFound {
240 name: stmt.name.clone(),
241 line: stmt.span.start.line,
242 column: stmt.span.start.column,
243 });
244 }
245
246 Ok(LogicalPlan::DropTable {
247 name: stmt.name.clone(),
248 if_exists: stmt.if_exists,
249 })
250 }
251
252 fn table_exists_in_default(&self, name: &str) -> bool {
253 match self.catalog.get_table(name) {
254 Some(table) => table.catalog_name == "default" && table.namespace_name == "default",
255 None => false,
256 }
257 }
258
259 fn plan_create_index(&self, stmt: &CreateIndex) -> Result<LogicalPlan, PlannerError> {
266 if !stmt.if_not_exists && self.catalog.index_exists(&stmt.name) {
268 return Err(PlannerError::index_already_exists(&stmt.name));
269 }
270
271 let table = self.name_resolver.resolve_table(&stmt.table, stmt.span)?;
273
274 self.name_resolver
276 .resolve_column(table, &stmt.column, stmt.span)?;
277
278 let mut index = IndexMetadata::new(
282 0,
283 stmt.name.clone(),
284 stmt.table.clone(),
285 vec![stmt.column.clone()],
286 );
287
288 if let Some(method) = stmt.method {
289 index = index.with_method(method);
290 }
291
292 let options: Vec<(String, String)> = stmt
293 .options
294 .iter()
295 .map(|opt| (opt.key.clone(), opt.value.clone()))
296 .collect();
297 if !options.is_empty() {
298 index = index.with_options(options);
299 }
300
301 Ok(LogicalPlan::CreateIndex {
302 index,
303 if_not_exists: stmt.if_not_exists,
304 })
305 }
306
307 fn plan_drop_index(&self, stmt: &DropIndex) -> Result<LogicalPlan, PlannerError> {
311 if !stmt.if_exists && !self.index_exists_in_default(&stmt.name) {
313 return Err(PlannerError::index_not_found(&stmt.name));
314 }
315
316 Ok(LogicalPlan::DropIndex {
317 name: stmt.name.clone(),
318 if_exists: stmt.if_exists,
319 })
320 }
321
322 fn index_exists_in_default(&self, name: &str) -> bool {
323 match self.catalog.get_index(name) {
324 Some(index) => index.catalog_name == "default" && index.namespace_name == "default",
325 None => false,
326 }
327 }
328
329 fn plan_select(&self, stmt: &Select) -> Result<LogicalPlan, PlannerError> {
338 let table = self
340 .name_resolver
341 .resolve_table(&stmt.from.name, stmt.from.span)?;
342
343 let projection = self.build_projection(&stmt.projection, table)?;
345
346 let mut plan = LogicalPlan::Scan {
348 table: table.name.clone(),
349 projection,
350 };
351
352 if let Some(ref selection) = stmt.selection {
354 let predicate = self.type_checker.infer_type(selection, table)?;
355
356 if predicate.resolved_type != ResolvedType::Boolean {
358 return Err(PlannerError::type_mismatch(
359 "Boolean",
360 predicate.resolved_type.to_string(),
361 selection.span,
362 ));
363 }
364
365 plan = LogicalPlan::Filter {
366 input: Box::new(plan),
367 predicate,
368 };
369 }
370
371 if !stmt.order_by.is_empty() {
373 let order_by = self.build_sort_exprs(&stmt.order_by, table)?;
374 plan = LogicalPlan::Sort {
375 input: Box::new(plan),
376 order_by,
377 };
378 }
379
380 if stmt.limit.is_some() || stmt.offset.is_some() {
382 let limit = self.extract_limit_value(&stmt.limit, stmt.span)?;
383 let offset = self.extract_limit_value(&stmt.offset, stmt.span)?;
384 plan = LogicalPlan::Limit {
385 input: Box::new(plan),
386 limit,
387 offset,
388 };
389 }
390
391 Ok(plan)
392 }
393
394 fn build_projection(
398 &self,
399 items: &[SelectItem],
400 table: &TableMetadata,
401 ) -> Result<Projection, PlannerError> {
402 if items.len() == 1 && matches!(&items[0], SelectItem::Wildcard { .. }) {
404 let columns = self.name_resolver.expand_wildcard(table);
405 return Ok(Projection::All(columns));
406 }
407
408 let mut projected_columns = Vec::new();
410 for item in items {
411 match item {
412 SelectItem::Wildcard { span } => {
413 for col in &table.columns {
415 let column_index = table.get_column_index(&col.name).unwrap();
416 let typed_expr = TypedExpr::column_ref(
417 table.name.clone(),
418 col.name.clone(),
419 column_index,
420 col.data_type.clone(),
421 *span,
422 );
423 projected_columns.push(ProjectedColumn::new(typed_expr));
424 }
425 }
426 SelectItem::Expr { expr, alias, .. } => {
427 let typed_expr = self.type_checker.infer_type(expr, table)?;
428 let projected = if let Some(alias) = alias {
429 ProjectedColumn::with_alias(typed_expr, alias.clone())
430 } else {
431 ProjectedColumn::new(typed_expr)
432 };
433 projected_columns.push(projected);
434 }
435 }
436 }
437
438 Ok(Projection::Columns(projected_columns))
439 }
440
441 fn build_sort_exprs(
443 &self,
444 order_by: &[OrderByExpr],
445 table: &TableMetadata,
446 ) -> Result<Vec<SortExpr>, PlannerError> {
447 let mut sort_exprs = Vec::new();
448
449 for order_expr in order_by {
450 let typed_expr = self.type_checker.infer_type(&order_expr.expr, table)?;
451
452 let asc = order_expr.asc.unwrap_or(true);
454
455 let nulls_first = order_expr.nulls_first.unwrap_or(false);
457
458 sort_exprs.push(SortExpr::new(typed_expr, asc, nulls_first));
459 }
460
461 Ok(sort_exprs)
462 }
463
464 fn extract_limit_value(
468 &self,
469 expr: &Option<crate::ast::expr::Expr>,
470 stmt_span: crate::ast::Span,
471 ) -> Result<Option<u64>, PlannerError> {
472 match expr {
473 None => Ok(None),
474 Some(e) => {
475 if let crate::ast::expr::ExprKind::Literal(Literal::Number(s)) = &e.kind {
477 s.parse::<u64>().map(Some).map_err(|_| {
478 PlannerError::type_mismatch("unsigned integer", s.clone(), e.span)
479 })
480 } else {
481 Err(PlannerError::unsupported_feature(
482 "non-literal LIMIT/OFFSET",
483 "v0.3.0+",
484 stmt_span,
485 ))
486 }
487 }
488 }
489 }
490
491 fn plan_insert(&self, stmt: &Insert) -> Result<LogicalPlan, PlannerError> {
496 let table = self.name_resolver.resolve_table(&stmt.table, stmt.span)?;
498
499 let columns: Vec<String> = if let Some(ref cols) = stmt.columns {
501 for col in cols {
503 self.name_resolver.resolve_column(table, col, stmt.span)?;
504 }
505 cols.clone()
506 } else {
507 table.column_names().into_iter().map(String::from).collect()
509 };
510
511 let mut typed_values: Vec<Vec<TypedExpr>> = Vec::new();
513
514 for row in &stmt.values {
515 if row.len() != columns.len() {
517 return Err(PlannerError::column_value_count_mismatch(
518 columns.len(),
519 row.len(),
520 stmt.span,
521 ));
522 }
523
524 let typed_row = self.type_check_insert_values(row, &columns, table)?;
526 typed_values.push(typed_row);
527 }
528
529 Ok(LogicalPlan::Insert {
530 table: table.name.clone(),
531 columns,
532 values: typed_values,
533 })
534 }
535
536 fn type_check_insert_values(
538 &self,
539 values: &[crate::ast::expr::Expr],
540 columns: &[String],
541 table: &TableMetadata,
542 ) -> Result<Vec<TypedExpr>, PlannerError> {
543 let mut typed_values = Vec::new();
544
545 for (i, value) in values.iter().enumerate() {
546 let column_name = &columns[i];
547 let column_meta = table.get_column(column_name).ok_or_else(|| {
548 PlannerError::column_not_found(column_name, &table.name, value.span)
549 })?;
550
551 let typed_value = self.type_checker.infer_type(value, table)?;
553
554 if column_meta.not_null
556 && matches!(&typed_value.kind, TypedExprKind::Literal(Literal::Null))
557 {
558 return Err(PlannerError::null_constraint_violation(
559 column_name,
560 value.span,
561 ));
562 }
563
564 self.validate_type_assignment(&typed_value, &column_meta.data_type, value.span)?;
566
567 typed_values.push(typed_value);
568 }
569
570 Ok(typed_values)
571 }
572
573 fn validate_type_assignment(
575 &self,
576 value: &TypedExpr,
577 target_type: &ResolvedType,
578 span: crate::ast::Span,
579 ) -> Result<(), PlannerError> {
580 if value.resolved_type == ResolvedType::Null {
582 return Ok(());
583 }
584
585 if self.types_compatible(&value.resolved_type, target_type) {
587 return Ok(());
588 }
589
590 Err(PlannerError::type_mismatch(
591 target_type.to_string(),
592 value.resolved_type.to_string(),
593 span,
594 ))
595 }
596
597 fn types_compatible(&self, source: &ResolvedType, target: &ResolvedType) -> bool {
599 use ResolvedType::*;
600
601 if source == target {
603 return true;
604 }
605
606 match (source, target) {
608 (Integer, BigInt) | (Integer, Float) | (Integer, Double) => true,
610 (BigInt, Float) | (BigInt, Double) => true,
612 (Float, Double) => true,
614 (Vector { dimension: d1, .. }, Vector { dimension: d2, .. }) => d1 == d2,
616 _ => false,
617 }
618 }
619
620 fn plan_update(&self, stmt: &Update) -> Result<LogicalPlan, PlannerError> {
624 let table = self.name_resolver.resolve_table(&stmt.table, stmt.span)?;
626
627 let mut typed_assignments = Vec::new();
629
630 for assignment in &stmt.assignments {
631 let column_meta =
633 self.name_resolver
634 .resolve_column(table, &assignment.column, assignment.span)?;
635 let column_index = table.get_column_index(&assignment.column).unwrap();
636
637 let typed_value = self.type_checker.infer_type(&assignment.value, table)?;
639
640 if column_meta.not_null
642 && matches!(&typed_value.kind, TypedExprKind::Literal(Literal::Null))
643 {
644 return Err(PlannerError::null_constraint_violation(
645 &assignment.column,
646 assignment.value.span,
647 ));
648 }
649
650 self.validate_type_assignment(
652 &typed_value,
653 &column_meta.data_type,
654 assignment.value.span,
655 )?;
656
657 typed_assignments.push(TypedAssignment::new(
658 assignment.column.clone(),
659 column_index,
660 typed_value,
661 ));
662 }
663
664 let filter = if let Some(ref selection) = stmt.selection {
666 let predicate = self.type_checker.infer_type(selection, table)?;
667
668 if predicate.resolved_type != ResolvedType::Boolean {
670 return Err(PlannerError::type_mismatch(
671 "Boolean",
672 predicate.resolved_type.to_string(),
673 selection.span,
674 ));
675 }
676
677 Some(predicate)
678 } else {
679 None
680 };
681
682 Ok(LogicalPlan::Update {
683 table: table.name.clone(),
684 assignments: typed_assignments,
685 filter,
686 })
687 }
688
689 fn plan_delete(&self, stmt: &Delete) -> Result<LogicalPlan, PlannerError> {
693 let table = self.name_resolver.resolve_table(&stmt.table, stmt.span)?;
695
696 let filter = if let Some(ref selection) = stmt.selection {
698 let predicate = self.type_checker.infer_type(selection, table)?;
699
700 if predicate.resolved_type != ResolvedType::Boolean {
702 return Err(PlannerError::type_mismatch(
703 "Boolean",
704 predicate.resolved_type.to_string(),
705 selection.span,
706 ));
707 }
708
709 Some(predicate)
710 } else {
711 None
712 };
713
714 Ok(LogicalPlan::Delete {
715 table: table.name.clone(),
716 filter,
717 })
718 }
719}