Skip to main content

alopex_sql/planner/
mod.rs

1//! Query planning module for the Alopex SQL dialect.
2//!
3//! This module provides:
4//! - [`PlannerError`]: Error types for planning phase
5//! - [`ResolvedType`]: Normalized type information for type checking
6//! - [`TypedExpr`]: Type-checked expressions with resolved types
7//! - [`LogicalPlan`]: Logical query plan representation
8//! - [`NameResolver`]: Table and column reference resolution
9//! - [`TypeChecker`]: Expression type inference and validation
10//! - [`Planner`]: Main entry point for converting AST to LogicalPlan
11
12pub mod aggregate_expr;
13mod error;
14pub mod knn_optimizer;
15pub mod logical_plan;
16pub mod name_resolver;
17pub mod type_checker;
18pub mod typed_expr;
19pub mod types;
20
21#[cfg(test)]
22mod planner_tests;
23
24pub use aggregate_expr::{AggregateExpr, AggregateFunction};
25pub use error::PlannerError;
26pub use knn_optimizer::{KnnPattern, SortDirection, detect_knn_pattern};
27pub use logical_plan::LogicalPlan;
28pub use name_resolver::{NameResolver, ResolvedColumn};
29pub use type_checker::TypeChecker;
30pub use typed_expr::{
31    ProjectedColumn, Projection, SortExpr, TypedAssignment, TypedExpr, TypedExprKind,
32};
33pub use types::ResolvedType;
34
35use crate::ast::ddl::{
36    ColumnConstraint, ColumnDef, CreateIndex, CreateTable, DropIndex, DropTable,
37};
38use crate::ast::dml::{Delete, Insert, LITERAL_TABLE, OrderByExpr, Select, SelectItem, Update};
39use crate::ast::expr::Literal;
40use crate::ast::{Statement, StatementKind};
41use crate::catalog::{Catalog, ColumnMetadata, IndexMetadata, TableMetadata};
42use crate::{DataSourceFormat, TableType};
43use std::collections::HashMap;
44
45/// The SQL query planner.
46///
47/// The planner converts AST statements into logical plans. It performs:
48/// - Name resolution: Validates table and column references
49/// - Type checking: Infers and validates expression types
50/// - Plan construction: Builds the logical plan tree
51///
52/// # Design Notes
53///
54/// - The planner uses an immutable reference to the catalog (`&C`)
55/// - DDL statements produce plans but don't modify the catalog
56/// - The executor is responsible for applying catalog changes
57///
58/// # Examples
59///
60/// ```
61/// use alopex_sql::catalog::MemoryCatalog;
62/// use alopex_sql::planner::Planner;
63///
64/// let catalog = MemoryCatalog::new();
65/// let planner = Planner::new(&catalog);
66///
67/// // Parse and plan a statement
68/// // let stmt = parser.parse("SELECT * FROM users")?;
69/// // let plan = planner.plan(&stmt)?;
70/// ```
71pub struct Planner<'a, C: Catalog + ?Sized> {
72    catalog: &'a C,
73    name_resolver: NameResolver<'a, C>,
74    type_checker: TypeChecker<'a, C>,
75}
76
77impl<'a, C: Catalog + ?Sized> Planner<'a, C> {
78    /// Create a new planner with the given catalog.
79    pub fn new(catalog: &'a C) -> Self {
80        Self {
81            catalog,
82            name_resolver: NameResolver::new(catalog),
83            type_checker: TypeChecker::new(catalog),
84        }
85    }
86
87    /// Plan a SQL statement.
88    ///
89    /// This is the main entry point for converting an AST statement into a logical plan.
90    ///
91    /// # Errors
92    ///
93    /// Returns a `PlannerError` if:
94    /// - Referenced tables or columns don't exist
95    /// - Type checking fails
96    /// - DDL validation fails (e.g., table already exists for CREATE TABLE)
97    pub fn plan(&self, stmt: &Statement) -> Result<LogicalPlan, PlannerError> {
98        match &stmt.kind {
99            // DDL statements
100            StatementKind::CreateTable(ct) => self.plan_create_table(ct),
101            StatementKind::DropTable(dt) => self.plan_drop_table(dt),
102            StatementKind::CreateIndex(ci) => self.plan_create_index(ci),
103            StatementKind::DropIndex(di) => self.plan_drop_index(di),
104
105            // DML statements
106            StatementKind::Select(sel) => self.plan_select(sel),
107            StatementKind::Insert(ins) => self.plan_insert(ins),
108            StatementKind::Update(upd) => self.plan_update(upd),
109            StatementKind::Delete(del) => self.plan_delete(del),
110        }
111    }
112
113    // ============================================================
114    // DDL Planning Methods (Task 16)
115    // ============================================================
116
117    /// Plan a CREATE TABLE statement.
118    ///
119    /// Validates that the table doesn't already exist (unless IF NOT EXISTS is specified),
120    /// and converts the AST column definitions to catalog metadata.
121    fn plan_create_table(&self, stmt: &CreateTable) -> Result<LogicalPlan, PlannerError> {
122        // Check if table already exists
123        if !stmt.if_not_exists && self.catalog.table_exists(&stmt.name) {
124            return Err(PlannerError::table_already_exists(&stmt.name));
125        }
126
127        // Convert column definitions to metadata
128        let columns: Vec<ColumnMetadata> = stmt
129            .columns
130            .iter()
131            .map(|col| self.convert_column_def(col))
132            .collect();
133
134        // Collect primary key from table constraints
135        let primary_key = Self::extract_primary_key(stmt);
136
137        // Build table metadata
138        // Note: table_id defaults to 0 as placeholder; Executor assigns the actual ID
139        let mut table = TableMetadata::new(stmt.name.clone(), columns);
140        if let Some(pk) = primary_key {
141            table = table.with_primary_key(pk);
142        }
143        table.catalog_name = "default".to_string();
144        table.namespace_name = "default".to_string();
145        table.table_type = TableType::Managed;
146        table.data_source_format = DataSourceFormat::Alopex;
147        table.properties = HashMap::new();
148
149        Ok(LogicalPlan::CreateTable {
150            table,
151            if_not_exists: stmt.if_not_exists,
152            with_options: stmt.with_options.clone(),
153        })
154    }
155
156    /// Convert an AST column definition to catalog column metadata.
157    fn convert_column_def(&self, col: &ColumnDef) -> ColumnMetadata {
158        let data_type = ResolvedType::from_ast(&col.data_type);
159        let mut meta = ColumnMetadata::new(col.name.clone(), data_type);
160
161        // Process constraints
162        for constraint in &col.constraints {
163            meta = Self::apply_column_constraint(meta, constraint);
164        }
165
166        meta
167    }
168
169    /// Apply a column constraint to column metadata.
170    fn apply_column_constraint(
171        mut meta: ColumnMetadata,
172        constraint: &ColumnConstraint,
173    ) -> ColumnMetadata {
174        match constraint {
175            ColumnConstraint::NotNull => {
176                meta.not_null = true;
177            }
178            ColumnConstraint::Null => {
179                meta.not_null = false;
180            }
181            ColumnConstraint::PrimaryKey => {
182                meta.primary_key = true;
183                meta.not_null = true; // PRIMARY KEY implies NOT NULL
184            }
185            ColumnConstraint::Unique => {
186                meta.unique = true;
187            }
188            ColumnConstraint::Default(expr) => {
189                meta.default = Some(expr.clone());
190            }
191            ColumnConstraint::WithSpan { kind, .. } => {
192                meta = Self::apply_column_constraint(meta, kind);
193            }
194        }
195        meta
196    }
197
198    /// Extract primary key columns from table constraints.
199    fn extract_primary_key(stmt: &CreateTable) -> Option<Vec<String>> {
200        use crate::ast::ddl::TableConstraint;
201
202        // First check table-level constraints
203        // Note: Currently only PrimaryKey variant exists; when more variants are added,
204        // this should iterate to find the first PrimaryKey constraint
205        if let Some(TableConstraint::PrimaryKey { columns, .. }) = stmt.constraints.first() {
206            return Some(columns.clone());
207        }
208
209        // Then check column-level PRIMARY KEY constraints
210        let pk_columns: Vec<String> = stmt
211            .columns
212            .iter()
213            .filter(|col| col.constraints.iter().any(Self::is_primary_key_constraint))
214            .map(|col| col.name.clone())
215            .collect();
216
217        if pk_columns.is_empty() {
218            None
219        } else {
220            Some(pk_columns)
221        }
222    }
223
224    /// Check if a column constraint is a PRIMARY KEY constraint.
225    fn is_primary_key_constraint(constraint: &ColumnConstraint) -> bool {
226        match constraint {
227            ColumnConstraint::PrimaryKey => true,
228            ColumnConstraint::WithSpan { kind, .. } => Self::is_primary_key_constraint(kind),
229            _ => false,
230        }
231    }
232
233    /// Plan a DROP TABLE statement.
234    ///
235    /// Validates that the table exists (unless IF EXISTS is specified).
236    fn plan_drop_table(&self, stmt: &DropTable) -> Result<LogicalPlan, PlannerError> {
237        // Check if table exists
238        if !stmt.if_exists && !self.table_exists_in_default(&stmt.name) {
239            return Err(PlannerError::TableNotFound {
240                name: stmt.name.clone(),
241                line: stmt.span.start.line,
242                column: stmt.span.start.column,
243            });
244        }
245
246        Ok(LogicalPlan::DropTable {
247            name: stmt.name.clone(),
248            if_exists: stmt.if_exists,
249        })
250    }
251
252    fn table_exists_in_default(&self, name: &str) -> bool {
253        match self.catalog.get_table(name) {
254            Some(table) => table.catalog_name == "default" && table.namespace_name == "default",
255            None => false,
256        }
257    }
258
259    /// Plan a CREATE INDEX statement.
260    ///
261    /// Validates that:
262    /// - The index doesn't already exist (unless IF NOT EXISTS is specified)
263    /// - The target table exists
264    /// - The target column exists in the table
265    fn plan_create_index(&self, stmt: &CreateIndex) -> Result<LogicalPlan, PlannerError> {
266        // Check if index already exists
267        if !stmt.if_not_exists && self.catalog.index_exists(&stmt.name) {
268            return Err(PlannerError::index_already_exists(&stmt.name));
269        }
270
271        // Validate table exists
272        let table = self.name_resolver.resolve_table(&stmt.table, stmt.span)?;
273
274        // Validate column exists
275        self.name_resolver
276            .resolve_column(table, &stmt.column, stmt.span)?;
277
278        // Build index metadata
279        // Note: index_id is set to 0 as placeholder; Executor assigns the actual ID
280        // Note: column_indices will be resolved by Executor when table schema is available
281        let mut index = IndexMetadata::new(
282            0,
283            stmt.name.clone(),
284            stmt.table.clone(),
285            vec![stmt.column.clone()],
286        );
287
288        if let Some(method) = stmt.method {
289            index = index.with_method(method);
290        }
291
292        let options: Vec<(String, String)> = stmt
293            .options
294            .iter()
295            .map(|opt| (opt.key.clone(), opt.value.clone()))
296            .collect();
297        if !options.is_empty() {
298            index = index.with_options(options);
299        }
300
301        Ok(LogicalPlan::CreateIndex {
302            index,
303            if_not_exists: stmt.if_not_exists,
304        })
305    }
306
307    /// Plan a DROP INDEX statement.
308    ///
309    /// Validates that the index exists (unless IF EXISTS is specified).
310    fn plan_drop_index(&self, stmt: &DropIndex) -> Result<LogicalPlan, PlannerError> {
311        // Check if index exists
312        if !stmt.if_exists && !self.index_exists_in_default(&stmt.name) {
313            return Err(PlannerError::index_not_found(&stmt.name));
314        }
315
316        Ok(LogicalPlan::DropIndex {
317            name: stmt.name.clone(),
318            if_exists: stmt.if_exists,
319        })
320    }
321
322    fn index_exists_in_default(&self, name: &str) -> bool {
323        match self.catalog.get_index(name) {
324            Some(index) => index.catalog_name == "default" && index.namespace_name == "default",
325            None => false,
326        }
327    }
328
329    // ============================================================
330    // DML Planning Methods (Task 17 & 18)
331    // ============================================================
332
    /// Plan a SELECT statement.
    ///
    /// Builds a logical plan tree bottom-up. A `Scan` is always present; the
    /// remaining layers are only added when the statement calls for them:
    ///
    /// - plain queries: Scan -> Filter -> Sort -> Limit
    /// - queries with GROUP BY, aggregate functions, HAVING or DISTINCT:
    ///   Scan -> Filter -> Aggregate -> Sort -> Limit
    ///
    /// # Errors
    ///
    /// Returns a `PlannerError` when the FROM table cannot be resolved, when
    /// an expression fails type checking, when WHERE or HAVING is not
    /// Boolean, or when HAVING is used without GROUP BY or aggregates. Errors
    /// from the helper methods called here are propagated unchanged.
    fn plan_select(&self, stmt: &Select) -> Result<LogicalPlan, PlannerError> {
        // 1. Resolve the FROM table
        // The literal pseudo-table is not looked up in the catalog; a
        // column-less placeholder stands in for it. `literal_table` is declared
        // up front so the placeholder outlives the borrow stored in `table`.
        let literal_table;
        let table = if stmt.from.name == LITERAL_TABLE {
            literal_table = TableMetadata::new(LITERAL_TABLE, Vec::new());
            &literal_table
        } else {
            self.name_resolver
                .resolve_table(&stmt.from.name, stmt.from.span)?
        };

        let has_group_by = stmt
            .group_by
            .as_ref()
            .is_some_and(|items| !items.is_empty());
        let has_aggregate = self.select_contains_aggregate(stmt);
        // Plain DISTINCT (no GROUP BY / aggregates / HAVING) is planned through
        // the aggregate path with the projected columns acting as group keys.
        let distinct_only =
            stmt.distinct && !has_group_by && !has_aggregate && stmt.having.is_none();

        // On the aggregate path the scan exposes every table column and the
        // SELECT list is applied by the Aggregate node instead.
        let scan_projection =
            if has_group_by || has_aggregate || stmt.having.is_some() || stmt.distinct {
                Projection::All(table.column_names().into_iter().map(String::from).collect())
            } else {
                self.build_projection(&stmt.projection, table)?
            };

        // 2. Create the base Scan plan
        let mut plan = LogicalPlan::Scan {
            table: stmt.from.name.clone(),
            projection: scan_projection,
        };

        // 3. Add Filter if WHERE clause is present
        if let Some(ref selection) = stmt.selection {
            let predicate = self.type_checker.infer_type(selection, table)?;

            // Verify predicate returns Boolean
            if predicate.resolved_type != ResolvedType::Boolean {
                return Err(PlannerError::type_mismatch(
                    "Boolean",
                    predicate.resolved_type.to_string(),
                    selection.span,
                ));
            }

            plan = LogicalPlan::Filter {
                input: Box::new(plan),
                predicate,
            };
        }

        // 4. Aggregate path (same condition as used for `scan_projection`).
        // This branch always returns, so the code after it only handles plain
        // queries.
        if has_group_by || has_aggregate || stmt.having.is_some() || stmt.distinct {
            if !has_group_by && !has_aggregate && stmt.having.is_some() {
                return Err(PlannerError::invalid_expression(
                    "HAVING requires GROUP BY or aggregate functions".to_string(),
                ));
            }

            // For plain DISTINCT every projected expression becomes a group
            // key; otherwise the keys come from the GROUP BY clause.
            let (group_keys, projected) = if distinct_only {
                let projected =
                    self.build_projected_columns_for_distinct(&stmt.projection, table)?;
                let group_keys = projected.iter().map(|col| col.expr.clone()).collect();
                (group_keys, projected)
            } else {
                let group_keys = self.build_group_keys(stmt, table)?;
                let projected =
                    self.build_projected_columns_for_aggregate(&stmt.projection, table)?;
                (group_keys, projected)
            };
            // Aggregates are gathered from the SELECT list, HAVING and ORDER BY
            // into one list; `agg_map` maps an aggregate's signature to its
            // position in `aggregates`.
            let mut aggregates = Vec::new();
            let mut agg_map = HashMap::new();

            for col in &projected {
                self.collect_aggregates_from_typed_expr(&col.expr, &mut aggregates, &mut agg_map)?;
            }

            let having_typed = if let Some(having) = &stmt.having {
                let typed = self.type_checker.infer_type(having, table)?;
                // HAVING must evaluate to Boolean, like WHERE.
                if typed.resolved_type != ResolvedType::Boolean {
                    return Err(PlannerError::type_mismatch(
                        "Boolean",
                        typed.resolved_type.type_name().to_string(),
                        typed.span,
                    ));
                }
                self.collect_aggregates_from_typed_expr(&typed, &mut aggregates, &mut agg_map)?;
                Some(typed)
            } else {
                None
            };

            let mut order_by = Vec::new();
            if !stmt.order_by.is_empty() {
                for order_expr in &stmt.order_by {
                    let typed = self.type_checker.infer_type(&order_expr.expr, table)?;
                    self.collect_aggregates_from_typed_expr(&typed, &mut aggregates, &mut agg_map)?;
                    // Defaults: ascending order, nulls not first.
                    let asc = order_expr.asc.unwrap_or(true);
                    let nulls_first = order_expr.nulls_first.unwrap_or(false);
                    order_by.push(SortExpr::new(typed, asc, nulls_first));
                }
            }

            // Validation runs only after every aggregate has been collected,
            // because it is handed the complete `aggregates` list.
            if let Some(ref having) = having_typed {
                self.type_checker
                    .validate_having_expr(having, &group_keys, &aggregates)?;
            }

            let output_schema = build_aggregate_schema(&group_keys, &aggregates);
            let output_names: Vec<String> = output_schema.iter().map(|c| c.name.clone()).collect();

            let projection = self.build_aggregate_projection(
                projected,
                &group_keys,
                &aggregates,
                &output_names,
            )?;

            // NOTE(review): `rewrite_expr_for_aggregate` presumably re-expresses
            // HAVING / ORDER BY in terms of the Aggregate node's output columns
            // (`output_names`) - confirm against its definition.
            let having = if let Some(having) = having_typed {
                Some(self.rewrite_expr_for_aggregate(
                    &having,
                    &group_keys,
                    &aggregates,
                    &output_names,
                )?)
            } else {
                None
            };

            let order_by = order_by
                .into_iter()
                .map(|expr| {
                    let rewritten = self.rewrite_expr_for_aggregate(
                        &expr.expr,
                        &group_keys,
                        &aggregates,
                        &output_names,
                    )?;
                    Ok(SortExpr::new(rewritten, expr.asc, expr.nulls_first))
                })
                .collect::<Result<Vec<_>, PlannerError>>()?;

            plan = LogicalPlan::Aggregate {
                input: Box::new(plan),
                group_keys,
                aggregates,
                having,
                projection,
            };

            // Sort and Limit are layered on top of the Aggregate node.
            if !order_by.is_empty() {
                plan = LogicalPlan::Sort {
                    input: Box::new(plan),
                    order_by,
                };
            }

            if stmt.limit.is_some() || stmt.offset.is_some() {
                let limit = self.extract_limit_value(&stmt.limit, stmt.span)?;
                let offset = self.extract_limit_value(&stmt.offset, stmt.span)?;
                plan = LogicalPlan::Limit {
                    input: Box::new(plan),
                    limit,
                    offset,
                };
            }

            return Ok(plan);
        }

        // Non-aggregate path: ORDER BY + LIMIT/OFFSET
        if !stmt.order_by.is_empty() {
            let order_by = self.build_sort_exprs(&stmt.order_by, table)?;
            plan = LogicalPlan::Sort {
                input: Box::new(plan),
                order_by,
            };
        }

        // A single Limit node carries both LIMIT and OFFSET; it is added when
        // either clause is present.
        if stmt.limit.is_some() || stmt.offset.is_some() {
            let limit = self.extract_limit_value(&stmt.limit, stmt.span)?;
            let offset = self.extract_limit_value(&stmt.offset, stmt.span)?;
            plan = LogicalPlan::Limit {
                input: Box::new(plan),
                limit,
                offset,
            };
        }

        Ok(plan)
    }
527
528    /// Build the projection for a SELECT statement.
529    ///
530    /// Handles wildcard expansion and expression type checking.
531    fn build_projection(
532        &self,
533        items: &[SelectItem],
534        table: &TableMetadata,
535    ) -> Result<Projection, PlannerError> {
536        // Check for wildcard - if present, expand it
537        if items.len() == 1 && matches!(&items[0], SelectItem::Wildcard { .. }) {
538            let columns = self.name_resolver.expand_wildcard(table);
539            return Ok(Projection::All(columns));
540        }
541
542        // Process each select item
543        let mut projected_columns = Vec::new();
544        for item in items {
545            match item {
546                SelectItem::Wildcard { span } => {
547                    // Wildcard mixed with other items - expand inline
548                    for col in &table.columns {
549                        let column_index = table.get_column_index(&col.name).unwrap();
550                        let typed_expr = TypedExpr::column_ref(
551                            table.name.clone(),
552                            col.name.clone(),
553                            column_index,
554                            col.data_type.clone(),
555                            *span,
556                        );
557                        projected_columns.push(ProjectedColumn::new(typed_expr));
558                    }
559                }
560                SelectItem::Expr { expr, alias, .. } => {
561                    let typed_expr = self.type_checker.infer_type(expr, table)?;
562                    let projected = if let Some(alias) = alias {
563                        ProjectedColumn::with_alias(typed_expr, alias.clone())
564                    } else {
565                        ProjectedColumn::new(typed_expr)
566                    };
567                    projected_columns.push(projected);
568                }
569            }
570        }
571
572        Ok(Projection::Columns(projected_columns))
573    }
574
575    /// Build sort expressions from ORDER BY clause.
576    fn build_sort_exprs(
577        &self,
578        order_by: &[OrderByExpr],
579        table: &TableMetadata,
580    ) -> Result<Vec<SortExpr>, PlannerError> {
581        let mut sort_exprs = Vec::new();
582
583        for order_expr in order_by {
584            let typed_expr = self.type_checker.infer_type(&order_expr.expr, table)?;
585
586            // Determine sort direction (default: ASC)
587            let asc = order_expr.asc.unwrap_or(true);
588
589            // Determine NULLS ordering (default: NULLS LAST for both ASC and DESC)
590            let nulls_first = order_expr.nulls_first.unwrap_or(false);
591
592            sort_exprs.push(SortExpr::new(typed_expr, asc, nulls_first));
593        }
594
595        Ok(sort_exprs)
596    }
597
598    fn select_contains_aggregate(&self, stmt: &Select) -> bool {
599        stmt.projection.iter().any(|item| match item {
600            SelectItem::Wildcard { .. } => false,
601            SelectItem::Expr { expr, .. } => expr_contains_aggregate(expr),
602        }) || stmt
603            .group_by
604            .as_ref()
605            .map(|items| items.iter().any(expr_contains_aggregate))
606            .unwrap_or(false)
607            || stmt
608                .having
609                .as_ref()
610                .map(expr_contains_aggregate)
611                .unwrap_or(false)
612            || stmt
613                .order_by
614                .iter()
615                .any(|order| expr_contains_aggregate(&order.expr))
616    }
617
618    fn build_group_keys(
619        &self,
620        stmt: &Select,
621        table: &TableMetadata,
622    ) -> Result<Vec<TypedExpr>, PlannerError> {
623        let mut keys = Vec::new();
624        if let Some(items) = &stmt.group_by {
625            for expr in items {
626                let typed = self.type_checker.infer_type(expr, table)?;
627                if typed_expr_contains_aggregate(&typed) {
628                    return Err(PlannerError::invalid_expression(
629                        "GROUP BY cannot contain aggregate functions".to_string(),
630                    ));
631                }
632                if !matches!(typed.kind, TypedExprKind::ColumnRef { .. }) {
633                    return Err(PlannerError::invalid_expression(
634                        "GROUP BY expressions must be column references".to_string(),
635                    ));
636                }
637                keys.push(typed);
638            }
639        }
640        Ok(keys)
641    }
642
643    fn build_projected_columns_for_aggregate(
644        &self,
645        items: &[SelectItem],
646        table: &TableMetadata,
647    ) -> Result<Vec<ProjectedColumn>, PlannerError> {
648        let mut projected = Vec::new();
649        for item in items {
650            match item {
651                SelectItem::Wildcard { .. } => {
652                    return Err(PlannerError::invalid_expression(
653                        "wildcard projection not supported with GROUP BY/aggregate".to_string(),
654                    ));
655                }
656                SelectItem::Expr { expr, alias, .. } => {
657                    let typed = self.type_checker.infer_type(expr, table)?;
658                    projected.push(ProjectedColumn {
659                        expr: typed,
660                        alias: alias.clone(),
661                    });
662                }
663            }
664        }
665        Ok(projected)
666    }
667
668    fn build_projected_columns_for_distinct(
669        &self,
670        items: &[SelectItem],
671        table: &TableMetadata,
672    ) -> Result<Vec<ProjectedColumn>, PlannerError> {
673        let projection = self.build_projection(items, table)?;
674        match projection {
675            Projection::All(columns) => {
676                let mut projected = Vec::with_capacity(columns.len());
677                for column in columns {
678                    let column_index = table.get_column_index(&column).ok_or_else(|| {
679                        PlannerError::invalid_expression(format!(
680                            "column '{column}' not found for DISTINCT projection"
681                        ))
682                    })?;
683                    let column_meta = table.get_column(&column).ok_or_else(|| {
684                        PlannerError::invalid_expression(format!(
685                            "column '{column}' not found for DISTINCT projection"
686                        ))
687                    })?;
688                    let typed_expr = TypedExpr::column_ref(
689                        table.name.clone(),
690                        column.clone(),
691                        column_index,
692                        column_meta.data_type.clone(),
693                        crate::ast::Span::default(),
694                    );
695                    projected.push(ProjectedColumn::new(typed_expr));
696                }
697                Ok(projected)
698            }
699            Projection::Columns(columns) => Ok(columns),
700        }
701    }
702
703    fn collect_aggregates_from_typed_expr(
704        &self,
705        expr: &TypedExpr,
706        aggregates: &mut Vec<AggregateExpr>,
707        aggregate_map: &mut HashMap<AggregateSignature, usize>,
708    ) -> Result<(), PlannerError> {
709        match &expr.kind {
710            TypedExprKind::FunctionCall {
711                name,
712                args,
713                distinct,
714                star,
715            } if is_aggregate_function(name) => {
716                for arg in args {
717                    if typed_expr_contains_aggregate(arg) {
718                        return Err(PlannerError::invalid_expression(
719                            "nested aggregate functions are not supported".to_string(),
720                        ));
721                    }
722                }
723                let (agg, signature) =
724                    self.build_aggregate_expr_from_typed(expr, name, args, *distinct, *star)?;
725                aggregate_map.entry(signature).or_insert_with(|| {
726                    aggregates.push(agg);
727                    aggregates.len() - 1
728                });
729                Ok(())
730            }
731            TypedExprKind::BinaryOp { left, right, .. } => {
732                self.collect_aggregates_from_typed_expr(left, aggregates, aggregate_map)?;
733                self.collect_aggregates_from_typed_expr(right, aggregates, aggregate_map)?;
734                Ok(())
735            }
736            TypedExprKind::UnaryOp { operand, .. } => {
737                self.collect_aggregates_from_typed_expr(operand, aggregates, aggregate_map)
738            }
739            TypedExprKind::FunctionCall { args, .. } => {
740                for arg in args {
741                    self.collect_aggregates_from_typed_expr(arg, aggregates, aggregate_map)?;
742                }
743                Ok(())
744            }
745            TypedExprKind::Between {
746                expr, low, high, ..
747            } => {
748                self.collect_aggregates_from_typed_expr(expr, aggregates, aggregate_map)?;
749                self.collect_aggregates_from_typed_expr(low, aggregates, aggregate_map)?;
750                self.collect_aggregates_from_typed_expr(high, aggregates, aggregate_map)?;
751                Ok(())
752            }
753            TypedExprKind::Like {
754                expr,
755                pattern,
756                escape,
757                ..
758            } => {
759                self.collect_aggregates_from_typed_expr(expr, aggregates, aggregate_map)?;
760                self.collect_aggregates_from_typed_expr(pattern, aggregates, aggregate_map)?;
761                if let Some(esc) = escape {
762                    self.collect_aggregates_from_typed_expr(esc, aggregates, aggregate_map)?;
763                }
764                Ok(())
765            }
766            TypedExprKind::InList { expr, list, .. } => {
767                self.collect_aggregates_from_typed_expr(expr, aggregates, aggregate_map)?;
768                for item in list {
769                    self.collect_aggregates_from_typed_expr(item, aggregates, aggregate_map)?;
770                }
771                Ok(())
772            }
773            TypedExprKind::IsNull { expr, .. } => {
774                self.collect_aggregates_from_typed_expr(expr, aggregates, aggregate_map)
775            }
776            _ => Ok(()),
777        }
778    }
779
780    fn build_aggregate_expr_from_typed(
781        &self,
782        expr: &TypedExpr,
783        name: &str,
784        args: &[TypedExpr],
785        distinct: bool,
786        star: bool,
787    ) -> Result<(AggregateExpr, AggregateSignature), PlannerError> {
788        let lower = name.to_lowercase();
789        match lower.as_str() {
790            "count" => {
791                if star {
792                    let agg = AggregateExpr::count_star();
793                    let signature = aggregate_signature(name, distinct, star, None, None, expr);
794                    return Ok((agg, signature));
795                }
796                if args.len() != 1 {
797                    return Err(PlannerError::type_mismatch(
798                        "1 argument",
799                        format!("{} arguments", args.len()),
800                        expr.span,
801                    ));
802                }
803                let agg = AggregateExpr {
804                    function: AggregateFunction::Count,
805                    arg: Some(args[0].clone()),
806                    distinct,
807                    result_type: ResolvedType::BigInt,
808                };
809                let signature =
810                    aggregate_signature(name, distinct, star, Some(&args[0]), None, expr);
811                Ok((agg, signature))
812            }
813            "sum" => {
814                let arg = self.require_single_aggregate_arg(args, expr.span)?;
815                let agg = AggregateExpr {
816                    function: AggregateFunction::Sum,
817                    arg: Some(arg.clone()),
818                    distinct: false,
819                    result_type: ResolvedType::Double,
820                };
821                let signature = aggregate_signature(name, false, star, Some(arg), None, expr);
822                Ok((agg, signature))
823            }
824            "total" => {
825                let arg = self.require_single_aggregate_arg(args, expr.span)?;
826                let agg = AggregateExpr {
827                    function: AggregateFunction::Total,
828                    arg: Some(arg.clone()),
829                    distinct: false,
830                    result_type: ResolvedType::Double,
831                };
832                let signature = aggregate_signature(name, false, star, Some(arg), None, expr);
833                Ok((agg, signature))
834            }
835            "avg" => {
836                let arg = self.require_single_aggregate_arg(args, expr.span)?;
837                let agg = AggregateExpr {
838                    function: AggregateFunction::Avg,
839                    arg: Some(arg.clone()),
840                    distinct: false,
841                    result_type: ResolvedType::Double,
842                };
843                let signature = aggregate_signature(name, false, star, Some(arg), None, expr);
844                Ok((agg, signature))
845            }
846            "min" => {
847                let arg = self.require_single_aggregate_arg(args, expr.span)?;
848                let agg = AggregateExpr {
849                    function: AggregateFunction::Min,
850                    arg: Some(arg.clone()),
851                    distinct: false,
852                    result_type: arg.resolved_type.clone(),
853                };
854                let signature = aggregate_signature(name, false, star, Some(arg), None, expr);
855                Ok((agg, signature))
856            }
857            "max" => {
858                let arg = self.require_single_aggregate_arg(args, expr.span)?;
859                let agg = AggregateExpr {
860                    function: AggregateFunction::Max,
861                    arg: Some(arg.clone()),
862                    distinct: false,
863                    result_type: arg.resolved_type.clone(),
864                };
865                let signature = aggregate_signature(name, false, star, Some(arg), None, expr);
866                Ok((agg, signature))
867            }
868            "group_concat" => {
869                if args.is_empty() || args.len() > 2 {
870                    return Err(PlannerError::type_mismatch(
871                        "1 or 2 arguments",
872                        format!("{} arguments", args.len()),
873                        expr.span,
874                    ));
875                }
876                let arg = &args[0];
877                let mut separator = None;
878                if args.len() == 2 {
879                    if let TypedExprKind::Literal(Literal::String(value)) = &args[1].kind {
880                        separator = Some(value.clone());
881                    } else {
882                        return Err(PlannerError::invalid_expression(
883                            "GROUP_CONCAT separator must be a string literal".to_string(),
884                        ));
885                    }
886                }
887                let agg = AggregateExpr {
888                    function: AggregateFunction::GroupConcat { separator },
889                    arg: Some(arg.clone()),
890                    distinct: false,
891                    result_type: ResolvedType::Text,
892                };
893                let signature = aggregate_signature(
894                    name,
895                    false,
896                    star,
897                    Some(arg),
898                    match &agg.function {
899                        AggregateFunction::GroupConcat { separator } => separator.as_ref(),
900                        _ => None,
901                    },
902                    expr,
903                );
904                Ok((agg, signature))
905            }
906            "string_agg" => {
907                if args.len() != 2 {
908                    return Err(PlannerError::type_mismatch(
909                        "2 arguments",
910                        format!("{} arguments", args.len()),
911                        expr.span,
912                    ));
913                }
914                let arg = &args[0];
915                let separator =
916                    if let TypedExprKind::Literal(Literal::String(value)) = &args[1].kind {
917                        Some(value.clone())
918                    } else {
919                        return Err(PlannerError::invalid_expression(
920                            "STRING_AGG separator must be a string literal".to_string(),
921                        ));
922                    };
923                let agg = AggregateExpr {
924                    function: AggregateFunction::StringAgg { separator },
925                    arg: Some(arg.clone()),
926                    distinct: false,
927                    result_type: ResolvedType::Text,
928                };
929                let signature = aggregate_signature(
930                    name,
931                    false,
932                    star,
933                    Some(arg),
934                    match &agg.function {
935                        AggregateFunction::StringAgg { separator } => separator.as_ref(),
936                        _ => None,
937                    },
938                    expr,
939                );
940                Ok((agg, signature))
941            }
942            _ => Err(PlannerError::unsupported_feature(
943                format!("function '{}'", name),
944                "future",
945                expr.span,
946            )),
947        }
948    }
949
950    fn require_single_aggregate_arg<'b>(
951        &self,
952        args: &'b [TypedExpr],
953        span: crate::ast::Span,
954    ) -> Result<&'b TypedExpr, PlannerError> {
955        if args.len() != 1 {
956            return Err(PlannerError::type_mismatch(
957                "1 argument",
958                format!("{} arguments", args.len()),
959                span,
960            ));
961        }
962        Ok(&args[0])
963    }
964
965    fn build_aggregate_projection(
966        &self,
967        projected: Vec<ProjectedColumn>,
968        group_keys: &[TypedExpr],
969        aggregates: &[AggregateExpr],
970        output_names: &[String],
971    ) -> Result<Projection, PlannerError> {
972        let mut columns = Vec::new();
973        for col in projected {
974            let rewritten =
975                self.rewrite_expr_for_aggregate(&col.expr, group_keys, aggregates, output_names)?;
976            columns.push(ProjectedColumn {
977                expr: rewritten,
978                alias: col.alias,
979            });
980        }
981        Ok(Projection::Columns(columns))
982    }
983
984    fn rewrite_expr_for_aggregate(
985        &self,
986        expr: &TypedExpr,
987        group_keys: &[TypedExpr],
988        aggregates: &[AggregateExpr],
989        output_names: &[String],
990    ) -> Result<TypedExpr, PlannerError> {
991        let group_key_map = build_group_key_map(group_keys);
992        let aggregate_map = build_aggregate_map(aggregates);
993
994        rewrite_expr_with_maps(expr, &group_key_map, &aggregate_map, output_names)
995    }
996
997    /// Extract a numeric value from a LIMIT or OFFSET expression.
998    ///
999    /// Currently only supports literal integer values.
1000    fn extract_limit_value(
1001        &self,
1002        expr: &Option<crate::ast::expr::Expr>,
1003        stmt_span: crate::ast::Span,
1004    ) -> Result<Option<u64>, PlannerError> {
1005        match expr {
1006            None => Ok(None),
1007            Some(e) => {
1008                // For now, only support literal integers
1009                if let crate::ast::expr::ExprKind::Literal(Literal::Number(s)) = &e.kind {
1010                    s.parse::<u64>().map(Some).map_err(|_| {
1011                        PlannerError::type_mismatch("unsigned integer", s.clone(), e.span)
1012                    })
1013                } else {
1014                    Err(PlannerError::unsupported_feature(
1015                        "non-literal LIMIT/OFFSET",
1016                        "v0.3.0+",
1017                        stmt_span,
1018                    ))
1019                }
1020            }
1021        }
1022    }
1023
1024    /// Plan an INSERT statement.
1025    ///
1026    /// Handles column list specification or implicit column ordering.
1027    /// When columns are omitted, uses table definition order from TableMetadata.
1028    fn plan_insert(&self, stmt: &Insert) -> Result<LogicalPlan, PlannerError> {
1029        // Resolve the target table
1030        let table = self.name_resolver.resolve_table(&stmt.table, stmt.span)?;
1031
1032        // Determine the column list
1033        let columns: Vec<String> = if let Some(ref cols) = stmt.columns {
1034            // Explicit column list - validate each column exists
1035            for col in cols {
1036                self.name_resolver.resolve_column(table, col, stmt.span)?;
1037            }
1038            cols.clone()
1039        } else {
1040            // Implicit - use all columns in table definition order
1041            table.column_names().into_iter().map(String::from).collect()
1042        };
1043
1044        // Validate and type-check each row of values
1045        let mut typed_values: Vec<Vec<TypedExpr>> = Vec::new();
1046
1047        for row in &stmt.values {
1048            // Check column count matches
1049            if row.len() != columns.len() {
1050                return Err(PlannerError::column_value_count_mismatch(
1051                    columns.len(),
1052                    row.len(),
1053                    stmt.span,
1054                ));
1055            }
1056
1057            // Type-check each value
1058            let typed_row = self.type_check_insert_values(row, &columns, table)?;
1059            typed_values.push(typed_row);
1060        }
1061
1062        Ok(LogicalPlan::Insert {
1063            table: table.name.clone(),
1064            columns,
1065            values: typed_values,
1066        })
1067    }
1068
1069    /// Type-check INSERT values against column definitions.
1070    fn type_check_insert_values(
1071        &self,
1072        values: &[crate::ast::expr::Expr],
1073        columns: &[String],
1074        table: &TableMetadata,
1075    ) -> Result<Vec<TypedExpr>, PlannerError> {
1076        let mut typed_values = Vec::new();
1077
1078        for (i, value) in values.iter().enumerate() {
1079            let column_name = &columns[i];
1080            let column_meta = table.get_column(column_name).ok_or_else(|| {
1081                PlannerError::column_not_found(column_name, &table.name, value.span)
1082            })?;
1083
1084            // Type-check the value expression
1085            let typed_value = self.type_checker.infer_type(value, table)?;
1086
1087            // Check for NOT NULL constraint violation (except for NULL literal which is allowed if nullable)
1088            if column_meta.not_null
1089                && matches!(&typed_value.kind, TypedExprKind::Literal(Literal::Null))
1090            {
1091                return Err(PlannerError::null_constraint_violation(
1092                    column_name,
1093                    value.span,
1094                ));
1095            }
1096
1097            // Validate type compatibility
1098            self.validate_type_assignment(&typed_value, &column_meta.data_type, value.span)?;
1099
1100            typed_values.push(typed_value);
1101        }
1102
1103        Ok(typed_values)
1104    }
1105
1106    /// Validate that a value type can be assigned to a column type.
1107    fn validate_type_assignment(
1108        &self,
1109        value: &TypedExpr,
1110        target_type: &ResolvedType,
1111        span: crate::ast::Span,
1112    ) -> Result<(), PlannerError> {
1113        // NULL can be assigned to any nullable column
1114        if value.resolved_type == ResolvedType::Null {
1115            return Ok(());
1116        }
1117
1118        // Check for exact match or implicit conversion compatibility
1119        if self.types_compatible(&value.resolved_type, target_type) {
1120            return Ok(());
1121        }
1122
1123        Err(PlannerError::type_mismatch(
1124            target_type.to_string(),
1125            value.resolved_type.to_string(),
1126            span,
1127        ))
1128    }
1129
1130    /// Check if two types are compatible for assignment.
1131    fn types_compatible(&self, source: &ResolvedType, target: &ResolvedType) -> bool {
1132        use ResolvedType::*;
1133
1134        // Same type is always compatible
1135        if source == target {
1136            return true;
1137        }
1138
1139        // Numeric promotions
1140        match (source, target) {
1141            // Integer can be assigned to BigInt, Float, Double
1142            (Integer, BigInt) | (Integer, Float) | (Integer, Double) => true,
1143            // BigInt can be assigned to Float, Double
1144            (BigInt, Float) | (BigInt, Double) => true,
1145            // Float can be assigned to Double
1146            (Float, Double) => true,
1147            // Vector dimensions must match
1148            (Vector { dimension: d1, .. }, Vector { dimension: d2, .. }) => d1 == d2,
1149            _ => false,
1150        }
1151    }
1152
1153    /// Plan an UPDATE statement.
1154    ///
1155    /// Validates assignments and optional WHERE clause.
1156    fn plan_update(&self, stmt: &Update) -> Result<LogicalPlan, PlannerError> {
1157        // Resolve the target table
1158        let table = self.name_resolver.resolve_table(&stmt.table, stmt.span)?;
1159
1160        // Process assignments
1161        let mut typed_assignments = Vec::new();
1162
1163        for assignment in &stmt.assignments {
1164            // Resolve the column
1165            let column_meta =
1166                self.name_resolver
1167                    .resolve_column(table, &assignment.column, assignment.span)?;
1168            let column_index = table.get_column_index(&assignment.column).unwrap();
1169
1170            // Type-check the value expression
1171            let typed_value = self.type_checker.infer_type(&assignment.value, table)?;
1172
1173            // Check NOT NULL constraint
1174            if column_meta.not_null
1175                && matches!(&typed_value.kind, TypedExprKind::Literal(Literal::Null))
1176            {
1177                return Err(PlannerError::null_constraint_violation(
1178                    &assignment.column,
1179                    assignment.value.span,
1180                ));
1181            }
1182
1183            // Validate type compatibility
1184            self.validate_type_assignment(
1185                &typed_value,
1186                &column_meta.data_type,
1187                assignment.value.span,
1188            )?;
1189
1190            typed_assignments.push(TypedAssignment::new(
1191                assignment.column.clone(),
1192                column_index,
1193                typed_value,
1194            ));
1195        }
1196
1197        // Process optional WHERE clause
1198        let filter = if let Some(ref selection) = stmt.selection {
1199            let predicate = self.type_checker.infer_type(selection, table)?;
1200
1201            // Verify predicate returns Boolean
1202            if predicate.resolved_type != ResolvedType::Boolean {
1203                return Err(PlannerError::type_mismatch(
1204                    "Boolean",
1205                    predicate.resolved_type.to_string(),
1206                    selection.span,
1207                ));
1208            }
1209
1210            Some(predicate)
1211        } else {
1212            None
1213        };
1214
1215        Ok(LogicalPlan::Update {
1216            table: table.name.clone(),
1217            assignments: typed_assignments,
1218            filter,
1219        })
1220    }
1221
1222    /// Plan a DELETE statement.
1223    ///
1224    /// Validates optional WHERE clause.
1225    fn plan_delete(&self, stmt: &Delete) -> Result<LogicalPlan, PlannerError> {
1226        // Resolve the target table
1227        let table = self.name_resolver.resolve_table(&stmt.table, stmt.span)?;
1228
1229        // Process optional WHERE clause
1230        let filter = if let Some(ref selection) = stmt.selection {
1231            let predicate = self.type_checker.infer_type(selection, table)?;
1232
1233            // Verify predicate returns Boolean
1234            if predicate.resolved_type != ResolvedType::Boolean {
1235                return Err(PlannerError::type_mismatch(
1236                    "Boolean",
1237                    predicate.resolved_type.to_string(),
1238                    selection.span,
1239                ));
1240            }
1241
1242            Some(predicate)
1243        } else {
1244            None
1245        };
1246
1247        Ok(LogicalPlan::Delete {
1248            table: table.name.clone(),
1249            filter,
1250        })
1251    }
1252}
1253
/// Hashable identity of an aggregate call.
///
/// Used as the `HashMap` key that maps an aggregate call to its index in the
/// list of planned aggregates.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct AggregateSignature {
    /// Aggregate function name, ASCII-lowercased.
    name: String,
    /// DISTINCT flag recorded for the aggregate.
    distinct: bool,
    /// Whether the call is the `*` form (e.g. a `count` with no argument).
    star: bool,
    /// Key of the first argument as produced by `expr_key`; `None` without an argument.
    arg_key: Option<String>,
    /// Separator literal of `group_concat` / `string_agg`, when one was given.
    separator: Option<String>,
}
1262
1263fn expr_contains_aggregate(expr: &crate::ast::expr::Expr) -> bool {
1264    use crate::ast::expr::ExprKind;
1265
1266    match &expr.kind {
1267        ExprKind::FunctionCall { name, args, .. } => {
1268            if is_aggregate_function(name) {
1269                return true;
1270            }
1271            args.iter().any(expr_contains_aggregate)
1272        }
1273        ExprKind::BinaryOp { left, right, .. } => {
1274            expr_contains_aggregate(left) || expr_contains_aggregate(right)
1275        }
1276        ExprKind::UnaryOp { operand, .. } => expr_contains_aggregate(operand),
1277        ExprKind::Between {
1278            expr, low, high, ..
1279        } => {
1280            expr_contains_aggregate(expr)
1281                || expr_contains_aggregate(low)
1282                || expr_contains_aggregate(high)
1283        }
1284        ExprKind::Like {
1285            expr,
1286            pattern,
1287            escape,
1288            ..
1289        } => {
1290            expr_contains_aggregate(expr)
1291                || expr_contains_aggregate(pattern)
1292                || escape.as_deref().is_some_and(expr_contains_aggregate)
1293        }
1294        ExprKind::InList { expr, list, .. } => {
1295            expr_contains_aggregate(expr) || list.iter().any(expr_contains_aggregate)
1296        }
1297        ExprKind::IsNull { expr, .. } => expr_contains_aggregate(expr),
1298        ExprKind::Literal(_) | ExprKind::VectorLiteral(_) | ExprKind::ColumnRef { .. } => false,
1299    }
1300}
1301
1302fn typed_expr_contains_aggregate(expr: &TypedExpr) -> bool {
1303    match &expr.kind {
1304        TypedExprKind::FunctionCall { name, args, .. } => {
1305            if is_aggregate_function(name) {
1306                return true;
1307            }
1308            args.iter().any(typed_expr_contains_aggregate)
1309        }
1310        TypedExprKind::BinaryOp { left, right, .. } => {
1311            typed_expr_contains_aggregate(left) || typed_expr_contains_aggregate(right)
1312        }
1313        TypedExprKind::UnaryOp { operand, .. } => typed_expr_contains_aggregate(operand),
1314        TypedExprKind::Between {
1315            expr, low, high, ..
1316        } => {
1317            typed_expr_contains_aggregate(expr)
1318                || typed_expr_contains_aggregate(low)
1319                || typed_expr_contains_aggregate(high)
1320        }
1321        TypedExprKind::Like {
1322            expr,
1323            pattern,
1324            escape,
1325            ..
1326        } => {
1327            typed_expr_contains_aggregate(expr)
1328                || typed_expr_contains_aggregate(pattern)
1329                || escape
1330                    .as_ref()
1331                    .is_some_and(|inner| typed_expr_contains_aggregate(inner))
1332        }
1333        TypedExprKind::InList { expr, list, .. } => {
1334            typed_expr_contains_aggregate(expr) || list.iter().any(typed_expr_contains_aggregate)
1335        }
1336        TypedExprKind::IsNull { expr, .. } => typed_expr_contains_aggregate(expr),
1337        _ => false,
1338    }
1339}
1340
/// Report whether `name` is one of the supported aggregate functions.
///
/// The comparison ignores ASCII case. Recognised names are `count`, `sum`,
/// `total`, `avg`, `min`, `max`, `group_concat` and `string_agg`.
fn is_aggregate_function(name: &str) -> bool {
    const AGGREGATE_NAMES: [&str; 8] = [
        "count",
        "sum",
        "total",
        "avg",
        "min",
        "max",
        "group_concat",
        "string_agg",
    ];

    AGGREGATE_NAMES
        .iter()
        .any(|candidate| name.eq_ignore_ascii_case(candidate))
}
1347
1348fn expr_key(expr: &TypedExpr) -> String {
1349    format!("{:?}", expr.kind)
1350}
1351
1352fn aggregate_signature(
1353    name: &str,
1354    distinct: bool,
1355    star: bool,
1356    arg: Option<&TypedExpr>,
1357    separator: Option<&String>,
1358    _expr: &TypedExpr,
1359) -> AggregateSignature {
1360    AggregateSignature {
1361        name: name.to_ascii_lowercase(),
1362        distinct,
1363        star,
1364        arg_key: arg.map(expr_key),
1365        separator: separator.cloned(),
1366    }
1367}
1368
1369fn build_group_key_map(group_keys: &[TypedExpr]) -> HashMap<String, usize> {
1370    let mut map = HashMap::new();
1371    for (idx, key) in group_keys.iter().enumerate() {
1372        map.insert(expr_key(key), idx);
1373    }
1374    map
1375}
1376
1377fn build_aggregate_map(aggregates: &[AggregateExpr]) -> HashMap<AggregateSignature, usize> {
1378    let mut map = HashMap::new();
1379    for (idx, agg) in aggregates.iter().enumerate() {
1380        let (name, separator, star, arg) = match &agg.function {
1381            AggregateFunction::Count => (
1382                "count".to_string(),
1383                None,
1384                agg.arg.is_none(),
1385                agg.arg.as_ref(),
1386            ),
1387            AggregateFunction::Sum => ("sum".to_string(), None, false, agg.arg.as_ref()),
1388            AggregateFunction::Total => ("total".to_string(), None, false, agg.arg.as_ref()),
1389            AggregateFunction::Avg => ("avg".to_string(), None, false, agg.arg.as_ref()),
1390            AggregateFunction::Min => ("min".to_string(), None, false, agg.arg.as_ref()),
1391            AggregateFunction::Max => ("max".to_string(), None, false, agg.arg.as_ref()),
1392            AggregateFunction::GroupConcat { separator } => (
1393                "group_concat".to_string(),
1394                separator.clone(),
1395                false,
1396                agg.arg.as_ref(),
1397            ),
1398            AggregateFunction::StringAgg { separator } => (
1399                "string_agg".to_string(),
1400                separator.clone(),
1401                false,
1402                agg.arg.as_ref(),
1403            ),
1404        };
1405        let signature = AggregateSignature {
1406            name,
1407            distinct: agg.distinct,
1408            star,
1409            arg_key: arg.map(expr_key),
1410            separator,
1411        };
1412        map.insert(signature, idx);
1413    }
1414    map
1415}
1416
1417fn build_aggregate_schema(
1418    group_keys: &[TypedExpr],
1419    aggregates: &[AggregateExpr],
1420) -> Vec<ColumnMetadata> {
1421    let mut schema = Vec::new();
1422    for (idx, key) in group_keys.iter().enumerate() {
1423        let name = match &key.kind {
1424            TypedExprKind::ColumnRef { column, .. } => column.clone(),
1425            _ => format!("group_{idx}"),
1426        };
1427        schema.push(ColumnMetadata::new(name, key.resolved_type.clone()));
1428    }
1429    for (idx, agg) in aggregates.iter().enumerate() {
1430        let name = match &agg.function {
1431            AggregateFunction::Count => format!("count_{idx}"),
1432            AggregateFunction::Sum => format!("sum_{idx}"),
1433            AggregateFunction::Total => format!("total_{idx}"),
1434            AggregateFunction::Avg => format!("avg_{idx}"),
1435            AggregateFunction::Min => format!("min_{idx}"),
1436            AggregateFunction::Max => format!("max_{idx}"),
1437            AggregateFunction::GroupConcat { .. } => format!("group_concat_{idx}"),
1438            AggregateFunction::StringAgg { .. } => format!("string_agg_{idx}"),
1439        };
1440        schema.push(ColumnMetadata::new(name, agg.result_type.clone()));
1441    }
1442    schema
1443}
1444
1445fn rewrite_expr_with_maps(
1446    expr: &TypedExpr,
1447    group_key_map: &HashMap<String, usize>,
1448    aggregate_map: &HashMap<AggregateSignature, usize>,
1449    output_names: &[String],
1450) -> Result<TypedExpr, PlannerError> {
1451    let group_key_count = output_names.len().saturating_sub(aggregate_map.len());
1452    let key = expr_key(expr);
1453    if let Some(idx) = group_key_map.get(&key) {
1454        return Ok(make_output_column_ref(
1455            *idx,
1456            output_names,
1457            expr.resolved_type.clone(),
1458            expr.span,
1459        ));
1460    }
1461
1462    match &expr.kind {
1463        TypedExprKind::FunctionCall {
1464            name,
1465            args,
1466            distinct,
1467            star,
1468        } if is_aggregate_function(name) => {
1469            let separator = if name.eq_ignore_ascii_case("group_concat") && args.len() == 2 {
1470                if let TypedExprKind::Literal(Literal::String(value)) = &args[1].kind {
1471                    Some(value.clone())
1472                } else {
1473                    return Err(PlannerError::invalid_expression(
1474                        "GROUP_CONCAT separator must be a string literal".to_string(),
1475                    ));
1476                }
1477            } else if name.eq_ignore_ascii_case("string_agg") && args.len() == 2 {
1478                if let TypedExprKind::Literal(Literal::String(value)) = &args[1].kind {
1479                    Some(value.clone())
1480                } else {
1481                    return Err(PlannerError::invalid_expression(
1482                        "STRING_AGG separator must be a string literal".to_string(),
1483                    ));
1484                }
1485            } else {
1486                None
1487            };
1488            let signature = AggregateSignature {
1489                name: name.to_ascii_lowercase(),
1490                distinct: *distinct,
1491                star: *star,
1492                arg_key: args.first().map(expr_key),
1493                separator,
1494            };
1495            let idx = aggregate_map.get(&signature).ok_or_else(|| {
1496                PlannerError::invalid_expression(
1497                    "aggregate in expression is not part of plan".to_string(),
1498                )
1499            })?;
1500            let output_index = group_key_count + idx;
1501            Ok(make_output_column_ref(
1502                output_index,
1503                output_names,
1504                expr.resolved_type.clone(),
1505                expr.span,
1506            ))
1507        }
1508        TypedExprKind::FunctionCall {
1509            name,
1510            args,
1511            distinct,
1512            star,
1513        } => {
1514            if *distinct || *star {
1515                return Err(PlannerError::invalid_expression(
1516                    "DISTINCT/STAR modifiers are only supported for aggregates".to_string(),
1517                ));
1518            }
1519            let mut rewritten_args = Vec::with_capacity(args.len());
1520            for arg in args {
1521                rewritten_args.push(rewrite_expr_with_maps(
1522                    arg,
1523                    group_key_map,
1524                    aggregate_map,
1525                    output_names,
1526                )?);
1527            }
1528            Ok(TypedExpr {
1529                kind: TypedExprKind::FunctionCall {
1530                    name: name.clone(),
1531                    args: rewritten_args,
1532                    distinct: false,
1533                    star: false,
1534                },
1535                resolved_type: expr.resolved_type.clone(),
1536                span: expr.span,
1537            })
1538        }
1539        TypedExprKind::BinaryOp { left, op, right } => {
1540            let left = rewrite_expr_with_maps(left, group_key_map, aggregate_map, output_names)?;
1541            let right = rewrite_expr_with_maps(right, group_key_map, aggregate_map, output_names)?;
1542            Ok(TypedExpr {
1543                kind: TypedExprKind::BinaryOp {
1544                    left: Box::new(left),
1545                    op: *op,
1546                    right: Box::new(right),
1547                },
1548                resolved_type: expr.resolved_type.clone(),
1549                span: expr.span,
1550            })
1551        }
1552        TypedExprKind::UnaryOp { op, operand } => {
1553            let operand =
1554                rewrite_expr_with_maps(operand, group_key_map, aggregate_map, output_names)?;
1555            Ok(TypedExpr {
1556                kind: TypedExprKind::UnaryOp {
1557                    op: *op,
1558                    operand: Box::new(operand),
1559                },
1560                resolved_type: expr.resolved_type.clone(),
1561                span: expr.span,
1562            })
1563        }
1564        TypedExprKind::Between {
1565            expr: inner,
1566            low,
1567            high,
1568            negated,
1569        } => {
1570            let inner = rewrite_expr_with_maps(inner, group_key_map, aggregate_map, output_names)?;
1571            let low = rewrite_expr_with_maps(low, group_key_map, aggregate_map, output_names)?;
1572            let high = rewrite_expr_with_maps(high, group_key_map, aggregate_map, output_names)?;
1573            Ok(TypedExpr {
1574                kind: TypedExprKind::Between {
1575                    expr: Box::new(inner),
1576                    low: Box::new(low),
1577                    high: Box::new(high),
1578                    negated: *negated,
1579                },
1580                resolved_type: expr.resolved_type.clone(),
1581                span: expr.span,
1582            })
1583        }
1584        TypedExprKind::Like {
1585            expr: inner,
1586            pattern,
1587            escape,
1588            negated,
1589        } => {
1590            let inner = rewrite_expr_with_maps(inner, group_key_map, aggregate_map, output_names)?;
1591            let pattern =
1592                rewrite_expr_with_maps(pattern, group_key_map, aggregate_map, output_names)?;
1593            let escape = if let Some(esc) = escape {
1594                Some(Box::new(rewrite_expr_with_maps(
1595                    esc,
1596                    group_key_map,
1597                    aggregate_map,
1598                    output_names,
1599                )?))
1600            } else {
1601                None
1602            };
1603            Ok(TypedExpr {
1604                kind: TypedExprKind::Like {
1605                    expr: Box::new(inner),
1606                    pattern: Box::new(pattern),
1607                    escape,
1608                    negated: *negated,
1609                },
1610                resolved_type: expr.resolved_type.clone(),
1611                span: expr.span,
1612            })
1613        }
1614        TypedExprKind::InList {
1615            expr: inner,
1616            list,
1617            negated,
1618        } => {
1619            let inner = rewrite_expr_with_maps(inner, group_key_map, aggregate_map, output_names)?;
1620            let mut rewritten_list = Vec::with_capacity(list.len());
1621            for item in list {
1622                rewritten_list.push(rewrite_expr_with_maps(
1623                    item,
1624                    group_key_map,
1625                    aggregate_map,
1626                    output_names,
1627                )?);
1628            }
1629            Ok(TypedExpr {
1630                kind: TypedExprKind::InList {
1631                    expr: Box::new(inner),
1632                    list: rewritten_list,
1633                    negated: *negated,
1634                },
1635                resolved_type: expr.resolved_type.clone(),
1636                span: expr.span,
1637            })
1638        }
1639        TypedExprKind::IsNull {
1640            expr: inner,
1641            negated,
1642        } => {
1643            let inner = rewrite_expr_with_maps(inner, group_key_map, aggregate_map, output_names)?;
1644            Ok(TypedExpr {
1645                kind: TypedExprKind::IsNull {
1646                    expr: Box::new(inner),
1647                    negated: *negated,
1648                },
1649                resolved_type: expr.resolved_type.clone(),
1650                span: expr.span,
1651            })
1652        }
1653        TypedExprKind::Literal(_) | TypedExprKind::VectorLiteral(_) => Ok(expr.clone()),
1654        TypedExprKind::ColumnRef { .. } => Err(PlannerError::invalid_expression(
1655            "column reference must appear in GROUP BY or be aggregated".to_string(),
1656        )),
1657        TypedExprKind::Cast {
1658            expr: inner,
1659            target_type,
1660        } => {
1661            let inner = rewrite_expr_with_maps(inner, group_key_map, aggregate_map, output_names)?;
1662            Ok(TypedExpr {
1663                kind: TypedExprKind::Cast {
1664                    expr: Box::new(inner),
1665                    target_type: target_type.clone(),
1666                },
1667                resolved_type: expr.resolved_type.clone(),
1668                span: expr.span,
1669            })
1670        }
1671    }
1672}
1673
1674fn make_output_column_ref(
1675    index: usize,
1676    output_names: &[String],
1677    resolved_type: ResolvedType,
1678    span: crate::ast::Span,
1679) -> TypedExpr {
1680    let name = output_names
1681        .get(index)
1682        .cloned()
1683        .unwrap_or_else(|| format!("col_{index}"));
1684    TypedExpr::column_ref("__agg__".to_string(), name, index, resolved_type, span)
1685}