1pub mod aggregate_expr;
13mod error;
14pub mod knn_optimizer;
15pub mod logical_plan;
16pub mod name_resolver;
17pub mod type_checker;
18pub mod typed_expr;
19pub mod types;
20
21#[cfg(test)]
22mod planner_tests;
23
24pub use aggregate_expr::{AggregateExpr, AggregateFunction};
25pub use error::PlannerError;
26pub use knn_optimizer::{KnnPattern, SortDirection, detect_knn_pattern};
27pub use logical_plan::LogicalPlan;
28pub use name_resolver::{NameResolver, ResolvedColumn};
29pub use type_checker::TypeChecker;
30pub use typed_expr::{
31 ProjectedColumn, Projection, SortExpr, TypedAssignment, TypedExpr, TypedExprKind,
32};
33pub use types::ResolvedType;
34
35use crate::ast::ddl::{
36 ColumnConstraint, ColumnDef, CreateIndex, CreateTable, DropIndex, DropTable,
37};
38use crate::ast::dml::{Delete, Insert, LITERAL_TABLE, OrderByExpr, Select, SelectItem, Update};
39use crate::ast::expr::Literal;
40use crate::ast::{Statement, StatementKind};
41use crate::catalog::{Catalog, ColumnMetadata, IndexMetadata, TableMetadata};
42use crate::{DataSourceFormat, TableType};
43use std::collections::HashMap;
44
45pub struct Planner<'a, C: Catalog + ?Sized> {
72 catalog: &'a C,
73 name_resolver: NameResolver<'a, C>,
74 type_checker: TypeChecker<'a, C>,
75}
76
77impl<'a, C: Catalog + ?Sized> Planner<'a, C> {
78 pub fn new(catalog: &'a C) -> Self {
80 Self {
81 catalog,
82 name_resolver: NameResolver::new(catalog),
83 type_checker: TypeChecker::new(catalog),
84 }
85 }
86
87 pub fn plan(&self, stmt: &Statement) -> Result<LogicalPlan, PlannerError> {
98 match &stmt.kind {
99 StatementKind::CreateTable(ct) => self.plan_create_table(ct),
101 StatementKind::DropTable(dt) => self.plan_drop_table(dt),
102 StatementKind::CreateIndex(ci) => self.plan_create_index(ci),
103 StatementKind::DropIndex(di) => self.plan_drop_index(di),
104
105 StatementKind::Select(sel) => self.plan_select(sel),
107 StatementKind::Insert(ins) => self.plan_insert(ins),
108 StatementKind::Update(upd) => self.plan_update(upd),
109 StatementKind::Delete(del) => self.plan_delete(del),
110 }
111 }
112
113 fn plan_create_table(&self, stmt: &CreateTable) -> Result<LogicalPlan, PlannerError> {
122 if !stmt.if_not_exists && self.catalog.table_exists(&stmt.name) {
124 return Err(PlannerError::table_already_exists(&stmt.name));
125 }
126
127 let columns: Vec<ColumnMetadata> = stmt
129 .columns
130 .iter()
131 .map(|col| self.convert_column_def(col))
132 .collect();
133
134 let primary_key = Self::extract_primary_key(stmt);
136
137 let mut table = TableMetadata::new(stmt.name.clone(), columns);
140 if let Some(pk) = primary_key {
141 table = table.with_primary_key(pk);
142 }
143 table.catalog_name = "default".to_string();
144 table.namespace_name = "default".to_string();
145 table.table_type = TableType::Managed;
146 table.data_source_format = DataSourceFormat::Alopex;
147 table.properties = HashMap::new();
148
149 Ok(LogicalPlan::CreateTable {
150 table,
151 if_not_exists: stmt.if_not_exists,
152 with_options: stmt.with_options.clone(),
153 })
154 }
155
156 fn convert_column_def(&self, col: &ColumnDef) -> ColumnMetadata {
158 let data_type = ResolvedType::from_ast(&col.data_type);
159 let mut meta = ColumnMetadata::new(col.name.clone(), data_type);
160
161 for constraint in &col.constraints {
163 meta = Self::apply_column_constraint(meta, constraint);
164 }
165
166 meta
167 }
168
169 fn apply_column_constraint(
171 mut meta: ColumnMetadata,
172 constraint: &ColumnConstraint,
173 ) -> ColumnMetadata {
174 match constraint {
175 ColumnConstraint::NotNull => {
176 meta.not_null = true;
177 }
178 ColumnConstraint::Null => {
179 meta.not_null = false;
180 }
181 ColumnConstraint::PrimaryKey => {
182 meta.primary_key = true;
183 meta.not_null = true; }
185 ColumnConstraint::Unique => {
186 meta.unique = true;
187 }
188 ColumnConstraint::Default(expr) => {
189 meta.default = Some(expr.clone());
190 }
191 ColumnConstraint::WithSpan { kind, .. } => {
192 meta = Self::apply_column_constraint(meta, kind);
193 }
194 }
195 meta
196 }
197
198 fn extract_primary_key(stmt: &CreateTable) -> Option<Vec<String>> {
200 use crate::ast::ddl::TableConstraint;
201
202 if let Some(TableConstraint::PrimaryKey { columns, .. }) = stmt.constraints.first() {
206 return Some(columns.clone());
207 }
208
209 let pk_columns: Vec<String> = stmt
211 .columns
212 .iter()
213 .filter(|col| col.constraints.iter().any(Self::is_primary_key_constraint))
214 .map(|col| col.name.clone())
215 .collect();
216
217 if pk_columns.is_empty() {
218 None
219 } else {
220 Some(pk_columns)
221 }
222 }
223
224 fn is_primary_key_constraint(constraint: &ColumnConstraint) -> bool {
226 match constraint {
227 ColumnConstraint::PrimaryKey => true,
228 ColumnConstraint::WithSpan { kind, .. } => Self::is_primary_key_constraint(kind),
229 _ => false,
230 }
231 }
232
233 fn plan_drop_table(&self, stmt: &DropTable) -> Result<LogicalPlan, PlannerError> {
237 if !stmt.if_exists && !self.table_exists_in_default(&stmt.name) {
239 return Err(PlannerError::TableNotFound {
240 name: stmt.name.clone(),
241 line: stmt.span.start.line,
242 column: stmt.span.start.column,
243 });
244 }
245
246 Ok(LogicalPlan::DropTable {
247 name: stmt.name.clone(),
248 if_exists: stmt.if_exists,
249 })
250 }
251
252 fn table_exists_in_default(&self, name: &str) -> bool {
253 match self.catalog.get_table(name) {
254 Some(table) => table.catalog_name == "default" && table.namespace_name == "default",
255 None => false,
256 }
257 }
258
259 fn plan_create_index(&self, stmt: &CreateIndex) -> Result<LogicalPlan, PlannerError> {
266 if !stmt.if_not_exists && self.catalog.index_exists(&stmt.name) {
268 return Err(PlannerError::index_already_exists(&stmt.name));
269 }
270
271 let table = self.name_resolver.resolve_table(&stmt.table, stmt.span)?;
273
274 self.name_resolver
276 .resolve_column(table, &stmt.column, stmt.span)?;
277
278 let mut index = IndexMetadata::new(
282 0,
283 stmt.name.clone(),
284 stmt.table.clone(),
285 vec![stmt.column.clone()],
286 );
287
288 if let Some(method) = stmt.method {
289 index = index.with_method(method);
290 }
291
292 let options: Vec<(String, String)> = stmt
293 .options
294 .iter()
295 .map(|opt| (opt.key.clone(), opt.value.clone()))
296 .collect();
297 if !options.is_empty() {
298 index = index.with_options(options);
299 }
300
301 Ok(LogicalPlan::CreateIndex {
302 index,
303 if_not_exists: stmt.if_not_exists,
304 })
305 }
306
307 fn plan_drop_index(&self, stmt: &DropIndex) -> Result<LogicalPlan, PlannerError> {
311 if !stmt.if_exists && !self.index_exists_in_default(&stmt.name) {
313 return Err(PlannerError::index_not_found(&stmt.name));
314 }
315
316 Ok(LogicalPlan::DropIndex {
317 name: stmt.name.clone(),
318 if_exists: stmt.if_exists,
319 })
320 }
321
322 fn index_exists_in_default(&self, name: &str) -> bool {
323 match self.catalog.get_index(name) {
324 Some(index) => index.catalog_name == "default" && index.namespace_name == "default",
325 None => false,
326 }
327 }
328
329 fn plan_select(&self, stmt: &Select) -> Result<LogicalPlan, PlannerError> {
338 let literal_table;
340 let table = if stmt.from.name == LITERAL_TABLE {
341 literal_table = TableMetadata::new(LITERAL_TABLE, Vec::new());
342 &literal_table
343 } else {
344 self.name_resolver
345 .resolve_table(&stmt.from.name, stmt.from.span)?
346 };
347
348 let has_group_by = stmt
349 .group_by
350 .as_ref()
351 .is_some_and(|items| !items.is_empty());
352 let has_aggregate = self.select_contains_aggregate(stmt);
353 let distinct_only =
354 stmt.distinct && !has_group_by && !has_aggregate && stmt.having.is_none();
355
356 let scan_projection =
357 if has_group_by || has_aggregate || stmt.having.is_some() || stmt.distinct {
358 Projection::All(table.column_names().into_iter().map(String::from).collect())
359 } else {
360 self.build_projection(&stmt.projection, table)?
361 };
362
363 let mut plan = LogicalPlan::Scan {
365 table: stmt.from.name.clone(),
366 projection: scan_projection,
367 };
368
369 if let Some(ref selection) = stmt.selection {
371 let predicate = self.type_checker.infer_type(selection, table)?;
372
373 if predicate.resolved_type != ResolvedType::Boolean {
375 return Err(PlannerError::type_mismatch(
376 "Boolean",
377 predicate.resolved_type.to_string(),
378 selection.span,
379 ));
380 }
381
382 plan = LogicalPlan::Filter {
383 input: Box::new(plan),
384 predicate,
385 };
386 }
387
388 if has_group_by || has_aggregate || stmt.having.is_some() || stmt.distinct {
389 if !has_group_by && !has_aggregate && stmt.having.is_some() {
390 return Err(PlannerError::invalid_expression(
391 "HAVING requires GROUP BY or aggregate functions".to_string(),
392 ));
393 }
394
395 let (group_keys, projected) = if distinct_only {
396 let projected =
397 self.build_projected_columns_for_distinct(&stmt.projection, table)?;
398 let group_keys = projected.iter().map(|col| col.expr.clone()).collect();
399 (group_keys, projected)
400 } else {
401 let group_keys = self.build_group_keys(stmt, table)?;
402 let projected =
403 self.build_projected_columns_for_aggregate(&stmt.projection, table)?;
404 (group_keys, projected)
405 };
406 let mut aggregates = Vec::new();
407 let mut agg_map = HashMap::new();
408
409 for col in &projected {
410 self.collect_aggregates_from_typed_expr(&col.expr, &mut aggregates, &mut agg_map)?;
411 }
412
413 let having_typed = if let Some(having) = &stmt.having {
414 let typed = self.type_checker.infer_type(having, table)?;
415 if typed.resolved_type != ResolvedType::Boolean {
416 return Err(PlannerError::type_mismatch(
417 "Boolean",
418 typed.resolved_type.type_name().to_string(),
419 typed.span,
420 ));
421 }
422 self.collect_aggregates_from_typed_expr(&typed, &mut aggregates, &mut agg_map)?;
423 Some(typed)
424 } else {
425 None
426 };
427
428 let mut order_by = Vec::new();
429 if !stmt.order_by.is_empty() {
430 for order_expr in &stmt.order_by {
431 let typed = self.type_checker.infer_type(&order_expr.expr, table)?;
432 self.collect_aggregates_from_typed_expr(&typed, &mut aggregates, &mut agg_map)?;
433 let asc = order_expr.asc.unwrap_or(true);
434 let nulls_first = order_expr.nulls_first.unwrap_or(false);
435 order_by.push(SortExpr::new(typed, asc, nulls_first));
436 }
437 }
438
439 if let Some(ref having) = having_typed {
440 self.type_checker
441 .validate_having_expr(having, &group_keys, &aggregates)?;
442 }
443
444 let output_schema = build_aggregate_schema(&group_keys, &aggregates);
445 let output_names: Vec<String> = output_schema.iter().map(|c| c.name.clone()).collect();
446
447 let projection = self.build_aggregate_projection(
448 projected,
449 &group_keys,
450 &aggregates,
451 &output_names,
452 )?;
453
454 let having = if let Some(having) = having_typed {
455 Some(self.rewrite_expr_for_aggregate(
456 &having,
457 &group_keys,
458 &aggregates,
459 &output_names,
460 )?)
461 } else {
462 None
463 };
464
465 let order_by = order_by
466 .into_iter()
467 .map(|expr| {
468 let rewritten = self.rewrite_expr_for_aggregate(
469 &expr.expr,
470 &group_keys,
471 &aggregates,
472 &output_names,
473 )?;
474 Ok(SortExpr::new(rewritten, expr.asc, expr.nulls_first))
475 })
476 .collect::<Result<Vec<_>, PlannerError>>()?;
477
478 plan = LogicalPlan::Aggregate {
479 input: Box::new(plan),
480 group_keys,
481 aggregates,
482 having,
483 projection,
484 };
485
486 if !order_by.is_empty() {
487 plan = LogicalPlan::Sort {
488 input: Box::new(plan),
489 order_by,
490 };
491 }
492
493 if stmt.limit.is_some() || stmt.offset.is_some() {
494 let limit = self.extract_limit_value(&stmt.limit, stmt.span)?;
495 let offset = self.extract_limit_value(&stmt.offset, stmt.span)?;
496 plan = LogicalPlan::Limit {
497 input: Box::new(plan),
498 limit,
499 offset,
500 };
501 }
502
503 return Ok(plan);
504 }
505
506 if !stmt.order_by.is_empty() {
508 let order_by = self.build_sort_exprs(&stmt.order_by, table)?;
509 plan = LogicalPlan::Sort {
510 input: Box::new(plan),
511 order_by,
512 };
513 }
514
515 if stmt.limit.is_some() || stmt.offset.is_some() {
516 let limit = self.extract_limit_value(&stmt.limit, stmt.span)?;
517 let offset = self.extract_limit_value(&stmt.offset, stmt.span)?;
518 plan = LogicalPlan::Limit {
519 input: Box::new(plan),
520 limit,
521 offset,
522 };
523 }
524
525 Ok(plan)
526 }
527
528 fn build_projection(
532 &self,
533 items: &[SelectItem],
534 table: &TableMetadata,
535 ) -> Result<Projection, PlannerError> {
536 if items.len() == 1 && matches!(&items[0], SelectItem::Wildcard { .. }) {
538 let columns = self.name_resolver.expand_wildcard(table);
539 return Ok(Projection::All(columns));
540 }
541
542 let mut projected_columns = Vec::new();
544 for item in items {
545 match item {
546 SelectItem::Wildcard { span } => {
547 for col in &table.columns {
549 let column_index = table.get_column_index(&col.name).unwrap();
550 let typed_expr = TypedExpr::column_ref(
551 table.name.clone(),
552 col.name.clone(),
553 column_index,
554 col.data_type.clone(),
555 *span,
556 );
557 projected_columns.push(ProjectedColumn::new(typed_expr));
558 }
559 }
560 SelectItem::Expr { expr, alias, .. } => {
561 let typed_expr = self.type_checker.infer_type(expr, table)?;
562 let projected = if let Some(alias) = alias {
563 ProjectedColumn::with_alias(typed_expr, alias.clone())
564 } else {
565 ProjectedColumn::new(typed_expr)
566 };
567 projected_columns.push(projected);
568 }
569 }
570 }
571
572 Ok(Projection::Columns(projected_columns))
573 }
574
575 fn build_sort_exprs(
577 &self,
578 order_by: &[OrderByExpr],
579 table: &TableMetadata,
580 ) -> Result<Vec<SortExpr>, PlannerError> {
581 let mut sort_exprs = Vec::new();
582
583 for order_expr in order_by {
584 let typed_expr = self.type_checker.infer_type(&order_expr.expr, table)?;
585
586 let asc = order_expr.asc.unwrap_or(true);
588
589 let nulls_first = order_expr.nulls_first.unwrap_or(false);
591
592 sort_exprs.push(SortExpr::new(typed_expr, asc, nulls_first));
593 }
594
595 Ok(sort_exprs)
596 }
597
598 fn select_contains_aggregate(&self, stmt: &Select) -> bool {
599 stmt.projection.iter().any(|item| match item {
600 SelectItem::Wildcard { .. } => false,
601 SelectItem::Expr { expr, .. } => expr_contains_aggregate(expr),
602 }) || stmt
603 .group_by
604 .as_ref()
605 .map(|items| items.iter().any(expr_contains_aggregate))
606 .unwrap_or(false)
607 || stmt
608 .having
609 .as_ref()
610 .map(expr_contains_aggregate)
611 .unwrap_or(false)
612 || stmt
613 .order_by
614 .iter()
615 .any(|order| expr_contains_aggregate(&order.expr))
616 }
617
618 fn build_group_keys(
619 &self,
620 stmt: &Select,
621 table: &TableMetadata,
622 ) -> Result<Vec<TypedExpr>, PlannerError> {
623 let mut keys = Vec::new();
624 if let Some(items) = &stmt.group_by {
625 for expr in items {
626 let typed = self.type_checker.infer_type(expr, table)?;
627 if typed_expr_contains_aggregate(&typed) {
628 return Err(PlannerError::invalid_expression(
629 "GROUP BY cannot contain aggregate functions".to_string(),
630 ));
631 }
632 if !matches!(typed.kind, TypedExprKind::ColumnRef { .. }) {
633 return Err(PlannerError::invalid_expression(
634 "GROUP BY expressions must be column references".to_string(),
635 ));
636 }
637 keys.push(typed);
638 }
639 }
640 Ok(keys)
641 }
642
643 fn build_projected_columns_for_aggregate(
644 &self,
645 items: &[SelectItem],
646 table: &TableMetadata,
647 ) -> Result<Vec<ProjectedColumn>, PlannerError> {
648 let mut projected = Vec::new();
649 for item in items {
650 match item {
651 SelectItem::Wildcard { .. } => {
652 return Err(PlannerError::invalid_expression(
653 "wildcard projection not supported with GROUP BY/aggregate".to_string(),
654 ));
655 }
656 SelectItem::Expr { expr, alias, .. } => {
657 let typed = self.type_checker.infer_type(expr, table)?;
658 projected.push(ProjectedColumn {
659 expr: typed,
660 alias: alias.clone(),
661 });
662 }
663 }
664 }
665 Ok(projected)
666 }
667
668 fn build_projected_columns_for_distinct(
669 &self,
670 items: &[SelectItem],
671 table: &TableMetadata,
672 ) -> Result<Vec<ProjectedColumn>, PlannerError> {
673 let projection = self.build_projection(items, table)?;
674 match projection {
675 Projection::All(columns) => {
676 let mut projected = Vec::with_capacity(columns.len());
677 for column in columns {
678 let column_index = table.get_column_index(&column).ok_or_else(|| {
679 PlannerError::invalid_expression(format!(
680 "column '{column}' not found for DISTINCT projection"
681 ))
682 })?;
683 let column_meta = table.get_column(&column).ok_or_else(|| {
684 PlannerError::invalid_expression(format!(
685 "column '{column}' not found for DISTINCT projection"
686 ))
687 })?;
688 let typed_expr = TypedExpr::column_ref(
689 table.name.clone(),
690 column.clone(),
691 column_index,
692 column_meta.data_type.clone(),
693 crate::ast::Span::default(),
694 );
695 projected.push(ProjectedColumn::new(typed_expr));
696 }
697 Ok(projected)
698 }
699 Projection::Columns(columns) => Ok(columns),
700 }
701 }
702
703 fn collect_aggregates_from_typed_expr(
704 &self,
705 expr: &TypedExpr,
706 aggregates: &mut Vec<AggregateExpr>,
707 aggregate_map: &mut HashMap<AggregateSignature, usize>,
708 ) -> Result<(), PlannerError> {
709 match &expr.kind {
710 TypedExprKind::FunctionCall {
711 name,
712 args,
713 distinct,
714 star,
715 } if is_aggregate_function(name) => {
716 for arg in args {
717 if typed_expr_contains_aggregate(arg) {
718 return Err(PlannerError::invalid_expression(
719 "nested aggregate functions are not supported".to_string(),
720 ));
721 }
722 }
723 let (agg, signature) =
724 self.build_aggregate_expr_from_typed(expr, name, args, *distinct, *star)?;
725 aggregate_map.entry(signature).or_insert_with(|| {
726 aggregates.push(agg);
727 aggregates.len() - 1
728 });
729 Ok(())
730 }
731 TypedExprKind::BinaryOp { left, right, .. } => {
732 self.collect_aggregates_from_typed_expr(left, aggregates, aggregate_map)?;
733 self.collect_aggregates_from_typed_expr(right, aggregates, aggregate_map)?;
734 Ok(())
735 }
736 TypedExprKind::UnaryOp { operand, .. } => {
737 self.collect_aggregates_from_typed_expr(operand, aggregates, aggregate_map)
738 }
739 TypedExprKind::FunctionCall { args, .. } => {
740 for arg in args {
741 self.collect_aggregates_from_typed_expr(arg, aggregates, aggregate_map)?;
742 }
743 Ok(())
744 }
745 TypedExprKind::Between {
746 expr, low, high, ..
747 } => {
748 self.collect_aggregates_from_typed_expr(expr, aggregates, aggregate_map)?;
749 self.collect_aggregates_from_typed_expr(low, aggregates, aggregate_map)?;
750 self.collect_aggregates_from_typed_expr(high, aggregates, aggregate_map)?;
751 Ok(())
752 }
753 TypedExprKind::Like {
754 expr,
755 pattern,
756 escape,
757 ..
758 } => {
759 self.collect_aggregates_from_typed_expr(expr, aggregates, aggregate_map)?;
760 self.collect_aggregates_from_typed_expr(pattern, aggregates, aggregate_map)?;
761 if let Some(esc) = escape {
762 self.collect_aggregates_from_typed_expr(esc, aggregates, aggregate_map)?;
763 }
764 Ok(())
765 }
766 TypedExprKind::InList { expr, list, .. } => {
767 self.collect_aggregates_from_typed_expr(expr, aggregates, aggregate_map)?;
768 for item in list {
769 self.collect_aggregates_from_typed_expr(item, aggregates, aggregate_map)?;
770 }
771 Ok(())
772 }
773 TypedExprKind::IsNull { expr, .. } => {
774 self.collect_aggregates_from_typed_expr(expr, aggregates, aggregate_map)
775 }
776 _ => Ok(()),
777 }
778 }
779
780 fn build_aggregate_expr_from_typed(
781 &self,
782 expr: &TypedExpr,
783 name: &str,
784 args: &[TypedExpr],
785 distinct: bool,
786 star: bool,
787 ) -> Result<(AggregateExpr, AggregateSignature), PlannerError> {
788 let lower = name.to_lowercase();
789 match lower.as_str() {
790 "count" => {
791 if star {
792 let agg = AggregateExpr::count_star();
793 let signature = aggregate_signature(name, distinct, star, None, None, expr);
794 return Ok((agg, signature));
795 }
796 if args.len() != 1 {
797 return Err(PlannerError::type_mismatch(
798 "1 argument",
799 format!("{} arguments", args.len()),
800 expr.span,
801 ));
802 }
803 let agg = AggregateExpr {
804 function: AggregateFunction::Count,
805 arg: Some(args[0].clone()),
806 distinct,
807 result_type: ResolvedType::BigInt,
808 };
809 let signature =
810 aggregate_signature(name, distinct, star, Some(&args[0]), None, expr);
811 Ok((agg, signature))
812 }
813 "sum" => {
814 let arg = self.require_single_aggregate_arg(args, expr.span)?;
815 let agg = AggregateExpr {
816 function: AggregateFunction::Sum,
817 arg: Some(arg.clone()),
818 distinct: false,
819 result_type: ResolvedType::Double,
820 };
821 let signature = aggregate_signature(name, false, star, Some(arg), None, expr);
822 Ok((agg, signature))
823 }
824 "total" => {
825 let arg = self.require_single_aggregate_arg(args, expr.span)?;
826 let agg = AggregateExpr {
827 function: AggregateFunction::Total,
828 arg: Some(arg.clone()),
829 distinct: false,
830 result_type: ResolvedType::Double,
831 };
832 let signature = aggregate_signature(name, false, star, Some(arg), None, expr);
833 Ok((agg, signature))
834 }
835 "avg" => {
836 let arg = self.require_single_aggregate_arg(args, expr.span)?;
837 let agg = AggregateExpr {
838 function: AggregateFunction::Avg,
839 arg: Some(arg.clone()),
840 distinct: false,
841 result_type: ResolvedType::Double,
842 };
843 let signature = aggregate_signature(name, false, star, Some(arg), None, expr);
844 Ok((agg, signature))
845 }
846 "min" => {
847 let arg = self.require_single_aggregate_arg(args, expr.span)?;
848 let agg = AggregateExpr {
849 function: AggregateFunction::Min,
850 arg: Some(arg.clone()),
851 distinct: false,
852 result_type: arg.resolved_type.clone(),
853 };
854 let signature = aggregate_signature(name, false, star, Some(arg), None, expr);
855 Ok((agg, signature))
856 }
857 "max" => {
858 let arg = self.require_single_aggregate_arg(args, expr.span)?;
859 let agg = AggregateExpr {
860 function: AggregateFunction::Max,
861 arg: Some(arg.clone()),
862 distinct: false,
863 result_type: arg.resolved_type.clone(),
864 };
865 let signature = aggregate_signature(name, false, star, Some(arg), None, expr);
866 Ok((agg, signature))
867 }
868 "group_concat" => {
869 if args.is_empty() || args.len() > 2 {
870 return Err(PlannerError::type_mismatch(
871 "1 or 2 arguments",
872 format!("{} arguments", args.len()),
873 expr.span,
874 ));
875 }
876 let arg = &args[0];
877 let mut separator = None;
878 if args.len() == 2 {
879 if let TypedExprKind::Literal(Literal::String(value)) = &args[1].kind {
880 separator = Some(value.clone());
881 } else {
882 return Err(PlannerError::invalid_expression(
883 "GROUP_CONCAT separator must be a string literal".to_string(),
884 ));
885 }
886 }
887 let agg = AggregateExpr {
888 function: AggregateFunction::GroupConcat { separator },
889 arg: Some(arg.clone()),
890 distinct: false,
891 result_type: ResolvedType::Text,
892 };
893 let signature = aggregate_signature(
894 name,
895 false,
896 star,
897 Some(arg),
898 match &agg.function {
899 AggregateFunction::GroupConcat { separator } => separator.as_ref(),
900 _ => None,
901 },
902 expr,
903 );
904 Ok((agg, signature))
905 }
906 "string_agg" => {
907 if args.len() != 2 {
908 return Err(PlannerError::type_mismatch(
909 "2 arguments",
910 format!("{} arguments", args.len()),
911 expr.span,
912 ));
913 }
914 let arg = &args[0];
915 let separator =
916 if let TypedExprKind::Literal(Literal::String(value)) = &args[1].kind {
917 Some(value.clone())
918 } else {
919 return Err(PlannerError::invalid_expression(
920 "STRING_AGG separator must be a string literal".to_string(),
921 ));
922 };
923 let agg = AggregateExpr {
924 function: AggregateFunction::StringAgg { separator },
925 arg: Some(arg.clone()),
926 distinct: false,
927 result_type: ResolvedType::Text,
928 };
929 let signature = aggregate_signature(
930 name,
931 false,
932 star,
933 Some(arg),
934 match &agg.function {
935 AggregateFunction::StringAgg { separator } => separator.as_ref(),
936 _ => None,
937 },
938 expr,
939 );
940 Ok((agg, signature))
941 }
942 _ => Err(PlannerError::unsupported_feature(
943 format!("function '{}'", name),
944 "future",
945 expr.span,
946 )),
947 }
948 }
949
950 fn require_single_aggregate_arg<'b>(
951 &self,
952 args: &'b [TypedExpr],
953 span: crate::ast::Span,
954 ) -> Result<&'b TypedExpr, PlannerError> {
955 if args.len() != 1 {
956 return Err(PlannerError::type_mismatch(
957 "1 argument",
958 format!("{} arguments", args.len()),
959 span,
960 ));
961 }
962 Ok(&args[0])
963 }
964
965 fn build_aggregate_projection(
966 &self,
967 projected: Vec<ProjectedColumn>,
968 group_keys: &[TypedExpr],
969 aggregates: &[AggregateExpr],
970 output_names: &[String],
971 ) -> Result<Projection, PlannerError> {
972 let mut columns = Vec::new();
973 for col in projected {
974 let rewritten =
975 self.rewrite_expr_for_aggregate(&col.expr, group_keys, aggregates, output_names)?;
976 columns.push(ProjectedColumn {
977 expr: rewritten,
978 alias: col.alias,
979 });
980 }
981 Ok(Projection::Columns(columns))
982 }
983
984 fn rewrite_expr_for_aggregate(
985 &self,
986 expr: &TypedExpr,
987 group_keys: &[TypedExpr],
988 aggregates: &[AggregateExpr],
989 output_names: &[String],
990 ) -> Result<TypedExpr, PlannerError> {
991 let group_key_map = build_group_key_map(group_keys);
992 let aggregate_map = build_aggregate_map(aggregates);
993
994 rewrite_expr_with_maps(expr, &group_key_map, &aggregate_map, output_names)
995 }
996
997 fn extract_limit_value(
1001 &self,
1002 expr: &Option<crate::ast::expr::Expr>,
1003 stmt_span: crate::ast::Span,
1004 ) -> Result<Option<u64>, PlannerError> {
1005 match expr {
1006 None => Ok(None),
1007 Some(e) => {
1008 if let crate::ast::expr::ExprKind::Literal(Literal::Number(s)) = &e.kind {
1010 s.parse::<u64>().map(Some).map_err(|_| {
1011 PlannerError::type_mismatch("unsigned integer", s.clone(), e.span)
1012 })
1013 } else {
1014 Err(PlannerError::unsupported_feature(
1015 "non-literal LIMIT/OFFSET",
1016 "v0.3.0+",
1017 stmt_span,
1018 ))
1019 }
1020 }
1021 }
1022 }
1023
1024 fn plan_insert(&self, stmt: &Insert) -> Result<LogicalPlan, PlannerError> {
1029 let table = self.name_resolver.resolve_table(&stmt.table, stmt.span)?;
1031
1032 let columns: Vec<String> = if let Some(ref cols) = stmt.columns {
1034 for col in cols {
1036 self.name_resolver.resolve_column(table, col, stmt.span)?;
1037 }
1038 cols.clone()
1039 } else {
1040 table.column_names().into_iter().map(String::from).collect()
1042 };
1043
1044 let mut typed_values: Vec<Vec<TypedExpr>> = Vec::new();
1046
1047 for row in &stmt.values {
1048 if row.len() != columns.len() {
1050 return Err(PlannerError::column_value_count_mismatch(
1051 columns.len(),
1052 row.len(),
1053 stmt.span,
1054 ));
1055 }
1056
1057 let typed_row = self.type_check_insert_values(row, &columns, table)?;
1059 typed_values.push(typed_row);
1060 }
1061
1062 Ok(LogicalPlan::Insert {
1063 table: table.name.clone(),
1064 columns,
1065 values: typed_values,
1066 })
1067 }
1068
1069 fn type_check_insert_values(
1071 &self,
1072 values: &[crate::ast::expr::Expr],
1073 columns: &[String],
1074 table: &TableMetadata,
1075 ) -> Result<Vec<TypedExpr>, PlannerError> {
1076 let mut typed_values = Vec::new();
1077
1078 for (i, value) in values.iter().enumerate() {
1079 let column_name = &columns[i];
1080 let column_meta = table.get_column(column_name).ok_or_else(|| {
1081 PlannerError::column_not_found(column_name, &table.name, value.span)
1082 })?;
1083
1084 let typed_value = self.type_checker.infer_type(value, table)?;
1086
1087 if column_meta.not_null
1089 && matches!(&typed_value.kind, TypedExprKind::Literal(Literal::Null))
1090 {
1091 return Err(PlannerError::null_constraint_violation(
1092 column_name,
1093 value.span,
1094 ));
1095 }
1096
1097 self.validate_type_assignment(&typed_value, &column_meta.data_type, value.span)?;
1099
1100 typed_values.push(typed_value);
1101 }
1102
1103 Ok(typed_values)
1104 }
1105
1106 fn validate_type_assignment(
1108 &self,
1109 value: &TypedExpr,
1110 target_type: &ResolvedType,
1111 span: crate::ast::Span,
1112 ) -> Result<(), PlannerError> {
1113 if value.resolved_type == ResolvedType::Null {
1115 return Ok(());
1116 }
1117
1118 if self.types_compatible(&value.resolved_type, target_type) {
1120 return Ok(());
1121 }
1122
1123 Err(PlannerError::type_mismatch(
1124 target_type.to_string(),
1125 value.resolved_type.to_string(),
1126 span,
1127 ))
1128 }
1129
1130 fn types_compatible(&self, source: &ResolvedType, target: &ResolvedType) -> bool {
1132 use ResolvedType::*;
1133
1134 if source == target {
1136 return true;
1137 }
1138
1139 match (source, target) {
1141 (Integer, BigInt) | (Integer, Float) | (Integer, Double) => true,
1143 (BigInt, Float) | (BigInt, Double) => true,
1145 (Float, Double) => true,
1147 (Vector { dimension: d1, .. }, Vector { dimension: d2, .. }) => d1 == d2,
1149 _ => false,
1150 }
1151 }
1152
1153 fn plan_update(&self, stmt: &Update) -> Result<LogicalPlan, PlannerError> {
1157 let table = self.name_resolver.resolve_table(&stmt.table, stmt.span)?;
1159
1160 let mut typed_assignments = Vec::new();
1162
1163 for assignment in &stmt.assignments {
1164 let column_meta =
1166 self.name_resolver
1167 .resolve_column(table, &assignment.column, assignment.span)?;
1168 let column_index = table.get_column_index(&assignment.column).unwrap();
1169
1170 let typed_value = self.type_checker.infer_type(&assignment.value, table)?;
1172
1173 if column_meta.not_null
1175 && matches!(&typed_value.kind, TypedExprKind::Literal(Literal::Null))
1176 {
1177 return Err(PlannerError::null_constraint_violation(
1178 &assignment.column,
1179 assignment.value.span,
1180 ));
1181 }
1182
1183 self.validate_type_assignment(
1185 &typed_value,
1186 &column_meta.data_type,
1187 assignment.value.span,
1188 )?;
1189
1190 typed_assignments.push(TypedAssignment::new(
1191 assignment.column.clone(),
1192 column_index,
1193 typed_value,
1194 ));
1195 }
1196
1197 let filter = if let Some(ref selection) = stmt.selection {
1199 let predicate = self.type_checker.infer_type(selection, table)?;
1200
1201 if predicate.resolved_type != ResolvedType::Boolean {
1203 return Err(PlannerError::type_mismatch(
1204 "Boolean",
1205 predicate.resolved_type.to_string(),
1206 selection.span,
1207 ));
1208 }
1209
1210 Some(predicate)
1211 } else {
1212 None
1213 };
1214
1215 Ok(LogicalPlan::Update {
1216 table: table.name.clone(),
1217 assignments: typed_assignments,
1218 filter,
1219 })
1220 }
1221
1222 fn plan_delete(&self, stmt: &Delete) -> Result<LogicalPlan, PlannerError> {
1226 let table = self.name_resolver.resolve_table(&stmt.table, stmt.span)?;
1228
1229 let filter = if let Some(ref selection) = stmt.selection {
1231 let predicate = self.type_checker.infer_type(selection, table)?;
1232
1233 if predicate.resolved_type != ResolvedType::Boolean {
1235 return Err(PlannerError::type_mismatch(
1236 "Boolean",
1237 predicate.resolved_type.to_string(),
1238 selection.span,
1239 ));
1240 }
1241
1242 Some(predicate)
1243 } else {
1244 None
1245 };
1246
1247 Ok(LogicalPlan::Delete {
1248 table: table.name.clone(),
1249 filter,
1250 })
1251 }
1252}
1253
1254#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1255struct AggregateSignature {
1256 name: String,
1257 distinct: bool,
1258 star: bool,
1259 arg_key: Option<String>,
1260 separator: Option<String>,
1261}
1262
1263fn expr_contains_aggregate(expr: &crate::ast::expr::Expr) -> bool {
1264 use crate::ast::expr::ExprKind;
1265
1266 match &expr.kind {
1267 ExprKind::FunctionCall { name, args, .. } => {
1268 if is_aggregate_function(name) {
1269 return true;
1270 }
1271 args.iter().any(expr_contains_aggregate)
1272 }
1273 ExprKind::BinaryOp { left, right, .. } => {
1274 expr_contains_aggregate(left) || expr_contains_aggregate(right)
1275 }
1276 ExprKind::UnaryOp { operand, .. } => expr_contains_aggregate(operand),
1277 ExprKind::Between {
1278 expr, low, high, ..
1279 } => {
1280 expr_contains_aggregate(expr)
1281 || expr_contains_aggregate(low)
1282 || expr_contains_aggregate(high)
1283 }
1284 ExprKind::Like {
1285 expr,
1286 pattern,
1287 escape,
1288 ..
1289 } => {
1290 expr_contains_aggregate(expr)
1291 || expr_contains_aggregate(pattern)
1292 || escape.as_deref().is_some_and(expr_contains_aggregate)
1293 }
1294 ExprKind::InList { expr, list, .. } => {
1295 expr_contains_aggregate(expr) || list.iter().any(expr_contains_aggregate)
1296 }
1297 ExprKind::IsNull { expr, .. } => expr_contains_aggregate(expr),
1298 ExprKind::Literal(_) | ExprKind::VectorLiteral(_) | ExprKind::ColumnRef { .. } => false,
1299 }
1300}
1301
1302fn typed_expr_contains_aggregate(expr: &TypedExpr) -> bool {
1303 match &expr.kind {
1304 TypedExprKind::FunctionCall { name, args, .. } => {
1305 if is_aggregate_function(name) {
1306 return true;
1307 }
1308 args.iter().any(typed_expr_contains_aggregate)
1309 }
1310 TypedExprKind::BinaryOp { left, right, .. } => {
1311 typed_expr_contains_aggregate(left) || typed_expr_contains_aggregate(right)
1312 }
1313 TypedExprKind::UnaryOp { operand, .. } => typed_expr_contains_aggregate(operand),
1314 TypedExprKind::Between {
1315 expr, low, high, ..
1316 } => {
1317 typed_expr_contains_aggregate(expr)
1318 || typed_expr_contains_aggregate(low)
1319 || typed_expr_contains_aggregate(high)
1320 }
1321 TypedExprKind::Like {
1322 expr,
1323 pattern,
1324 escape,
1325 ..
1326 } => {
1327 typed_expr_contains_aggregate(expr)
1328 || typed_expr_contains_aggregate(pattern)
1329 || escape
1330 .as_ref()
1331 .is_some_and(|inner| typed_expr_contains_aggregate(inner))
1332 }
1333 TypedExprKind::InList { expr, list, .. } => {
1334 typed_expr_contains_aggregate(expr) || list.iter().any(typed_expr_contains_aggregate)
1335 }
1336 TypedExprKind::IsNull { expr, .. } => typed_expr_contains_aggregate(expr),
1337 _ => false,
1338 }
1339}
1340
1341fn is_aggregate_function(name: &str) -> bool {
1342 matches!(
1343 name.to_ascii_lowercase().as_str(),
1344 "count" | "sum" | "total" | "avg" | "min" | "max" | "group_concat" | "string_agg"
1345 )
1346}
1347
1348fn expr_key(expr: &TypedExpr) -> String {
1349 format!("{:?}", expr.kind)
1350}
1351
1352fn aggregate_signature(
1353 name: &str,
1354 distinct: bool,
1355 star: bool,
1356 arg: Option<&TypedExpr>,
1357 separator: Option<&String>,
1358 _expr: &TypedExpr,
1359) -> AggregateSignature {
1360 AggregateSignature {
1361 name: name.to_ascii_lowercase(),
1362 distinct,
1363 star,
1364 arg_key: arg.map(expr_key),
1365 separator: separator.cloned(),
1366 }
1367}
1368
1369fn build_group_key_map(group_keys: &[TypedExpr]) -> HashMap<String, usize> {
1370 let mut map = HashMap::new();
1371 for (idx, key) in group_keys.iter().enumerate() {
1372 map.insert(expr_key(key), idx);
1373 }
1374 map
1375}
1376
1377fn build_aggregate_map(aggregates: &[AggregateExpr]) -> HashMap<AggregateSignature, usize> {
1378 let mut map = HashMap::new();
1379 for (idx, agg) in aggregates.iter().enumerate() {
1380 let (name, separator, star, arg) = match &agg.function {
1381 AggregateFunction::Count => (
1382 "count".to_string(),
1383 None,
1384 agg.arg.is_none(),
1385 agg.arg.as_ref(),
1386 ),
1387 AggregateFunction::Sum => ("sum".to_string(), None, false, agg.arg.as_ref()),
1388 AggregateFunction::Total => ("total".to_string(), None, false, agg.arg.as_ref()),
1389 AggregateFunction::Avg => ("avg".to_string(), None, false, agg.arg.as_ref()),
1390 AggregateFunction::Min => ("min".to_string(), None, false, agg.arg.as_ref()),
1391 AggregateFunction::Max => ("max".to_string(), None, false, agg.arg.as_ref()),
1392 AggregateFunction::GroupConcat { separator } => (
1393 "group_concat".to_string(),
1394 separator.clone(),
1395 false,
1396 agg.arg.as_ref(),
1397 ),
1398 AggregateFunction::StringAgg { separator } => (
1399 "string_agg".to_string(),
1400 separator.clone(),
1401 false,
1402 agg.arg.as_ref(),
1403 ),
1404 };
1405 let signature = AggregateSignature {
1406 name,
1407 distinct: agg.distinct,
1408 star,
1409 arg_key: arg.map(expr_key),
1410 separator,
1411 };
1412 map.insert(signature, idx);
1413 }
1414 map
1415}
1416
1417fn build_aggregate_schema(
1418 group_keys: &[TypedExpr],
1419 aggregates: &[AggregateExpr],
1420) -> Vec<ColumnMetadata> {
1421 let mut schema = Vec::new();
1422 for (idx, key) in group_keys.iter().enumerate() {
1423 let name = match &key.kind {
1424 TypedExprKind::ColumnRef { column, .. } => column.clone(),
1425 _ => format!("group_{idx}"),
1426 };
1427 schema.push(ColumnMetadata::new(name, key.resolved_type.clone()));
1428 }
1429 for (idx, agg) in aggregates.iter().enumerate() {
1430 let name = match &agg.function {
1431 AggregateFunction::Count => format!("count_{idx}"),
1432 AggregateFunction::Sum => format!("sum_{idx}"),
1433 AggregateFunction::Total => format!("total_{idx}"),
1434 AggregateFunction::Avg => format!("avg_{idx}"),
1435 AggregateFunction::Min => format!("min_{idx}"),
1436 AggregateFunction::Max => format!("max_{idx}"),
1437 AggregateFunction::GroupConcat { .. } => format!("group_concat_{idx}"),
1438 AggregateFunction::StringAgg { .. } => format!("string_agg_{idx}"),
1439 };
1440 schema.push(ColumnMetadata::new(name, agg.result_type.clone()));
1441 }
1442 schema
1443}
1444
1445fn rewrite_expr_with_maps(
1446 expr: &TypedExpr,
1447 group_key_map: &HashMap<String, usize>,
1448 aggregate_map: &HashMap<AggregateSignature, usize>,
1449 output_names: &[String],
1450) -> Result<TypedExpr, PlannerError> {
1451 let group_key_count = output_names.len().saturating_sub(aggregate_map.len());
1452 let key = expr_key(expr);
1453 if let Some(idx) = group_key_map.get(&key) {
1454 return Ok(make_output_column_ref(
1455 *idx,
1456 output_names,
1457 expr.resolved_type.clone(),
1458 expr.span,
1459 ));
1460 }
1461
1462 match &expr.kind {
1463 TypedExprKind::FunctionCall {
1464 name,
1465 args,
1466 distinct,
1467 star,
1468 } if is_aggregate_function(name) => {
1469 let separator = if name.eq_ignore_ascii_case("group_concat") && args.len() == 2 {
1470 if let TypedExprKind::Literal(Literal::String(value)) = &args[1].kind {
1471 Some(value.clone())
1472 } else {
1473 return Err(PlannerError::invalid_expression(
1474 "GROUP_CONCAT separator must be a string literal".to_string(),
1475 ));
1476 }
1477 } else if name.eq_ignore_ascii_case("string_agg") && args.len() == 2 {
1478 if let TypedExprKind::Literal(Literal::String(value)) = &args[1].kind {
1479 Some(value.clone())
1480 } else {
1481 return Err(PlannerError::invalid_expression(
1482 "STRING_AGG separator must be a string literal".to_string(),
1483 ));
1484 }
1485 } else {
1486 None
1487 };
1488 let signature = AggregateSignature {
1489 name: name.to_ascii_lowercase(),
1490 distinct: *distinct,
1491 star: *star,
1492 arg_key: args.first().map(expr_key),
1493 separator,
1494 };
1495 let idx = aggregate_map.get(&signature).ok_or_else(|| {
1496 PlannerError::invalid_expression(
1497 "aggregate in expression is not part of plan".to_string(),
1498 )
1499 })?;
1500 let output_index = group_key_count + idx;
1501 Ok(make_output_column_ref(
1502 output_index,
1503 output_names,
1504 expr.resolved_type.clone(),
1505 expr.span,
1506 ))
1507 }
1508 TypedExprKind::FunctionCall {
1509 name,
1510 args,
1511 distinct,
1512 star,
1513 } => {
1514 if *distinct || *star {
1515 return Err(PlannerError::invalid_expression(
1516 "DISTINCT/STAR modifiers are only supported for aggregates".to_string(),
1517 ));
1518 }
1519 let mut rewritten_args = Vec::with_capacity(args.len());
1520 for arg in args {
1521 rewritten_args.push(rewrite_expr_with_maps(
1522 arg,
1523 group_key_map,
1524 aggregate_map,
1525 output_names,
1526 )?);
1527 }
1528 Ok(TypedExpr {
1529 kind: TypedExprKind::FunctionCall {
1530 name: name.clone(),
1531 args: rewritten_args,
1532 distinct: false,
1533 star: false,
1534 },
1535 resolved_type: expr.resolved_type.clone(),
1536 span: expr.span,
1537 })
1538 }
1539 TypedExprKind::BinaryOp { left, op, right } => {
1540 let left = rewrite_expr_with_maps(left, group_key_map, aggregate_map, output_names)?;
1541 let right = rewrite_expr_with_maps(right, group_key_map, aggregate_map, output_names)?;
1542 Ok(TypedExpr {
1543 kind: TypedExprKind::BinaryOp {
1544 left: Box::new(left),
1545 op: *op,
1546 right: Box::new(right),
1547 },
1548 resolved_type: expr.resolved_type.clone(),
1549 span: expr.span,
1550 })
1551 }
1552 TypedExprKind::UnaryOp { op, operand } => {
1553 let operand =
1554 rewrite_expr_with_maps(operand, group_key_map, aggregate_map, output_names)?;
1555 Ok(TypedExpr {
1556 kind: TypedExprKind::UnaryOp {
1557 op: *op,
1558 operand: Box::new(operand),
1559 },
1560 resolved_type: expr.resolved_type.clone(),
1561 span: expr.span,
1562 })
1563 }
1564 TypedExprKind::Between {
1565 expr: inner,
1566 low,
1567 high,
1568 negated,
1569 } => {
1570 let inner = rewrite_expr_with_maps(inner, group_key_map, aggregate_map, output_names)?;
1571 let low = rewrite_expr_with_maps(low, group_key_map, aggregate_map, output_names)?;
1572 let high = rewrite_expr_with_maps(high, group_key_map, aggregate_map, output_names)?;
1573 Ok(TypedExpr {
1574 kind: TypedExprKind::Between {
1575 expr: Box::new(inner),
1576 low: Box::new(low),
1577 high: Box::new(high),
1578 negated: *negated,
1579 },
1580 resolved_type: expr.resolved_type.clone(),
1581 span: expr.span,
1582 })
1583 }
1584 TypedExprKind::Like {
1585 expr: inner,
1586 pattern,
1587 escape,
1588 negated,
1589 } => {
1590 let inner = rewrite_expr_with_maps(inner, group_key_map, aggregate_map, output_names)?;
1591 let pattern =
1592 rewrite_expr_with_maps(pattern, group_key_map, aggregate_map, output_names)?;
1593 let escape = if let Some(esc) = escape {
1594 Some(Box::new(rewrite_expr_with_maps(
1595 esc,
1596 group_key_map,
1597 aggregate_map,
1598 output_names,
1599 )?))
1600 } else {
1601 None
1602 };
1603 Ok(TypedExpr {
1604 kind: TypedExprKind::Like {
1605 expr: Box::new(inner),
1606 pattern: Box::new(pattern),
1607 escape,
1608 negated: *negated,
1609 },
1610 resolved_type: expr.resolved_type.clone(),
1611 span: expr.span,
1612 })
1613 }
1614 TypedExprKind::InList {
1615 expr: inner,
1616 list,
1617 negated,
1618 } => {
1619 let inner = rewrite_expr_with_maps(inner, group_key_map, aggregate_map, output_names)?;
1620 let mut rewritten_list = Vec::with_capacity(list.len());
1621 for item in list {
1622 rewritten_list.push(rewrite_expr_with_maps(
1623 item,
1624 group_key_map,
1625 aggregate_map,
1626 output_names,
1627 )?);
1628 }
1629 Ok(TypedExpr {
1630 kind: TypedExprKind::InList {
1631 expr: Box::new(inner),
1632 list: rewritten_list,
1633 negated: *negated,
1634 },
1635 resolved_type: expr.resolved_type.clone(),
1636 span: expr.span,
1637 })
1638 }
1639 TypedExprKind::IsNull {
1640 expr: inner,
1641 negated,
1642 } => {
1643 let inner = rewrite_expr_with_maps(inner, group_key_map, aggregate_map, output_names)?;
1644 Ok(TypedExpr {
1645 kind: TypedExprKind::IsNull {
1646 expr: Box::new(inner),
1647 negated: *negated,
1648 },
1649 resolved_type: expr.resolved_type.clone(),
1650 span: expr.span,
1651 })
1652 }
1653 TypedExprKind::Literal(_) | TypedExprKind::VectorLiteral(_) => Ok(expr.clone()),
1654 TypedExprKind::ColumnRef { .. } => Err(PlannerError::invalid_expression(
1655 "column reference must appear in GROUP BY or be aggregated".to_string(),
1656 )),
1657 TypedExprKind::Cast {
1658 expr: inner,
1659 target_type,
1660 } => {
1661 let inner = rewrite_expr_with_maps(inner, group_key_map, aggregate_map, output_names)?;
1662 Ok(TypedExpr {
1663 kind: TypedExprKind::Cast {
1664 expr: Box::new(inner),
1665 target_type: target_type.clone(),
1666 },
1667 resolved_type: expr.resolved_type.clone(),
1668 span: expr.span,
1669 })
1670 }
1671 }
1672}
1673
1674fn make_output_column_ref(
1675 index: usize,
1676 output_names: &[String],
1677 resolved_type: ResolvedType,
1678 span: crate::ast::Span,
1679) -> TypedExpr {
1680 let name = output_names
1681 .get(index)
1682 .cloned()
1683 .unwrap_or_else(|| format!("col_{index}"));
1684 TypedExpr::column_ref("__agg__".to_string(), name, index, resolved_type, span)
1685}