llkv_sql/
sql_engine.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::{
3    Arc, OnceLock,
4    atomic::{AtomicBool, Ordering as AtomicOrdering},
5};
6
7use crate::SqlResult;
8use crate::SqlValue;
9use arrow::record_batch::RecordBatch;
10
11use llkv_executor::SelectExecution;
12use llkv_expr::literal::Literal;
13use llkv_plan::validation::{
14    ensure_known_columns_case_insensitive, ensure_non_empty, ensure_unique_case_insensitive,
15};
16use llkv_result::Error;
17use llkv_runtime::storage_namespace::TEMPORARY_NAMESPACE_ID;
18use llkv_runtime::{
19    AggregateExpr, AssignmentValue, ColumnAssignment, ColumnSpec, CreateIndexPlan, CreateTablePlan,
20    CreateTableSource, DeletePlan, ForeignKeyAction, ForeignKeySpec, IndexColumnPlan, InsertPlan,
21    InsertSource, OrderByPlan, OrderSortType, OrderTarget, PlanStatement, PlanValue,
22    RuntimeContext, RuntimeEngine, RuntimeSession, RuntimeStatementResult, SelectPlan,
23    SelectProjection, UpdatePlan, extract_rows_from_range,
24};
25use llkv_storage::pager::Pager;
26use llkv_table::catalog::{IdentifierContext, IdentifierResolver};
27use regex::Regex;
28use simd_r_drive_entry_handle::EntryHandle;
29use sqlparser::ast::{
30    Assignment, AssignmentTarget, BeginTransactionKind, BinaryOperator, ColumnOption,
31    ColumnOptionDef, ConstraintCharacteristics, DataType as SqlDataType, Delete, ExceptionWhen,
32    Expr as SqlExpr, FromTable, FunctionArg, FunctionArgExpr, FunctionArguments, GroupByExpr,
33    Ident, LimitClause, NullsDistinctOption, ObjectName, ObjectNamePart, ObjectType, OrderBy,
34    OrderByKind, Query, ReferentialAction, SchemaName, Select, SelectItem,
35    SelectItemQualifiedWildcardKind, Set, SetExpr, SqlOption, Statement, TableConstraint,
36    TableFactor, TableObject, TableWithJoins, TransactionMode, TransactionModifier, UnaryOperator,
37    UpdateTableFromKind, Value, ValueWithSpan,
38};
39use sqlparser::dialect::GenericDialect;
40use sqlparser::parser::Parser;
41
42pub struct SqlEngine<P>
43where
44    P: Pager<Blob = EntryHandle> + Send + Sync,
45{
46    engine: RuntimeEngine<P>,
47    default_nulls_first: AtomicBool,
48}
49
/// Error message text for a table that another transaction has dropped.
/// NOTE(review): no use site is visible in this part of the file — confirm where it is reported.
const DROPPED_TABLE_TRANSACTION_ERR: &str = "another transaction has dropped this table";
51
52impl<P> Clone for SqlEngine<P>
53where
54    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
55{
56    fn clone(&self) -> Self {
57        tracing::warn!(
58            "[SQL_ENGINE] SqlEngine::clone() called - will create new Engine with new session!"
59        );
60        // Create a new session from the same context
61        Self {
62            engine: self.engine.clone(),
63            default_nulls_first: AtomicBool::new(
64                self.default_nulls_first.load(AtomicOrdering::Relaxed),
65            ),
66        }
67    }
68}
69
70#[allow(dead_code)]
71impl<P> SqlEngine<P>
72where
73    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
74{
75    fn map_table_error(table_name: &str, err: Error) -> Error {
76        match err {
77            Error::NotFound => Self::table_not_found_error(table_name),
78            Error::InvalidArgumentError(msg) if msg.contains("unknown table") => {
79                Self::table_not_found_error(table_name)
80            }
81            other => other,
82        }
83    }
84
85    fn table_not_found_error(table_name: &str) -> Error {
86        Error::CatalogError(format!(
87            "Catalog Error: Table '{table_name}' does not exist"
88        ))
89    }
90
91    fn is_table_missing_error(err: &Error) -> bool {
92        match err {
93            Error::NotFound => true,
94            Error::CatalogError(msg) => {
95                msg.contains("Catalog Error: Table") || msg.contains("unknown table")
96            }
97            Error::InvalidArgumentError(msg) => {
98                msg.contains("Catalog Error: Table") || msg.contains("unknown table")
99            }
100            _ => false,
101        }
102    }
103
104    fn execute_plan_statement(
105        &self,
106        statement: PlanStatement,
107    ) -> SqlResult<RuntimeStatementResult<P>> {
108        let table = llkv_runtime::statement_table_name(&statement).map(str::to_string);
109        self.engine.execute_statement(statement).map_err(|err| {
110            if let Some(table_name) = table {
111                Self::map_table_error(&table_name, err)
112            } else {
113                err
114            }
115        })
116    }
117
118    pub fn new(pager: Arc<P>) -> Self {
119        let engine = RuntimeEngine::new(pager);
120        Self {
121            engine,
122            default_nulls_first: AtomicBool::new(false),
123        }
124    }
125
126    /// Preprocess SQL to handle qualified names in EXCLUDE clauses
127    /// Converts EXCLUDE (schema.table.col) to EXCLUDE ("schema.table.col")
128    fn preprocess_exclude_syntax(sql: &str) -> String {
129        static EXCLUDE_REGEX: OnceLock<Regex> = OnceLock::new();
130
131        // Pattern to match EXCLUDE followed by qualified identifiers
132        // Matches: EXCLUDE (identifier.identifier.identifier)
133        let re = EXCLUDE_REGEX.get_or_init(|| {
134            Regex::new(
135                r"(?i)EXCLUDE\s*\(\s*([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)+)\s*\)",
136            )
137            .expect("valid EXCLUDE qualifier regex")
138        });
139
140        re.replace_all(sql, |caps: &regex::Captures| {
141            let qualified_name = &caps[1];
142            format!("EXCLUDE (\"{}\")", qualified_name)
143        })
144        .to_string()
145    }
146
147    pub(crate) fn context_arc(&self) -> Arc<RuntimeContext<P>> {
148        self.engine.context()
149    }
150
151    pub fn with_context(context: Arc<RuntimeContext<P>>, default_nulls_first: bool) -> Self {
152        Self {
153            engine: RuntimeEngine::from_context(context),
154            default_nulls_first: AtomicBool::new(default_nulls_first),
155        }
156    }
157
158    #[cfg(test)]
159    fn default_nulls_first_for_tests(&self) -> bool {
160        self.default_nulls_first.load(AtomicOrdering::Relaxed)
161    }
162
163    fn has_active_transaction(&self) -> bool {
164        self.engine.session().has_active_transaction()
165    }
166
167    /// Get a reference to the underlying session (for advanced use like error handling in test harnesses).
168    pub fn session(&self) -> &RuntimeSession<P> {
169        self.engine.session()
170    }
171
172    pub fn execute(&self, sql: &str) -> SqlResult<Vec<RuntimeStatementResult<P>>> {
173        tracing::trace!("DEBUG SQL execute: {}", sql);
174
175        // Preprocess SQL to handle qualified names in EXCLUDE
176        // Replace EXCLUDE (schema.table.col) with EXCLUDE ("schema.table.col")
177        let processed_sql = Self::preprocess_exclude_syntax(sql);
178
179        let dialect = GenericDialect {};
180        let statements = Parser::parse_sql(&dialect, &processed_sql)
181            .map_err(|err| Error::InvalidArgumentError(format!("failed to parse SQL: {err}")))?;
182        tracing::trace!("DEBUG SQL execute: parsed {} statements", statements.len());
183
184        let mut results = Vec::with_capacity(statements.len());
185        for (i, statement) in statements.iter().enumerate() {
186            tracing::trace!("DEBUG SQL execute: processing statement {}", i);
187            results.push(self.execute_statement(statement.clone())?);
188            tracing::trace!("DEBUG SQL execute: statement {} completed", i);
189        }
190        tracing::trace!("DEBUG SQL execute completed successfully");
191        Ok(results)
192    }
193
194    fn execute_statement(&self, statement: Statement) -> SqlResult<RuntimeStatementResult<P>> {
195        tracing::trace!(
196            "DEBUG SQL execute_statement: {:?}",
197            match &statement {
198                Statement::Insert(insert) =>
199                    format!("Insert(table={:?})", Self::table_name_from_insert(insert)),
200                Statement::Query(_) => "Query".to_string(),
201                Statement::StartTransaction { .. } => "StartTransaction".to_string(),
202                Statement::Commit { .. } => "Commit".to_string(),
203                Statement::Rollback { .. } => "Rollback".to_string(),
204                Statement::CreateTable(_) => "CreateTable".to_string(),
205                Statement::Update { .. } => "Update".to_string(),
206                Statement::Delete(_) => "Delete".to_string(),
207                other => format!("Other({:?})", other),
208            }
209        );
210        match statement {
211            Statement::StartTransaction {
212                modes,
213                begin,
214                transaction,
215                modifier,
216                statements,
217                exception,
218                has_end_keyword,
219            } => self.handle_start_transaction(
220                modes,
221                begin,
222                transaction,
223                modifier,
224                statements,
225                exception,
226                has_end_keyword,
227            ),
228            Statement::Commit {
229                chain,
230                end,
231                modifier,
232            } => self.handle_commit(chain, end, modifier),
233            Statement::Rollback { chain, savepoint } => self.handle_rollback(chain, savepoint),
234            other => self.execute_statement_non_transactional(other),
235        }
236    }
237
238    fn execute_statement_non_transactional(
239        &self,
240        statement: Statement,
241    ) -> SqlResult<RuntimeStatementResult<P>> {
242        tracing::trace!("DEBUG SQL execute_statement_non_transactional called");
243        match statement {
244            Statement::CreateTable(stmt) => {
245                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateTable");
246                self.handle_create_table(stmt)
247            }
248            Statement::CreateIndex(stmt) => {
249                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateIndex");
250                self.handle_create_index(stmt)
251            }
252            Statement::CreateSchema {
253                schema_name,
254                if_not_exists,
255                with,
256                options,
257                default_collate_spec,
258                clone,
259            } => {
260                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateSchema");
261                self.handle_create_schema(
262                    schema_name,
263                    if_not_exists,
264                    with,
265                    options,
266                    default_collate_spec,
267                    clone,
268                )
269            }
270            Statement::Insert(stmt) => {
271                let table_name =
272                    Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
273                tracing::trace!(
274                    "DEBUG SQL execute_statement_non_transactional: Insert(table={})",
275                    table_name
276                );
277                self.handle_insert(stmt)
278            }
279            Statement::Query(query) => {
280                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Query");
281                self.handle_query(*query)
282            }
283            Statement::Update {
284                table,
285                assignments,
286                from,
287                selection,
288                returning,
289                ..
290            } => {
291                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Update");
292                self.handle_update(table, assignments, from, selection, returning)
293            }
294            Statement::Delete(delete) => {
295                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Delete");
296                self.handle_delete(delete)
297            }
298            Statement::Drop {
299                object_type,
300                if_exists,
301                names,
302                cascade,
303                restrict,
304                purge,
305                temporary,
306                ..
307            } => {
308                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Drop");
309                self.handle_drop(
310                    object_type,
311                    if_exists,
312                    names,
313                    cascade,
314                    restrict,
315                    purge,
316                    temporary,
317                )
318            }
319            Statement::Set(set_stmt) => {
320                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Set");
321                self.handle_set(set_stmt)
322            }
323            Statement::Pragma { name, value, is_eq } => {
324                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Pragma");
325                self.handle_pragma(name, value, is_eq)
326            }
327            other => {
328                tracing::trace!(
329                    "DEBUG SQL execute_statement_non_transactional: Other({:?})",
330                    other
331                );
332                Err(Error::InvalidArgumentError(format!(
333                    "unsupported SQL statement: {other:?}"
334                )))
335            }
336        }
337    }
338
339    fn table_name_from_insert(insert: &sqlparser::ast::Insert) -> SqlResult<String> {
340        match &insert.table {
341            TableObject::TableName(name) => Self::object_name_to_string(name),
342            _ => Err(Error::InvalidArgumentError(
343                "INSERT requires a plain table name".into(),
344            )),
345        }
346    }
347
348    fn table_name_from_update(table: &TableWithJoins) -> SqlResult<Option<String>> {
349        if !table.joins.is_empty() {
350            return Err(Error::InvalidArgumentError(
351                "UPDATE with JOIN targets is not supported yet".into(),
352            ));
353        }
354        Self::table_with_joins_name(table)
355    }
356
357    fn table_name_from_delete(delete: &Delete) -> SqlResult<Option<String>> {
358        if !delete.tables.is_empty() {
359            return Err(Error::InvalidArgumentError(
360                "multi-table DELETE is not supported yet".into(),
361            ));
362        }
363        let from_tables = match &delete.from {
364            FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
365        };
366        if from_tables.is_empty() {
367            return Ok(None);
368        }
369        if from_tables.len() != 1 {
370            return Err(Error::InvalidArgumentError(
371                "DELETE over multiple tables is not supported yet".into(),
372            ));
373        }
374        Self::table_with_joins_name(&from_tables[0])
375    }
376
377    fn object_name_to_string(name: &ObjectName) -> SqlResult<String> {
378        let (display, _) = canonical_object_name(name)?;
379        Ok(display)
380    }
381
382    #[allow(dead_code)]
383    fn table_object_to_name(table: &TableObject) -> SqlResult<Option<String>> {
384        match table {
385            TableObject::TableName(name) => Ok(Some(Self::object_name_to_string(name)?)),
386            TableObject::TableFunction(_) => Ok(None),
387        }
388    }
389
390    fn table_with_joins_name(table: &TableWithJoins) -> SqlResult<Option<String>> {
391        match &table.relation {
392            TableFactor::Table { name, .. } => Ok(Some(Self::object_name_to_string(name)?)),
393            _ => Ok(None),
394        }
395    }
396
397    fn tables_in_query(query: &Query) -> SqlResult<Vec<String>> {
398        let mut tables = Vec::new();
399        if let sqlparser::ast::SetExpr::Select(select) = query.body.as_ref() {
400            for table in &select.from {
401                if let TableFactor::Table { name, .. } = &table.relation {
402                    tables.push(Self::object_name_to_string(name)?);
403                }
404            }
405        }
406        Ok(tables)
407    }
408
409    fn collect_known_columns(
410        &self,
411        display_name: &str,
412        canonical_name: &str,
413    ) -> SqlResult<HashSet<String>> {
414        let context = self.engine.context();
415
416        if context.is_table_marked_dropped(canonical_name) {
417            return Err(Self::table_not_found_error(display_name));
418        }
419
420        match context.table_column_specs(display_name) {
421            Ok(specs) => Ok(specs
422                .into_iter()
423                .map(|spec| spec.name.to_ascii_lowercase())
424                .collect()),
425            Err(err) => {
426                if !Self::is_table_missing_error(&err) {
427                    return Err(Self::map_table_error(display_name, err));
428                }
429
430                Ok(HashSet::new())
431            }
432        }
433    }
434
435    fn is_table_marked_dropped(&self, table_name: &str) -> SqlResult<bool> {
436        let canonical = table_name.to_ascii_lowercase();
437        Ok(self.engine.context().is_table_marked_dropped(&canonical))
438    }
439
440    fn handle_create_table(
441        &self,
442        mut stmt: sqlparser::ast::CreateTable,
443    ) -> SqlResult<RuntimeStatementResult<P>> {
444        validate_create_table_common(&stmt)?;
445
446        let (mut schema_name, table_name) = parse_schema_qualified_name(&stmt.name)?;
447
448        let namespace = if stmt.temporary {
449            if schema_name.is_some() {
450                return Err(Error::InvalidArgumentError(
451                    "temporary tables cannot specify an explicit schema".into(),
452                ));
453            }
454            schema_name = None;
455            Some(TEMPORARY_NAMESPACE_ID.to_string())
456        } else {
457            None
458        };
459
460        // Validate schema exists if specified
461        if let Some(ref schema) = schema_name {
462            let catalog = self.engine.context().table_catalog();
463            if !catalog.schema_exists(schema) {
464                return Err(Error::CatalogError(format!(
465                    "Schema '{}' does not exist",
466                    schema
467                )));
468            }
469        }
470
471        // Use full qualified name (schema.table or just table)
472        let display_name = match &schema_name {
473            Some(schema) => format!("{}.{}", schema, table_name),
474            None => table_name.clone(),
475        };
476        let canonical_name = display_name.to_ascii_lowercase();
477        tracing::trace!(
478            "\n=== HANDLE_CREATE_TABLE: table='{}' columns={} ===",
479            display_name,
480            stmt.columns.len()
481        );
482        if display_name.is_empty() {
483            return Err(Error::InvalidArgumentError(
484                "table name must not be empty".into(),
485            ));
486        }
487
488        if let Some(query) = stmt.query.take() {
489            validate_create_table_as(&stmt)?;
490            if let Some(result) = self.try_handle_range_ctas(
491                &display_name,
492                &canonical_name,
493                &query,
494                stmt.if_not_exists,
495                stmt.or_replace,
496                namespace.clone(),
497            )? {
498                return Ok(result);
499            }
500            return self.handle_create_table_as(
501                display_name,
502                canonical_name,
503                *query,
504                stmt.if_not_exists,
505                stmt.or_replace,
506                namespace.clone(),
507            );
508        }
509
510        if stmt.columns.is_empty() {
511            return Err(Error::InvalidArgumentError(
512                "CREATE TABLE requires at least one column".into(),
513            ));
514        }
515
516        validate_create_table_definition(&stmt)?;
517
518        let column_defs_ast = std::mem::take(&mut stmt.columns);
519        let constraints = std::mem::take(&mut stmt.constraints);
520
521        let column_names: Vec<String> = column_defs_ast
522            .iter()
523            .map(|column_def| column_def.name.value.clone())
524            .collect();
525        ensure_unique_case_insensitive(column_names.iter().map(|name| name.as_str()), |dup| {
526            format!(
527                "duplicate column name '{}' in table '{}'",
528                dup, display_name
529            )
530        })?;
531        let column_names_lower: HashSet<String> = column_names
532            .iter()
533            .map(|name| name.to_ascii_lowercase())
534            .collect();
535
536        let mut columns: Vec<ColumnSpec> = Vec::with_capacity(column_defs_ast.len());
537        let mut primary_key_columns: HashSet<String> = HashSet::new();
538        let mut foreign_keys: Vec<ForeignKeySpec> = Vec::new();
539
540        // Second pass: process columns including CHECK validation and column-level FKs
541        for column_def in column_defs_ast {
542            let is_nullable = column_def
543                .options
544                .iter()
545                .all(|opt| !matches!(opt.option, ColumnOption::NotNull));
546
547            let is_primary_key = column_def.options.iter().any(|opt| {
548                matches!(
549                    opt.option,
550                    ColumnOption::Unique {
551                        is_primary: true,
552                        characteristics: _
553                    }
554                )
555            });
556
557            let has_unique_constraint = column_def
558                .options
559                .iter()
560                .any(|opt| matches!(opt.option, ColumnOption::Unique { .. }));
561
562            // Extract CHECK constraint if present and validate it
563            let check_expr = column_def.options.iter().find_map(|opt| {
564                if let ColumnOption::Check(expr) = &opt.option {
565                    Some(expr)
566                } else {
567                    None
568                }
569            });
570
571            // Validate CHECK constraint if present (now we have all column names)
572            if let Some(check_expr) = check_expr {
573                let all_col_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
574                validate_check_constraint(check_expr, &display_name, &all_col_refs)?;
575            }
576
577            let check_expr_str = check_expr.map(|e| e.to_string());
578
579            // Extract column-level FOREIGN KEY (REFERENCES clause)
580            for opt in &column_def.options {
581                if let ColumnOption::ForeignKey {
582                    foreign_table,
583                    referred_columns,
584                    on_delete,
585                    on_update,
586                    characteristics,
587                } = &opt.option
588                {
589                    let spec = self.build_foreign_key_spec(
590                        &display_name,
591                        &canonical_name,
592                        vec![column_def.name.value.clone()],
593                        foreign_table,
594                        referred_columns,
595                        *on_delete,
596                        *on_update,
597                        characteristics,
598                        &column_names_lower,
599                        None,
600                    )?;
601                    foreign_keys.push(spec);
602                }
603            }
604
605            tracing::trace!(
606                "DEBUG CREATE TABLE column '{}' is_primary_key={} has_unique={} check_expr={:?}",
607                column_def.name.value,
608                is_primary_key,
609                has_unique_constraint,
610                check_expr_str
611            );
612
613            let mut column = ColumnSpec::new(
614                column_def.name.value.clone(),
615                arrow_type_from_sql(&column_def.data_type)?,
616                is_nullable,
617            );
618            tracing::trace!(
619                "DEBUG ColumnSpec after new(): primary_key={} unique={}",
620                column.primary_key,
621                column.unique
622            );
623
624            column = column
625                .with_primary_key(is_primary_key)
626                .with_unique(has_unique_constraint)
627                .with_check(check_expr_str);
628
629            if is_primary_key {
630                column.nullable = false;
631                primary_key_columns.insert(column.name.to_ascii_lowercase());
632            }
633            tracing::trace!(
634                "DEBUG ColumnSpec after with_primary_key({})/with_unique({}): primary_key={} unique={} check_expr={:?}",
635                is_primary_key,
636                has_unique_constraint,
637                column.primary_key,
638                column.unique,
639                column.check_expr
640            );
641
642            columns.push(column);
643        }
644
645        // Apply supported table-level constraints (e.g., PRIMARY KEY)
646        if !constraints.is_empty() {
647            let mut column_lookup: HashMap<String, usize> = HashMap::with_capacity(columns.len());
648            for (idx, column) in columns.iter().enumerate() {
649                column_lookup.insert(column.name.to_ascii_lowercase(), idx);
650            }
651
652            for constraint in constraints {
653                match constraint {
654                    TableConstraint::PrimaryKey {
655                        columns: constraint_columns,
656                        ..
657                    } => {
658                        if !primary_key_columns.is_empty() {
659                            return Err(Error::InvalidArgumentError(
660                                "multiple PRIMARY KEY constraints are not supported".into(),
661                            ));
662                        }
663
664                        ensure_non_empty(&constraint_columns, || {
665                            "PRIMARY KEY requires at least one column".into()
666                        })?;
667
668                        let mut pk_column_names: Vec<String> =
669                            Vec::with_capacity(constraint_columns.len());
670
671                        for index_col in &constraint_columns {
672                            let column_ident = extract_index_column_name(
673                                index_col,
674                                "PRIMARY KEY",
675                                false, // no sort options allowed
676                                false, // only simple identifiers
677                            )?;
678                            pk_column_names.push(column_ident);
679                        }
680
681                        ensure_unique_case_insensitive(
682                            pk_column_names.iter().map(|name| name.as_str()),
683                            |dup| format!("duplicate column '{}' in PRIMARY KEY constraint", dup),
684                        )?;
685
686                        ensure_known_columns_case_insensitive(
687                            pk_column_names.iter().map(|name| name.as_str()),
688                            &column_names_lower,
689                            |unknown| {
690                                format!("unknown column '{}' in PRIMARY KEY constraint", unknown)
691                            },
692                        )?;
693
694                        for column_ident in pk_column_names {
695                            let normalized = column_ident.to_ascii_lowercase();
696                            let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
697                                Error::InvalidArgumentError(format!(
698                                    "unknown column '{}' in PRIMARY KEY constraint",
699                                    column_ident
700                                ))
701                            })?;
702
703                            let column = columns.get_mut(idx).expect("column index valid");
704                            column.primary_key = true;
705                            column.unique = true;
706                            column.nullable = false;
707
708                            primary_key_columns.insert(normalized);
709                        }
710                    }
711                    TableConstraint::Unique {
712                        columns: constraint_columns,
713                        index_type,
714                        index_options,
715                        characteristics,
716                        nulls_distinct,
717                        ..
718                    } => {
719                        if !matches!(nulls_distinct, NullsDistinctOption::None) {
720                            return Err(Error::InvalidArgumentError(
721                                "UNIQUE constraints with NULLS DISTINCT/NOT DISTINCT are not supported yet".into(),
722                            ));
723                        }
724
725                        if index_type.is_some() {
726                            return Err(Error::InvalidArgumentError(
727                                "UNIQUE constraints with index types are not supported yet".into(),
728                            ));
729                        }
730
731                        if !index_options.is_empty() {
732                            return Err(Error::InvalidArgumentError(
733                                "UNIQUE constraints with index options are not supported yet"
734                                    .into(),
735                            ));
736                        }
737
738                        if characteristics.is_some() {
739                            return Err(Error::InvalidArgumentError(
740                                "UNIQUE constraint characteristics are not supported yet".into(),
741                            ));
742                        }
743
744                        ensure_non_empty(&constraint_columns, || {
745                            "UNIQUE constraint requires at least one column".into()
746                        })?;
747
748                        let mut unique_column_names: Vec<String> =
749                            Vec::with_capacity(constraint_columns.len());
750
751                        for index_column in &constraint_columns {
752                            let column_ident = extract_index_column_name(
753                                index_column,
754                                "UNIQUE constraint",
755                                false, // no sort options allowed
756                                false, // only simple identifiers
757                            )?;
758                            unique_column_names.push(column_ident);
759                        }
760
761                        if unique_column_names.len() > 1 {
762                            return Err(Error::InvalidArgumentError(
763                                "multi-column UNIQUE constraints are not supported yet".into(),
764                            ));
765                        }
766
767                        ensure_unique_case_insensitive(
768                            unique_column_names.iter().map(|name| name.as_str()),
769                            |dup| format!("duplicate column '{}' in UNIQUE constraint", dup),
770                        )?;
771
772                        ensure_known_columns_case_insensitive(
773                            unique_column_names.iter().map(|name| name.as_str()),
774                            &column_names_lower,
775                            |unknown| format!("unknown column '{}' in UNIQUE constraint", unknown),
776                        )?;
777
778                        let column_ident = unique_column_names
779                            .into_iter()
780                            .next()
781                            .expect("unique constraint checked for emptiness");
782                        let normalized = column_ident.to_ascii_lowercase();
783                        let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
784                            Error::InvalidArgumentError(format!(
785                                "unknown column '{}' in UNIQUE constraint",
786                                column_ident
787                            ))
788                        })?;
789
790                        let column = columns
791                            .get_mut(idx)
792                            .expect("column index from lookup must be valid");
793                        column.unique = true;
794                    }
795                    TableConstraint::ForeignKey {
796                        name,
797                        index_name,
798                        columns: fk_columns,
799                        foreign_table,
800                        referred_columns,
801                        on_delete,
802                        on_update,
803                        characteristics,
804                        ..
805                    } => {
806                        if index_name.is_some() {
807                            return Err(Error::InvalidArgumentError(
808                                "FOREIGN KEY index clauses are not supported yet".into(),
809                            ));
810                        }
811
812                        let referencing_columns: Vec<String> =
813                            fk_columns.into_iter().map(|ident| ident.value).collect();
814                        let spec = self.build_foreign_key_spec(
815                            &display_name,
816                            &canonical_name,
817                            referencing_columns,
818                            &foreign_table,
819                            &referred_columns,
820                            on_delete,
821                            on_update,
822                            &characteristics,
823                            &column_names_lower,
824                            name.map(|ident| ident.value),
825                        )?;
826
827                        foreign_keys.push(spec);
828                    }
829                    unsupported => {
830                        return Err(Error::InvalidArgumentError(format!(
831                            "table-level constraint {:?} is not supported",
832                            unsupported
833                        )));
834                    }
835                }
836            }
837        }
838
839        let plan = CreateTablePlan {
840            name: display_name,
841            if_not_exists: stmt.if_not_exists,
842            or_replace: stmt.or_replace,
843            columns,
844            source: None,
845            namespace,
846            foreign_keys,
847        };
848        self.execute_plan_statement(PlanStatement::CreateTable(plan))
849    }
850
851    fn handle_create_index(
852        &self,
853        stmt: sqlparser::ast::CreateIndex,
854    ) -> SqlResult<RuntimeStatementResult<P>> {
855        let sqlparser::ast::CreateIndex {
856            name,
857            table_name,
858            using,
859            columns,
860            unique,
861            concurrently,
862            if_not_exists,
863            include,
864            nulls_distinct,
865            with,
866            predicate,
867            index_options,
868            alter_options,
869            ..
870        } = stmt;
871
872        if concurrently {
873            return Err(Error::InvalidArgumentError(
874                "CREATE INDEX CONCURRENTLY is not supported".into(),
875            ));
876        }
877        if using.is_some() {
878            return Err(Error::InvalidArgumentError(
879                "CREATE INDEX USING clauses are not supported".into(),
880            ));
881        }
882        if !include.is_empty() {
883            return Err(Error::InvalidArgumentError(
884                "CREATE INDEX INCLUDE columns are not supported".into(),
885            ));
886        }
887        if nulls_distinct.is_some() {
888            return Err(Error::InvalidArgumentError(
889                "CREATE INDEX NULLS DISTINCT is not supported".into(),
890            ));
891        }
892        if !with.is_empty() {
893            return Err(Error::InvalidArgumentError(
894                "CREATE INDEX WITH options are not supported".into(),
895            ));
896        }
897        if predicate.is_some() {
898            return Err(Error::InvalidArgumentError(
899                "partial CREATE INDEX is not supported".into(),
900            ));
901        }
902        if !index_options.is_empty() {
903            return Err(Error::InvalidArgumentError(
904                "CREATE INDEX options are not supported".into(),
905            ));
906        }
907        if !alter_options.is_empty() {
908            return Err(Error::InvalidArgumentError(
909                "CREATE INDEX ALTER options are not supported".into(),
910            ));
911        }
912        if columns.is_empty() {
913            return Err(Error::InvalidArgumentError(
914                "CREATE INDEX requires at least one column".into(),
915            ));
916        }
917
918        let (schema_name, base_table_name) = parse_schema_qualified_name(&table_name)?;
919        if let Some(ref schema) = schema_name {
920            let catalog = self.engine.context().table_catalog();
921            if !catalog.schema_exists(schema) {
922                return Err(Error::CatalogError(format!(
923                    "Schema '{}' does not exist",
924                    schema
925                )));
926            }
927        }
928
929        let display_table_name = schema_name
930            .as_ref()
931            .map(|schema| format!("{}.{}", schema, base_table_name))
932            .unwrap_or_else(|| base_table_name.clone());
933        let canonical_table_name = display_table_name.to_ascii_lowercase();
934
935        let known_columns =
936            self.collect_known_columns(&display_table_name, &canonical_table_name)?;
937        let enforce_known_columns = !known_columns.is_empty();
938
939        let index_name = match name {
940            Some(name_obj) => Some(Self::object_name_to_string(&name_obj)?),
941            None => None,
942        };
943
944        let mut index_columns: Vec<IndexColumnPlan> = Vec::with_capacity(columns.len());
945        let mut seen_column_names: HashSet<String> = HashSet::new();
946        for item in columns {
947            // Check WITH FILL before calling helper (not part of standard validation)
948            if item.column.with_fill.is_some() {
949                return Err(Error::InvalidArgumentError(
950                    "CREATE INDEX column WITH FILL is not supported".into(),
951                ));
952            }
953
954            let column_name = extract_index_column_name(
955                &item,
956                "CREATE INDEX",
957                true, // allow and validate sort options
958                true, // allow compound identifiers
959            )?;
960
961            // Get sort options (already validated by helper)
962            let order_expr = &item.column;
963            let ascending = order_expr.options.asc.unwrap_or(true);
964            let nulls_first = order_expr.options.nulls_first.unwrap_or(false);
965
966            let normalized = column_name.to_ascii_lowercase();
967            if !seen_column_names.insert(normalized.clone()) {
968                return Err(Error::InvalidArgumentError(format!(
969                    "duplicate column '{}' in CREATE INDEX",
970                    column_name
971                )));
972            }
973
974            if enforce_known_columns && !known_columns.contains(&normalized) {
975                return Err(Error::InvalidArgumentError(format!(
976                    "column '{}' does not exist in table '{}'",
977                    column_name, display_table_name
978                )));
979            }
980
981            let column_plan = IndexColumnPlan::new(column_name).with_sort(ascending, nulls_first);
982            index_columns.push(column_plan);
983        }
984
985        if index_columns.len() > 1 && !unique {
986            return Err(Error::InvalidArgumentError(
987                "multi-column CREATE INDEX currently supports UNIQUE indexes only".into(),
988            ));
989        }
990
991        let plan = CreateIndexPlan::new(display_table_name)
992            .with_name(index_name)
993            .with_unique(unique)
994            .with_if_not_exists(if_not_exists)
995            .with_columns(index_columns);
996
997        self.execute_plan_statement(PlanStatement::CreateIndex(plan))
998    }
999
1000    fn map_referential_action(
1001        action: Option<ReferentialAction>,
1002        kind: &str,
1003    ) -> SqlResult<ForeignKeyAction> {
1004        match action {
1005            None | Some(ReferentialAction::NoAction) => Ok(ForeignKeyAction::NoAction),
1006            Some(ReferentialAction::Restrict) => Ok(ForeignKeyAction::Restrict),
1007            Some(other) => Err(Error::InvalidArgumentError(format!(
1008                "FOREIGN KEY ON {kind} {:?} is not supported yet",
1009                other
1010            ))),
1011        }
1012    }
1013
1014    #[allow(clippy::too_many_arguments)]
1015    fn build_foreign_key_spec(
1016        &self,
1017        _referencing_display: &str,
1018        referencing_canonical: &str,
1019        referencing_columns: Vec<String>,
1020        foreign_table: &ObjectName,
1021        referenced_columns: &[Ident],
1022        on_delete: Option<ReferentialAction>,
1023        on_update: Option<ReferentialAction>,
1024        characteristics: &Option<ConstraintCharacteristics>,
1025        known_columns_lower: &HashSet<String>,
1026        name: Option<String>,
1027    ) -> SqlResult<ForeignKeySpec> {
1028        if characteristics.is_some() {
1029            return Err(Error::InvalidArgumentError(
1030                "FOREIGN KEY constraint characteristics are not supported yet".into(),
1031            ));
1032        }
1033
1034        ensure_non_empty(&referencing_columns, || {
1035            "FOREIGN KEY constraint requires at least one referencing column".into()
1036        })?;
1037        ensure_unique_case_insensitive(
1038            referencing_columns.iter().map(|name| name.as_str()),
1039            |dup| format!("duplicate column '{}' in FOREIGN KEY constraint", dup),
1040        )?;
1041        ensure_known_columns_case_insensitive(
1042            referencing_columns.iter().map(|name| name.as_str()),
1043            known_columns_lower,
1044            |unknown| format!("unknown column '{}' in FOREIGN KEY constraint", unknown),
1045        )?;
1046
1047        let referenced_columns_vec: Vec<String> = referenced_columns
1048            .iter()
1049            .map(|ident| ident.value.clone())
1050            .collect();
1051        ensure_unique_case_insensitive(
1052            referenced_columns_vec.iter().map(|name| name.as_str()),
1053            |dup| {
1054                format!(
1055                    "duplicate referenced column '{}' in FOREIGN KEY constraint",
1056                    dup
1057                )
1058            },
1059        )?;
1060
1061        if !referenced_columns_vec.is_empty()
1062            && referenced_columns_vec.len() != referencing_columns.len()
1063        {
1064            return Err(Error::InvalidArgumentError(
1065                "FOREIGN KEY referencing and referenced column counts must match".into(),
1066            ));
1067        }
1068
1069        let (referenced_display, referenced_canonical) = canonical_object_name(foreign_table)?;
1070
1071        if referenced_canonical == referencing_canonical {
1072            ensure_known_columns_case_insensitive(
1073                referenced_columns_vec.iter().map(|name| name.as_str()),
1074                known_columns_lower,
1075                |unknown| {
1076                    format!(
1077                        "unknown referenced column '{}' in FOREIGN KEY constraint",
1078                        unknown
1079                    )
1080                },
1081            )?;
1082        } else {
1083            let known_columns =
1084                self.collect_known_columns(&referenced_display, &referenced_canonical)?;
1085            if !known_columns.is_empty() {
1086                ensure_known_columns_case_insensitive(
1087                    referenced_columns_vec.iter().map(|name| name.as_str()),
1088                    &known_columns,
1089                    |unknown| {
1090                        format!(
1091                            "unknown referenced column '{}' in FOREIGN KEY constraint",
1092                            unknown
1093                        )
1094                    },
1095                )?;
1096            }
1097        }
1098
1099        let on_delete_action = Self::map_referential_action(on_delete, "DELETE")?;
1100        let on_update_action = Self::map_referential_action(on_update, "UPDATE")?;
1101
1102        Ok(ForeignKeySpec {
1103            name,
1104            columns: referencing_columns,
1105            referenced_table: referenced_display,
1106            referenced_columns: referenced_columns_vec,
1107            on_delete: on_delete_action,
1108            on_update: on_update_action,
1109        })
1110    }
1111
1112    fn handle_create_schema(
1113        &self,
1114        schema_name: SchemaName,
1115        _if_not_exists: bool,
1116        with: Option<Vec<SqlOption>>,
1117        options: Option<Vec<SqlOption>>,
1118        default_collate_spec: Option<SqlExpr>,
1119        clone: Option<ObjectName>,
1120    ) -> SqlResult<RuntimeStatementResult<P>> {
1121        if clone.is_some() {
1122            return Err(Error::InvalidArgumentError(
1123                "CREATE SCHEMA ... CLONE is not supported".into(),
1124            ));
1125        }
1126        if with.as_ref().is_some_and(|opts| !opts.is_empty()) {
1127            return Err(Error::InvalidArgumentError(
1128                "CREATE SCHEMA ... WITH options are not supported".into(),
1129            ));
1130        }
1131        if options.as_ref().is_some_and(|opts| !opts.is_empty()) {
1132            return Err(Error::InvalidArgumentError(
1133                "CREATE SCHEMA options are not supported".into(),
1134            ));
1135        }
1136        if default_collate_spec.is_some() {
1137            return Err(Error::InvalidArgumentError(
1138                "CREATE SCHEMA DEFAULT COLLATE is not supported".into(),
1139            ));
1140        }
1141
1142        let schema_name = match schema_name {
1143            SchemaName::Simple(name) => name,
1144            _ => {
1145                return Err(Error::InvalidArgumentError(
1146                    "CREATE SCHEMA authorization is not supported".into(),
1147                ));
1148            }
1149        };
1150
1151        let (display_name, canonical) = canonical_object_name(&schema_name)?;
1152        if display_name.is_empty() {
1153            return Err(Error::InvalidArgumentError(
1154                "schema name must not be empty".into(),
1155            ));
1156        }
1157
1158        // Register schema in the catalog
1159        let catalog = self.engine.context().table_catalog();
1160
1161        if _if_not_exists && catalog.schema_exists(&canonical) {
1162            return Ok(RuntimeStatementResult::NoOp);
1163        }
1164
1165        catalog.register_schema(&canonical).map_err(|err| {
1166            Error::CatalogError(format!(
1167                "Failed to create schema '{}': {}",
1168                display_name, err
1169            ))
1170        })?;
1171
1172        Ok(RuntimeStatementResult::NoOp)
1173    }
1174
1175    fn try_handle_range_ctas(
1176        &self,
1177        display_name: &str,
1178        _canonical_name: &str,
1179        query: &Query,
1180        if_not_exists: bool,
1181        or_replace: bool,
1182        namespace: Option<String>,
1183    ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
1184        let select = match query.body.as_ref() {
1185            SetExpr::Select(select) => select,
1186            _ => return Ok(None),
1187        };
1188        if select.from.len() != 1 {
1189            return Ok(None);
1190        }
1191        let table_with_joins = &select.from[0];
1192        if !table_with_joins.joins.is_empty() {
1193            return Ok(None);
1194        }
1195        let (range_size, range_alias) = match &table_with_joins.relation {
1196            TableFactor::Table {
1197                name,
1198                args: Some(args),
1199                alias,
1200                ..
1201            } => {
1202                let func_name = name.to_string().to_ascii_lowercase();
1203                if func_name != "range" {
1204                    return Ok(None);
1205                }
1206                if args.args.len() != 1 {
1207                    return Err(Error::InvalidArgumentError(
1208                        "range table function expects a single argument".into(),
1209                    ));
1210                }
1211                let size_expr = &args.args[0];
1212                let range_size = match size_expr {
1213                    FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
1214                        match &value.value {
1215                            Value::Number(raw, _) => raw.parse::<i64>().map_err(|e| {
1216                                Error::InvalidArgumentError(format!(
1217                                    "invalid range size literal {}: {}",
1218                                    raw, e
1219                                ))
1220                            })?,
1221                            other => {
1222                                return Err(Error::InvalidArgumentError(format!(
1223                                    "unsupported range size value: {:?}",
1224                                    other
1225                                )));
1226                            }
1227                        }
1228                    }
1229                    _ => {
1230                        return Err(Error::InvalidArgumentError(
1231                            "unsupported range argument".into(),
1232                        ));
1233                    }
1234                };
1235                (range_size, alias.as_ref().map(|a| a.name.value.clone()))
1236            }
1237            _ => return Ok(None),
1238        };
1239
1240        if range_size < 0 {
1241            return Err(Error::InvalidArgumentError(
1242                "range size must be non-negative".into(),
1243            ));
1244        }
1245
1246        if select.projection.is_empty() {
1247            return Err(Error::InvalidArgumentError(
1248                "CREATE TABLE AS SELECT requires at least one projected column".into(),
1249            ));
1250        }
1251
1252        let mut column_specs = Vec::with_capacity(select.projection.len());
1253        let mut column_names = Vec::with_capacity(select.projection.len());
1254        let mut row_template = Vec::with_capacity(select.projection.len());
1255        for item in &select.projection {
1256            match item {
1257                SelectItem::ExprWithAlias { expr, alias } => {
1258                    let (value, data_type) = match expr {
1259                        SqlExpr::Value(value_with_span) => match &value_with_span.value {
1260                            Value::Number(raw, _) => {
1261                                let parsed = raw.parse::<i64>().map_err(|e| {
1262                                    Error::InvalidArgumentError(format!(
1263                                        "invalid numeric literal {}: {}",
1264                                        raw, e
1265                                    ))
1266                                })?;
1267                                (
1268                                    PlanValue::Integer(parsed),
1269                                    arrow::datatypes::DataType::Int64,
1270                                )
1271                            }
1272                            Value::SingleQuotedString(s) => (
1273                                PlanValue::String(s.clone()),
1274                                arrow::datatypes::DataType::Utf8,
1275                            ),
1276                            other => {
1277                                return Err(Error::InvalidArgumentError(format!(
1278                                    "unsupported SELECT expression in range CTAS: {:?}",
1279                                    other
1280                                )));
1281                            }
1282                        },
1283                        SqlExpr::Identifier(ident) => {
1284                            let ident_lower = ident.value.to_ascii_lowercase();
1285                            if range_alias
1286                                .as_ref()
1287                                .map(|a| a.eq_ignore_ascii_case(&ident_lower))
1288                                .unwrap_or(false)
1289                                || ident_lower == "range"
1290                            {
1291                                return Err(Error::InvalidArgumentError(
1292                                    "range() table function columns are not supported yet".into(),
1293                                ));
1294                            }
1295                            return Err(Error::InvalidArgumentError(format!(
1296                                "unsupported identifier '{}' in range CTAS projection",
1297                                ident.value
1298                            )));
1299                        }
1300                        other => {
1301                            return Err(Error::InvalidArgumentError(format!(
1302                                "unsupported SELECT expression in range CTAS: {:?}",
1303                                other
1304                            )));
1305                        }
1306                    };
1307                    let column_name = alias.value.clone();
1308                    column_specs.push(ColumnSpec::new(column_name.clone(), data_type, true));
1309                    column_names.push(column_name);
1310                    row_template.push(value);
1311                }
1312                other => {
1313                    return Err(Error::InvalidArgumentError(format!(
1314                        "unsupported projection {:?} in range CTAS",
1315                        other
1316                    )));
1317                }
1318            }
1319        }
1320
1321        let plan = CreateTablePlan {
1322            name: display_name.to_string(),
1323            if_not_exists,
1324            or_replace,
1325            columns: column_specs,
1326            source: None,
1327            namespace,
1328            foreign_keys: Vec::new(),
1329        };
1330        let create_result = self.execute_plan_statement(PlanStatement::CreateTable(plan))?;
1331
1332        let row_count = range_size
1333            .try_into()
1334            .map_err(|_| Error::InvalidArgumentError("range size exceeds usize".into()))?;
1335        if row_count > 0 {
1336            let rows = vec![row_template; row_count];
1337            let insert_plan = InsertPlan {
1338                table: display_name.to_string(),
1339                columns: column_names,
1340                source: InsertSource::Rows(rows),
1341            };
1342            self.execute_plan_statement(PlanStatement::Insert(insert_plan))?;
1343        }
1344
1345        Ok(Some(create_result))
1346    }
1347
1348    // TODO: Refactor into runtime or executor layer?
1349    /// Try to handle pragma_table_info('table_name') table function
1350    fn try_handle_pragma_table_info(
1351        &self,
1352        query: &Query,
1353    ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
1354        let select = match query.body.as_ref() {
1355            SetExpr::Select(select) => select,
1356            _ => return Ok(None),
1357        };
1358
1359        if select.from.len() != 1 {
1360            return Ok(None);
1361        }
1362
1363        let table_with_joins = &select.from[0];
1364        if !table_with_joins.joins.is_empty() {
1365            return Ok(None);
1366        }
1367
1368        // Check if this is pragma_table_info function call
1369        let table_name = match &table_with_joins.relation {
1370            TableFactor::Table {
1371                name,
1372                args: Some(args),
1373                ..
1374            } => {
1375                let func_name = name.to_string().to_ascii_lowercase();
1376                if func_name != "pragma_table_info" {
1377                    return Ok(None);
1378                }
1379
1380                // Extract table name from argument
1381                if args.args.len() != 1 {
1382                    return Err(Error::InvalidArgumentError(
1383                        "pragma_table_info expects exactly one argument".into(),
1384                    ));
1385                }
1386
1387                match &args.args[0] {
1388                    FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
1389                        match &value.value {
1390                            Value::SingleQuotedString(s) => s.clone(),
1391                            Value::DoubleQuotedString(s) => s.clone(),
1392                            _ => {
1393                                return Err(Error::InvalidArgumentError(
1394                                    "pragma_table_info argument must be a string".into(),
1395                                ));
1396                            }
1397                        }
1398                    }
1399                    _ => {
1400                        return Err(Error::InvalidArgumentError(
1401                            "pragma_table_info argument must be a string literal".into(),
1402                        ));
1403                    }
1404                }
1405            }
1406            _ => return Ok(None),
1407        };
1408
1409        // Get table column specs from runtime context
1410        let context = self.engine.context();
1411        let columns = context.table_column_specs(&table_name)?;
1412
1413        // Build RecordBatch with table column information
1414        use arrow::array::{BooleanArray, Int32Array, StringArray};
1415        use arrow::datatypes::{DataType, Field, Schema};
1416
1417        let mut cid_values = Vec::new();
1418        let mut name_values = Vec::new();
1419        let mut type_values = Vec::new();
1420        let mut notnull_values = Vec::new();
1421        let mut dflt_value_values: Vec<Option<String>> = Vec::new();
1422        let mut pk_values = Vec::new();
1423
1424        for (idx, col) in columns.iter().enumerate() {
1425            cid_values.push(idx as i32);
1426            name_values.push(col.name.clone());
1427            type_values.push(format!("{:?}", col.data_type)); // Simple type representation
1428            notnull_values.push(!col.nullable);
1429            dflt_value_values.push(None); // We don't track default values yet
1430            pk_values.push(col.primary_key);
1431        }
1432
1433        let schema = Arc::new(Schema::new(vec![
1434            Field::new("cid", DataType::Int32, false),
1435            Field::new("name", DataType::Utf8, false),
1436            Field::new("type", DataType::Utf8, false),
1437            Field::new("notnull", DataType::Boolean, false),
1438            Field::new("dflt_value", DataType::Utf8, true),
1439            Field::new("pk", DataType::Boolean, false),
1440        ]));
1441
1442        use arrow::array::ArrayRef;
1443        let mut batch = RecordBatch::try_new(
1444            Arc::clone(&schema),
1445            vec![
1446                Arc::new(Int32Array::from(cid_values)) as ArrayRef,
1447                Arc::new(StringArray::from(name_values)) as ArrayRef,
1448                Arc::new(StringArray::from(type_values)) as ArrayRef,
1449                Arc::new(BooleanArray::from(notnull_values)) as ArrayRef,
1450                Arc::new(StringArray::from(dflt_value_values)) as ArrayRef,
1451                Arc::new(BooleanArray::from(pk_values)) as ArrayRef,
1452            ],
1453        )
1454        .map_err(|e| Error::Internal(format!("failed to create pragma_table_info batch: {}", e)))?;
1455
1456        // Apply SELECT projections: extract only requested columns
1457        let projection_indices: Vec<usize> = select
1458            .projection
1459            .iter()
1460            .filter_map(|item| {
1461                match item {
1462                    SelectItem::UnnamedExpr(SqlExpr::Identifier(ident)) => {
1463                        schema.index_of(&ident.value).ok()
1464                    }
1465                    SelectItem::ExprWithAlias { expr, .. } => {
1466                        if let SqlExpr::Identifier(ident) = expr {
1467                            schema.index_of(&ident.value).ok()
1468                        } else {
1469                            None
1470                        }
1471                    }
1472                    SelectItem::Wildcard(_) => None, // Handle * separately
1473                    _ => None,
1474                }
1475            })
1476            .collect();
1477
1478        // Apply projections if not SELECT *
1479        let projected_schema;
1480        if !projection_indices.is_empty() {
1481            let projected_fields: Vec<Field> = projection_indices
1482                .iter()
1483                .map(|&idx| schema.field(idx).clone())
1484                .collect();
1485            projected_schema = Arc::new(Schema::new(projected_fields));
1486
1487            let projected_columns: Vec<ArrayRef> = projection_indices
1488                .iter()
1489                .map(|&idx| Arc::clone(batch.column(idx)))
1490                .collect();
1491
1492            batch = RecordBatch::try_new(Arc::clone(&projected_schema), projected_columns)
1493                .map_err(|e| Error::Internal(format!("failed to project columns: {}", e)))?;
1494        } else {
1495            // SELECT * or complex projections - use original schema
1496            projected_schema = schema;
1497        }
1498
1499        // Apply ORDER BY using Arrow compute kernels
1500        if let Some(order_by) = &query.order_by {
1501            use arrow::compute::SortColumn;
1502            use arrow::compute::lexsort_to_indices;
1503            use sqlparser::ast::OrderByKind;
1504
1505            let exprs = match &order_by.kind {
1506                OrderByKind::Expressions(exprs) => exprs,
1507                _ => {
1508                    return Err(Error::InvalidArgumentError(
1509                        "unsupported ORDER BY clause".into(),
1510                    ));
1511                }
1512            };
1513
1514            let mut sort_columns = Vec::new();
1515            for order_expr in exprs {
1516                if let SqlExpr::Identifier(ident) = &order_expr.expr
1517                    && let Ok(col_idx) = projected_schema.index_of(&ident.value)
1518                {
1519                    let options = arrow::compute::SortOptions {
1520                        descending: !order_expr.options.asc.unwrap_or(true),
1521                        nulls_first: order_expr.options.nulls_first.unwrap_or(false),
1522                    };
1523                    sort_columns.push(SortColumn {
1524                        values: Arc::clone(batch.column(col_idx)),
1525                        options: Some(options),
1526                    });
1527                }
1528            }
1529
1530            if !sort_columns.is_empty() {
1531                let indices = lexsort_to_indices(&sort_columns, None)
1532                    .map_err(|e| Error::Internal(format!("failed to sort: {}", e)))?;
1533
1534                use arrow::compute::take;
1535                let sorted_columns: Result<Vec<ArrayRef>, _> = batch
1536                    .columns()
1537                    .iter()
1538                    .map(|col| take(col.as_ref(), &indices, None))
1539                    .collect();
1540
1541                batch = RecordBatch::try_new(
1542                    Arc::clone(&projected_schema),
1543                    sorted_columns
1544                        .map_err(|e| Error::Internal(format!("failed to apply sort: {}", e)))?,
1545                )
1546                .map_err(|e| Error::Internal(format!("failed to create sorted batch: {}", e)))?;
1547            }
1548        }
1549
1550        let execution = SelectExecution::new_single_batch(
1551            table_name.clone(),
1552            Arc::clone(&projected_schema),
1553            batch,
1554        );
1555
1556        Ok(Some(RuntimeStatementResult::Select {
1557            table_name,
1558            schema: projected_schema,
1559            execution,
1560        }))
1561    }
1562
1563    fn handle_create_table_as(
1564        &self,
1565        display_name: String,
1566        _canonical_name: String,
1567        query: Query,
1568        if_not_exists: bool,
1569        or_replace: bool,
1570        namespace: Option<String>,
1571    ) -> SqlResult<RuntimeStatementResult<P>> {
1572        let select_plan = self.build_select_plan(query)?;
1573
1574        if select_plan.projections.is_empty() && select_plan.aggregates.is_empty() {
1575            return Err(Error::InvalidArgumentError(
1576                "CREATE TABLE AS SELECT requires at least one projected column".into(),
1577            ));
1578        }
1579
1580        let plan = CreateTablePlan {
1581            name: display_name,
1582            if_not_exists,
1583            or_replace,
1584            columns: Vec::new(),
1585            source: Some(CreateTableSource::Select {
1586                plan: Box::new(select_plan),
1587            }),
1588            namespace,
1589            foreign_keys: Vec::new(),
1590        };
1591        self.execute_plan_statement(PlanStatement::CreateTable(plan))
1592    }
1593
1594    fn handle_insert(&self, stmt: sqlparser::ast::Insert) -> SqlResult<RuntimeStatementResult<P>> {
1595        let table_name_debug =
1596            Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
1597        tracing::trace!(
1598            "DEBUG SQL handle_insert called for table={}",
1599            table_name_debug
1600        );
1601        if !self.engine.session().has_active_transaction()
1602            && self.is_table_marked_dropped(&table_name_debug)?
1603        {
1604            return Err(Error::TransactionContextError(
1605                DROPPED_TABLE_TRANSACTION_ERR.into(),
1606            ));
1607        }
1608        if stmt.replace_into || stmt.ignore || stmt.or.is_some() {
1609            return Err(Error::InvalidArgumentError(
1610                "non-standard INSERT forms are not supported".into(),
1611            ));
1612        }
1613        if stmt.overwrite {
1614            return Err(Error::InvalidArgumentError(
1615                "INSERT OVERWRITE is not supported".into(),
1616            ));
1617        }
1618        if !stmt.assignments.is_empty() {
1619            return Err(Error::InvalidArgumentError(
1620                "INSERT ... SET is not supported".into(),
1621            ));
1622        }
1623        if stmt.partitioned.is_some() || !stmt.after_columns.is_empty() {
1624            return Err(Error::InvalidArgumentError(
1625                "partitioned INSERT is not supported".into(),
1626            ));
1627        }
1628        if stmt.returning.is_some() {
1629            return Err(Error::InvalidArgumentError(
1630                "INSERT ... RETURNING is not supported".into(),
1631            ));
1632        }
1633        if stmt.format_clause.is_some() || stmt.settings.is_some() {
1634            return Err(Error::InvalidArgumentError(
1635                "INSERT with FORMAT or SETTINGS is not supported".into(),
1636            ));
1637        }
1638
1639        let (display_name, _canonical_name) = match &stmt.table {
1640            TableObject::TableName(name) => canonical_object_name(name)?,
1641            _ => {
1642                return Err(Error::InvalidArgumentError(
1643                    "INSERT requires a plain table name".into(),
1644                ));
1645            }
1646        };
1647
1648        let columns: Vec<String> = stmt
1649            .columns
1650            .iter()
1651            .map(|ident| ident.value.clone())
1652            .collect();
1653        let source_expr = stmt
1654            .source
1655            .as_ref()
1656            .ok_or_else(|| Error::InvalidArgumentError("INSERT requires a VALUES clause".into()))?;
1657        validate_simple_query(source_expr)?;
1658
1659        let insert_source = match source_expr.body.as_ref() {
1660            SetExpr::Values(values) => {
1661                if values.rows.is_empty() {
1662                    return Err(Error::InvalidArgumentError(
1663                        "INSERT VALUES list must contain at least one row".into(),
1664                    ));
1665                }
1666                let mut rows: Vec<Vec<SqlValue>> = Vec::with_capacity(values.rows.len());
1667                for row in &values.rows {
1668                    let mut converted = Vec::with_capacity(row.len());
1669                    for expr in row {
1670                        converted.push(SqlValue::try_from_expr(expr)?);
1671                    }
1672                    rows.push(converted);
1673                }
1674                InsertSource::Rows(
1675                    rows.into_iter()
1676                        .map(|row| row.into_iter().map(PlanValue::from).collect())
1677                        .collect(),
1678                )
1679            }
1680            SetExpr::Select(select) => {
1681                if let Some(rows) = extract_constant_select_rows(select.as_ref())? {
1682                    InsertSource::Rows(rows)
1683                } else if let Some(range_rows) = extract_rows_from_range(select.as_ref())? {
1684                    InsertSource::Rows(range_rows.into_rows())
1685                } else {
1686                    let select_plan = self.build_select_plan((**source_expr).clone())?;
1687                    InsertSource::Select {
1688                        plan: Box::new(select_plan),
1689                    }
1690                }
1691            }
1692            _ => {
1693                return Err(Error::InvalidArgumentError(
1694                    "unsupported INSERT source".into(),
1695                ));
1696            }
1697        };
1698
1699        let plan = InsertPlan {
1700            table: display_name.clone(),
1701            columns,
1702            source: insert_source,
1703        };
1704        tracing::trace!(
1705            "DEBUG SQL handle_insert: about to execute insert for table={}",
1706            display_name
1707        );
1708        self.execute_plan_statement(PlanStatement::Insert(plan))
1709    }
1710
1711    fn handle_update(
1712        &self,
1713        table: TableWithJoins,
1714        assignments: Vec<Assignment>,
1715        from: Option<UpdateTableFromKind>,
1716        selection: Option<SqlExpr>,
1717        returning: Option<Vec<SelectItem>>,
1718    ) -> SqlResult<RuntimeStatementResult<P>> {
1719        if from.is_some() {
1720            return Err(Error::InvalidArgumentError(
1721                "UPDATE ... FROM is not supported yet".into(),
1722            ));
1723        }
1724        if returning.is_some() {
1725            return Err(Error::InvalidArgumentError(
1726                "UPDATE ... RETURNING is not supported".into(),
1727            ));
1728        }
1729        if assignments.is_empty() {
1730            return Err(Error::InvalidArgumentError(
1731                "UPDATE requires at least one assignment".into(),
1732            ));
1733        }
1734
1735        let (display_name, canonical_name) = extract_single_table(std::slice::from_ref(&table))?;
1736
1737        if !self.engine.session().has_active_transaction()
1738            && self
1739                .engine
1740                .context()
1741                .is_table_marked_dropped(&canonical_name)
1742        {
1743            return Err(Error::TransactionContextError(
1744                DROPPED_TABLE_TRANSACTION_ERR.into(),
1745            ));
1746        }
1747
1748        let catalog = self.engine.context().table_catalog();
1749        let resolver = catalog.identifier_resolver();
1750        let table_id = catalog.table_id(&canonical_name);
1751
1752        let mut column_assignments = Vec::with_capacity(assignments.len());
1753        let mut seen: HashMap<String, ()> = HashMap::new();
1754        for assignment in assignments {
1755            let column_name = resolve_assignment_column_name(&assignment.target)?;
1756            let normalized = column_name.to_ascii_lowercase();
1757            if seen.insert(normalized, ()).is_some() {
1758                return Err(Error::InvalidArgumentError(format!(
1759                    "duplicate column '{}' in UPDATE assignments",
1760                    column_name
1761                )));
1762            }
1763            let value = match SqlValue::try_from_expr(&assignment.value) {
1764                Ok(literal) => AssignmentValue::Literal(PlanValue::from(literal)),
1765                Err(Error::InvalidArgumentError(msg))
1766                    if msg.contains("unsupported literal expression") =>
1767                {
1768                    let translated = translate_scalar_with_context(
1769                        &resolver,
1770                        IdentifierContext::new(table_id),
1771                        &assignment.value,
1772                    )?;
1773                    AssignmentValue::Expression(translated)
1774                }
1775                Err(err) => return Err(err),
1776            };
1777            column_assignments.push(ColumnAssignment {
1778                column: column_name,
1779                value,
1780            });
1781        }
1782
1783        let filter = match selection {
1784            Some(expr) => Some(translate_condition_with_context(
1785                &resolver,
1786                IdentifierContext::new(table_id),
1787                &expr,
1788            )?),
1789            None => None,
1790        };
1791
1792        let plan = UpdatePlan {
1793            table: display_name.clone(),
1794            assignments: column_assignments,
1795            filter,
1796        };
1797        self.execute_plan_statement(PlanStatement::Update(plan))
1798    }
1799
1800    #[allow(clippy::collapsible_if)]
1801    fn handle_delete(&self, delete: Delete) -> SqlResult<RuntimeStatementResult<P>> {
1802        let Delete {
1803            tables,
1804            from,
1805            using,
1806            selection,
1807            returning,
1808            order_by,
1809            limit,
1810        } = delete;
1811
1812        if !tables.is_empty() {
1813            return Err(Error::InvalidArgumentError(
1814                "multi-table DELETE is not supported yet".into(),
1815            ));
1816        }
1817        if let Some(using_tables) = using {
1818            if !using_tables.is_empty() {
1819                return Err(Error::InvalidArgumentError(
1820                    "DELETE ... USING is not supported yet".into(),
1821                ));
1822            }
1823        }
1824        if returning.is_some() {
1825            return Err(Error::InvalidArgumentError(
1826                "DELETE ... RETURNING is not supported".into(),
1827            ));
1828        }
1829        if !order_by.is_empty() {
1830            return Err(Error::InvalidArgumentError(
1831                "DELETE ... ORDER BY is not supported yet".into(),
1832            ));
1833        }
1834        if limit.is_some() {
1835            return Err(Error::InvalidArgumentError(
1836                "DELETE ... LIMIT is not supported yet".into(),
1837            ));
1838        }
1839
1840        let from_tables = match from {
1841            FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
1842        };
1843        let (display_name, canonical_name) = extract_single_table(&from_tables)?;
1844
1845        if !self.engine.session().has_active_transaction()
1846            && self
1847                .engine
1848                .context()
1849                .is_table_marked_dropped(&canonical_name)
1850        {
1851            return Err(Error::TransactionContextError(
1852                DROPPED_TABLE_TRANSACTION_ERR.into(),
1853            ));
1854        }
1855
1856        let catalog = self.engine.context().table_catalog();
1857        let resolver = catalog.identifier_resolver();
1858        let table_id = catalog.table_id(&canonical_name);
1859
1860        let filter = selection
1861            .map(|expr| {
1862                translate_condition_with_context(&resolver, IdentifierContext::new(table_id), &expr)
1863            })
1864            .transpose()?;
1865
1866        let plan = DeletePlan {
1867            table: display_name.clone(),
1868            filter,
1869        };
1870        self.execute_plan_statement(PlanStatement::Delete(plan))
1871    }
1872
1873    #[allow(clippy::too_many_arguments)] // TODO: Consider refactor
1874    fn handle_drop(
1875        &self,
1876        object_type: ObjectType,
1877        if_exists: bool,
1878        names: Vec<ObjectName>,
1879        cascade: bool,
1880        restrict: bool,
1881        purge: bool,
1882        temporary: bool,
1883    ) -> SqlResult<RuntimeStatementResult<P>> {
1884        if purge || temporary {
1885            return Err(Error::InvalidArgumentError(
1886                "DROP purge/temporary options are not supported".into(),
1887            ));
1888        }
1889
1890        match object_type {
1891            ObjectType::Table => {
1892                if cascade || restrict {
1893                    return Err(Error::InvalidArgumentError(
1894                        "DROP TABLE CASCADE/RESTRICT is not supported".into(),
1895                    ));
1896                }
1897
1898                let session = self.engine.session();
1899                for name in names {
1900                    let table_name = Self::object_name_to_string(&name)?;
1901                    session
1902                        .drop_table(&table_name, if_exists)
1903                        .map_err(|err| Self::map_table_error(&table_name, err))?;
1904                }
1905
1906                Ok(RuntimeStatementResult::NoOp)
1907            }
1908            ObjectType::Schema => {
1909                if restrict {
1910                    return Err(Error::InvalidArgumentError(
1911                        "DROP SCHEMA RESTRICT is not supported".into(),
1912                    ));
1913                }
1914
1915                let catalog = self.engine.context().table_catalog();
1916
1917                for name in names {
1918                    let (display_name, canonical_name) = canonical_object_name(&name)?;
1919
1920                    if !catalog.schema_exists(&canonical_name) {
1921                        if if_exists {
1922                            continue;
1923                        }
1924                        return Err(Error::CatalogError(format!(
1925                            "Schema '{}' does not exist",
1926                            display_name
1927                        )));
1928                    }
1929
1930                    if cascade {
1931                        // Drop all tables in this schema
1932                        let all_tables = catalog.table_names();
1933                        let schema_prefix = format!("{}.", canonical_name);
1934
1935                        let ctx = self.engine.context();
1936                        for table in all_tables {
1937                            if table.to_ascii_lowercase().starts_with(&schema_prefix) {
1938                                ctx.drop_table_immediate(&table, false)?;
1939                            }
1940                        }
1941                    } else {
1942                        // Check if schema has any tables
1943                        let all_tables = catalog.table_names();
1944                        let schema_prefix = format!("{}.", canonical_name);
1945                        let has_tables = all_tables
1946                            .iter()
1947                            .any(|t| t.to_ascii_lowercase().starts_with(&schema_prefix));
1948
1949                        if has_tables {
1950                            return Err(Error::CatalogError(format!(
1951                                "Schema '{}' is not empty. Use CASCADE to drop schema and all its tables",
1952                                display_name
1953                            )));
1954                        }
1955                    }
1956
1957                    // Drop the schema
1958                    if !catalog.unregister_schema(&canonical_name) && !if_exists {
1959                        return Err(Error::CatalogError(format!(
1960                            "Schema '{}' does not exist",
1961                            display_name
1962                        )));
1963                    }
1964                }
1965
1966                Ok(RuntimeStatementResult::NoOp)
1967            }
1968            _ => Err(Error::InvalidArgumentError(format!(
1969                "DROP {} is not supported",
1970                object_type
1971            ))),
1972        }
1973    }
1974
1975    fn handle_query(&self, query: Query) -> SqlResult<RuntimeStatementResult<P>> {
1976        // Check for pragma_table_info() table function first
1977        if let Some(result) = self.try_handle_pragma_table_info(&query)? {
1978            return Ok(result);
1979        }
1980
1981        let select_plan = self.build_select_plan(query)?;
1982        self.execute_plan_statement(PlanStatement::Select(select_plan))
1983    }
1984
1985    fn build_select_plan(&self, query: Query) -> SqlResult<SelectPlan> {
1986        if self.engine.session().has_active_transaction() && self.engine.session().is_aborted() {
1987            return Err(Error::TransactionContextError(
1988                "TransactionContext Error: transaction is aborted".into(),
1989            ));
1990        }
1991
1992        validate_simple_query(&query)?;
1993        let catalog = self.engine.context().table_catalog();
1994        let resolver = catalog.identifier_resolver();
1995
1996        let (mut select_plan, select_context) = match query.body.as_ref() {
1997            SetExpr::Select(select) => self.translate_select(select.as_ref(), &resolver)?,
1998            other => {
1999                return Err(Error::InvalidArgumentError(format!(
2000                    "unsupported query expression: {other:?}"
2001                )));
2002            }
2003        };
2004        if let Some(order_by) = &query.order_by {
2005            if !select_plan.aggregates.is_empty() {
2006                return Err(Error::InvalidArgumentError(
2007                    "ORDER BY is not supported for aggregate queries".into(),
2008                ));
2009            }
2010            let order_plan = self.translate_order_by(&resolver, select_context, order_by)?;
2011            select_plan = select_plan.with_order_by(order_plan);
2012        }
2013        Ok(select_plan)
2014    }
2015
2016    fn translate_select(
2017        &self,
2018        select: &Select,
2019        resolver: &IdentifierResolver<'_>,
2020    ) -> SqlResult<(SelectPlan, IdentifierContext)> {
2021        if select.distinct.is_some() {
2022            return Err(Error::InvalidArgumentError(
2023                "SELECT DISTINCT is not supported".into(),
2024            ));
2025        }
2026        if select.top.is_some() {
2027            return Err(Error::InvalidArgumentError(
2028                "SELECT TOP is not supported".into(),
2029            ));
2030        }
2031        if select.exclude.is_some() {
2032            return Err(Error::InvalidArgumentError(
2033                "SELECT EXCLUDE is not supported".into(),
2034            ));
2035        }
2036        if select.into.is_some() {
2037            return Err(Error::InvalidArgumentError(
2038                "SELECT INTO is not supported".into(),
2039            ));
2040        }
2041        if !select.lateral_views.is_empty() {
2042            return Err(Error::InvalidArgumentError(
2043                "LATERAL VIEW is not supported".into(),
2044            ));
2045        }
2046        if select.prewhere.is_some() {
2047            return Err(Error::InvalidArgumentError(
2048                "PREWHERE is not supported".into(),
2049            ));
2050        }
2051        if !group_by_is_empty(&select.group_by) || select.value_table_mode.is_some() {
2052            return Err(Error::InvalidArgumentError(
2053                "GROUP BY and SELECT AS VALUE/STRUCT are not supported".into(),
2054            ));
2055        }
2056        if !select.cluster_by.is_empty()
2057            || !select.distribute_by.is_empty()
2058            || !select.sort_by.is_empty()
2059        {
2060            return Err(Error::InvalidArgumentError(
2061                "CLUSTER/DISTRIBUTE/SORT BY clauses are not supported".into(),
2062            ));
2063        }
2064        if select.having.is_some()
2065            || !select.named_window.is_empty()
2066            || select.qualify.is_some()
2067            || select.connect_by.is_some()
2068        {
2069            return Err(Error::InvalidArgumentError(
2070                "advanced SELECT clauses are not supported".into(),
2071            ));
2072        }
2073
2074        let table_alias = select
2075            .from
2076            .first()
2077            .and_then(|table_with_joins| match &table_with_joins.relation {
2078                TableFactor::Table { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
2079                _ => None,
2080            });
2081
2082        if let Some(alias) = table_alias.as_ref() {
2083            validate_projection_alias_qualifiers(&select.projection, alias)?;
2084        }
2085        // Handle different FROM clause scenarios
2086        let catalog = self.engine.context().table_catalog();
2087        let (mut plan, id_context) = if select.from.is_empty() {
2088            // No FROM clause - use empty string for table context (e.g., SELECT 42, SELECT {'a': 1} AS x)
2089            let mut p = SelectPlan::new("");
2090            let projections = self.build_projection_list(
2091                resolver,
2092                IdentifierContext::new(None),
2093                &select.projection,
2094            )?;
2095            p = p.with_projections(projections);
2096            (p, IdentifierContext::new(None))
2097        } else if select.from.len() == 1 {
2098            // Single table query
2099            let (display_name, canonical_name) = extract_single_table(&select.from)?;
2100            let table_id = catalog.table_id(&canonical_name);
2101            let mut p = SelectPlan::new(display_name.clone());
2102            if let Some(aggregates) = self.detect_simple_aggregates(&select.projection)? {
2103                p = p.with_aggregates(aggregates);
2104            } else {
2105                let projections = self.build_projection_list(
2106                    resolver,
2107                    IdentifierContext::new(table_id),
2108                    &select.projection,
2109                )?;
2110                p = p.with_projections(projections);
2111            }
2112            (p, IdentifierContext::new(table_id))
2113        } else {
2114            // Multiple tables - cross product
2115            let tables = extract_tables(&select.from)?;
2116            let mut p = SelectPlan::with_tables(tables);
2117            // For multi-table queries, we'll build projections differently
2118            // For now, just handle simple column references
2119            let projections = self.build_projection_list(
2120                resolver,
2121                IdentifierContext::new(None),
2122                &select.projection,
2123            )?;
2124            p = p.with_projections(projections);
2125            (p, IdentifierContext::new(None))
2126        };
2127
2128        let filter_expr = match &select.selection {
2129            Some(expr) => Some(translate_condition_with_context(
2130                resolver, id_context, expr,
2131            )?),
2132            None => None,
2133        };
2134        plan = plan.with_filter(filter_expr);
2135        Ok((plan, id_context))
2136    }
2137
2138    fn translate_order_by(
2139        &self,
2140        resolver: &IdentifierResolver<'_>,
2141        id_context: IdentifierContext,
2142        order_by: &OrderBy,
2143    ) -> SqlResult<Vec<OrderByPlan>> {
2144        let exprs = match &order_by.kind {
2145            OrderByKind::Expressions(exprs) => exprs,
2146            _ => {
2147                return Err(Error::InvalidArgumentError(
2148                    "unsupported ORDER BY clause".into(),
2149                ));
2150            }
2151        };
2152
2153        let base_nulls_first = self.default_nulls_first.load(AtomicOrdering::Relaxed);
2154
2155        let resolve_simple_column = |expr: &SqlExpr| -> SqlResult<String> {
2156            let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
2157            match scalar {
2158                llkv_expr::expr::ScalarExpr::Column(column) => Ok(column),
2159                other => Err(Error::InvalidArgumentError(format!(
2160                    "ORDER BY expression must reference a simple column, found {other:?}"
2161                ))),
2162            }
2163        };
2164
2165        let mut plans = Vec::with_capacity(exprs.len());
2166        for order_expr in exprs {
2167            let ascending = order_expr.options.asc.unwrap_or(true);
2168            let default_nulls_first_for_direction = if ascending {
2169                base_nulls_first
2170            } else {
2171                !base_nulls_first
2172            };
2173            let nulls_first = order_expr
2174                .options
2175                .nulls_first
2176                .unwrap_or(default_nulls_first_for_direction);
2177
2178            if let SqlExpr::Identifier(ident) = &order_expr.expr
2179                && ident.value.eq_ignore_ascii_case("ALL")
2180                && ident.quote_style.is_none()
2181            {
2182                plans.push(OrderByPlan {
2183                    target: OrderTarget::All,
2184                    sort_type: OrderSortType::Native,
2185                    ascending,
2186                    nulls_first,
2187                });
2188                continue;
2189            }
2190
2191            let (target, sort_type) = match &order_expr.expr {
2192                SqlExpr::Identifier(_) | SqlExpr::CompoundIdentifier(_) => (
2193                    OrderTarget::Column(resolve_simple_column(&order_expr.expr)?),
2194                    OrderSortType::Native,
2195                ),
2196                SqlExpr::Cast {
2197                    expr,
2198                    data_type:
2199                        SqlDataType::Int(_)
2200                        | SqlDataType::Integer(_)
2201                        | SqlDataType::BigInt(_)
2202                        | SqlDataType::SmallInt(_)
2203                        | SqlDataType::TinyInt(_),
2204                    ..
2205                } => (
2206                    OrderTarget::Column(resolve_simple_column(expr)?),
2207                    OrderSortType::CastTextToInteger,
2208                ),
2209                SqlExpr::Cast { data_type, .. } => {
2210                    return Err(Error::InvalidArgumentError(format!(
2211                        "ORDER BY CAST target type {:?} is not supported",
2212                        data_type
2213                    )));
2214                }
2215                SqlExpr::Value(value_with_span) => match &value_with_span.value {
2216                    Value::Number(raw, _) => {
2217                        let position: usize = raw.parse().map_err(|_| {
2218                            Error::InvalidArgumentError(format!(
2219                                "ORDER BY position '{}' is not a valid positive integer",
2220                                raw
2221                            ))
2222                        })?;
2223                        if position == 0 {
2224                            return Err(Error::InvalidArgumentError(
2225                                "ORDER BY position must be at least 1".into(),
2226                            ));
2227                        }
2228                        (OrderTarget::Index(position - 1), OrderSortType::Native)
2229                    }
2230                    other => {
2231                        return Err(Error::InvalidArgumentError(format!(
2232                            "unsupported ORDER BY literal expression: {other:?}"
2233                        )));
2234                    }
2235                },
2236                other => {
2237                    return Err(Error::InvalidArgumentError(format!(
2238                        "unsupported ORDER BY expression: {other:?}"
2239                    )));
2240                }
2241            };
2242
2243            plans.push(OrderByPlan {
2244                target,
2245                sort_type,
2246                ascending,
2247                nulls_first,
2248            });
2249        }
2250
2251        Ok(plans)
2252    }
2253
2254    fn detect_simple_aggregates(
2255        &self,
2256        projection_items: &[SelectItem],
2257    ) -> SqlResult<Option<Vec<AggregateExpr>>> {
2258        if projection_items.is_empty() {
2259            return Ok(None);
2260        }
2261
2262        let mut specs: Vec<AggregateExpr> = Vec::with_capacity(projection_items.len());
2263        for (idx, item) in projection_items.iter().enumerate() {
2264            let (expr, alias_opt) = match item {
2265                SelectItem::UnnamedExpr(expr) => (expr, None),
2266                SelectItem::ExprWithAlias { expr, alias } => (expr, Some(alias.value.clone())),
2267                _ => return Ok(None),
2268            };
2269
2270            let alias = alias_opt.unwrap_or_else(|| format!("col{}", idx + 1));
2271            let SqlExpr::Function(func) = expr else {
2272                return Ok(None);
2273            };
2274
2275            if func.uses_odbc_syntax {
2276                return Err(Error::InvalidArgumentError(
2277                    "ODBC function syntax is not supported in aggregate queries".into(),
2278                ));
2279            }
2280            if !matches!(func.parameters, FunctionArguments::None) {
2281                return Err(Error::InvalidArgumentError(
2282                    "parameterized aggregate functions are not supported".into(),
2283                ));
2284            }
2285            if func.filter.is_some()
2286                || func.null_treatment.is_some()
2287                || func.over.is_some()
2288                || !func.within_group.is_empty()
2289            {
2290                return Err(Error::InvalidArgumentError(
2291                    "advanced aggregate clauses are not supported".into(),
2292                ));
2293            }
2294
2295            let mut is_distinct = false;
2296            let args_slice: &[FunctionArg] = match &func.args {
2297                FunctionArguments::List(list) => {
2298                    if let Some(dup) = &list.duplicate_treatment {
2299                        use sqlparser::ast::DuplicateTreatment;
2300                        match dup {
2301                            DuplicateTreatment::All => {}
2302                            DuplicateTreatment::Distinct => is_distinct = true,
2303                        }
2304                    }
2305                    if !list.clauses.is_empty() {
2306                        return Err(Error::InvalidArgumentError(
2307                            "aggregate argument clauses are not supported".into(),
2308                        ));
2309                    }
2310                    &list.args
2311                }
2312                FunctionArguments::None => &[],
2313                FunctionArguments::Subquery(_) => {
2314                    return Err(Error::InvalidArgumentError(
2315                        "aggregate subquery arguments are not supported".into(),
2316                    ));
2317                }
2318            };
2319
2320            let func_name = if func.name.0.len() == 1 {
2321                match &func.name.0[0] {
2322                    ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
2323                    _ => {
2324                        return Err(Error::InvalidArgumentError(
2325                            "unsupported aggregate function name".into(),
2326                        ));
2327                    }
2328                }
2329            } else {
2330                return Err(Error::InvalidArgumentError(
2331                    "qualified aggregate function names are not supported".into(),
2332                ));
2333            };
2334
2335            let aggregate = match func_name.as_str() {
2336                "count" => {
2337                    if args_slice.len() != 1 {
2338                        return Err(Error::InvalidArgumentError(
2339                            "COUNT accepts exactly one argument".into(),
2340                        ));
2341                    }
2342                    match &args_slice[0] {
2343                        FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
2344                            if is_distinct {
2345                                return Err(Error::InvalidArgumentError(
2346                                    "COUNT(DISTINCT *) is not supported".into(),
2347                                ));
2348                            }
2349                            AggregateExpr::count_star(alias)
2350                        }
2351                        FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
2352                            let column = resolve_column_name(arg_expr)?;
2353                            if is_distinct {
2354                                AggregateExpr::count_distinct_column(column, alias)
2355                            } else {
2356                                AggregateExpr::count_column(column, alias)
2357                            }
2358                        }
2359                        FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
2360                            return Err(Error::InvalidArgumentError(
2361                                "named COUNT arguments are not supported".into(),
2362                            ));
2363                        }
2364                        FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
2365                            return Err(Error::InvalidArgumentError(
2366                                "COUNT does not support qualified wildcards".into(),
2367                            ));
2368                        }
2369                    }
2370                }
2371                "sum" | "min" | "max" => {
2372                    if is_distinct {
2373                        return Err(Error::InvalidArgumentError(
2374                            "DISTINCT is not supported for this aggregate".into(),
2375                        ));
2376                    }
2377                    if args_slice.len() != 1 {
2378                        return Err(Error::InvalidArgumentError(format!(
2379                            "{} accepts exactly one argument",
2380                            func_name.to_uppercase()
2381                        )));
2382                    }
2383                    let arg_expr = match &args_slice[0] {
2384                        FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => arg_expr,
2385                        FunctionArg::Unnamed(FunctionArgExpr::Wildcard)
2386                        | FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
2387                            return Err(Error::InvalidArgumentError(format!(
2388                                "{} does not support wildcard arguments",
2389                                func_name.to_uppercase()
2390                            )));
2391                        }
2392                        FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
2393                            return Err(Error::InvalidArgumentError(format!(
2394                                "{} arguments must be column references",
2395                                func_name.to_uppercase()
2396                            )));
2397                        }
2398                    };
2399
2400                    if func_name == "sum" {
2401                        if let Some(column) = parse_count_nulls_case(arg_expr)? {
2402                            AggregateExpr::count_nulls(column, alias)
2403                        } else {
2404                            let column = resolve_column_name(arg_expr)?;
2405                            AggregateExpr::sum_int64(column, alias)
2406                        }
2407                    } else {
2408                        let column = resolve_column_name(arg_expr)?;
2409                        if func_name == "min" {
2410                            AggregateExpr::min_int64(column, alias)
2411                        } else {
2412                            AggregateExpr::max_int64(column, alias)
2413                        }
2414                    }
2415                }
2416                _ => return Ok(None),
2417            };
2418
2419            specs.push(aggregate);
2420        }
2421
2422        if specs.is_empty() {
2423            return Ok(None);
2424        }
2425        Ok(Some(specs))
2426    }
2427
2428    fn build_projection_list(
2429        &self,
2430        resolver: &IdentifierResolver<'_>,
2431        id_context: IdentifierContext,
2432        projection_items: &[SelectItem],
2433    ) -> SqlResult<Vec<SelectProjection>> {
2434        if projection_items.is_empty() {
2435            return Err(Error::InvalidArgumentError(
2436                "SELECT projection must include at least one column".into(),
2437            ));
2438        }
2439
2440        let mut projections = Vec::with_capacity(projection_items.len());
2441        for (idx, item) in projection_items.iter().enumerate() {
2442            match item {
2443                SelectItem::Wildcard(options) => {
2444                    if let Some(exclude) = &options.opt_exclude {
2445                        use sqlparser::ast::ExcludeSelectItem;
2446                        let exclude_cols = match exclude {
2447                            ExcludeSelectItem::Single(ident) => vec![ident.value.clone()],
2448                            ExcludeSelectItem::Multiple(idents) => {
2449                                idents.iter().map(|id| id.value.clone()).collect()
2450                            }
2451                        };
2452                        projections.push(SelectProjection::AllColumnsExcept {
2453                            exclude: exclude_cols,
2454                        });
2455                    } else {
2456                        projections.push(SelectProjection::AllColumns);
2457                    }
2458                }
2459                SelectItem::QualifiedWildcard(kind, _) => match kind {
2460                    SelectItemQualifiedWildcardKind::ObjectName(name) => {
2461                        projections.push(SelectProjection::Column {
2462                            name: name.to_string(),
2463                            alias: None,
2464                        });
2465                    }
2466                    SelectItemQualifiedWildcardKind::Expr(_) => {
2467                        return Err(Error::InvalidArgumentError(
2468                            "expression-qualified wildcards are not supported".into(),
2469                        ));
2470                    }
2471                },
2472                SelectItem::UnnamedExpr(expr) => match expr {
2473                    SqlExpr::Identifier(ident) => {
2474                        let parts = vec![ident.value.clone()];
2475                        let resolution = resolver.resolve(&parts, id_context)?;
2476                        if resolution.is_simple() {
2477                            projections.push(SelectProjection::Column {
2478                                name: resolution.column().to_string(),
2479                                alias: None,
2480                            });
2481                        } else {
2482                            let alias = format!("col{}", idx + 1);
2483                            projections.push(SelectProjection::Computed {
2484                                expr: resolution.into_scalar_expr(),
2485                                alias,
2486                            });
2487                        }
2488                    }
2489                    SqlExpr::CompoundIdentifier(parts) => {
2490                        let name_parts: Vec<String> =
2491                            parts.iter().map(|part| part.value.clone()).collect();
2492                        let resolution = resolver.resolve(&name_parts, id_context)?;
2493                        if resolution.is_simple() {
2494                            projections.push(SelectProjection::Column {
2495                                name: resolution.column().to_string(),
2496                                alias: None,
2497                            });
2498                        } else {
2499                            let alias = format!("col{}", idx + 1);
2500                            projections.push(SelectProjection::Computed {
2501                                expr: resolution.into_scalar_expr(),
2502                                alias,
2503                            });
2504                        }
2505                    }
2506                    _ => {
2507                        let alias = format!("col{}", idx + 1);
2508                        let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
2509                        projections.push(SelectProjection::Computed {
2510                            expr: scalar,
2511                            alias,
2512                        });
2513                    }
2514                },
2515                SelectItem::ExprWithAlias { expr, alias } => match expr {
2516                    SqlExpr::Identifier(ident) => {
2517                        let parts = vec![ident.value.clone()];
2518                        let resolution = resolver.resolve(&parts, id_context)?;
2519                        if resolution.is_simple() {
2520                            projections.push(SelectProjection::Column {
2521                                name: resolution.column().to_string(),
2522                                alias: Some(alias.value.clone()),
2523                            });
2524                        } else {
2525                            projections.push(SelectProjection::Computed {
2526                                expr: resolution.into_scalar_expr(),
2527                                alias: alias.value.clone(),
2528                            });
2529                        }
2530                    }
2531                    SqlExpr::CompoundIdentifier(parts) => {
2532                        let name_parts: Vec<String> =
2533                            parts.iter().map(|part| part.value.clone()).collect();
2534                        let resolution = resolver.resolve(&name_parts, id_context)?;
2535                        if resolution.is_simple() {
2536                            projections.push(SelectProjection::Column {
2537                                name: resolution.column().to_string(),
2538                                alias: Some(alias.value.clone()),
2539                            });
2540                        } else {
2541                            projections.push(SelectProjection::Computed {
2542                                expr: resolution.into_scalar_expr(),
2543                                alias: alias.value.clone(),
2544                            });
2545                        }
2546                    }
2547                    _ => {
2548                        let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
2549                        projections.push(SelectProjection::Computed {
2550                            expr: scalar,
2551                            alias: alias.value.clone(),
2552                        });
2553                    }
2554                },
2555            }
2556        }
2557        Ok(projections)
2558    }
2559
2560    #[allow(clippy::too_many_arguments)] // TODO: Refactor using struct for arg
2561    fn handle_start_transaction(
2562        &self,
2563        modes: Vec<TransactionMode>,
2564        begin: bool,
2565        transaction: Option<BeginTransactionKind>,
2566        modifier: Option<TransactionModifier>,
2567        statements: Vec<Statement>,
2568        exception: Option<Vec<ExceptionWhen>>,
2569        has_end_keyword: bool,
2570    ) -> SqlResult<RuntimeStatementResult<P>> {
2571        if !modes.is_empty() {
2572            return Err(Error::InvalidArgumentError(
2573                "transaction modes are not supported".into(),
2574            ));
2575        }
2576        if modifier.is_some() {
2577            return Err(Error::InvalidArgumentError(
2578                "transaction modifiers are not supported".into(),
2579            ));
2580        }
2581        if !statements.is_empty() || exception.is_some() || has_end_keyword {
2582            return Err(Error::InvalidArgumentError(
2583                "BEGIN blocks with inline statements or exceptions are not supported".into(),
2584            ));
2585        }
2586        if let Some(kind) = transaction {
2587            match kind {
2588                BeginTransactionKind::Transaction | BeginTransactionKind::Work => {}
2589            }
2590        }
2591        if !begin {
2592            // Currently treat START TRANSACTION same as BEGIN
2593            tracing::warn!("Currently treat `START TRANSACTION` same as `BEGIN`")
2594        }
2595
2596        self.execute_plan_statement(PlanStatement::BeginTransaction)
2597    }
2598
2599    fn handle_commit(
2600        &self,
2601        chain: bool,
2602        end: bool,
2603        modifier: Option<TransactionModifier>,
2604    ) -> SqlResult<RuntimeStatementResult<P>> {
2605        if chain {
2606            return Err(Error::InvalidArgumentError(
2607                "COMMIT AND [NO] CHAIN is not supported".into(),
2608            ));
2609        }
2610        if end {
2611            return Err(Error::InvalidArgumentError(
2612                "END blocks are not supported".into(),
2613            ));
2614        }
2615        if modifier.is_some() {
2616            return Err(Error::InvalidArgumentError(
2617                "transaction modifiers are not supported".into(),
2618            ));
2619        }
2620
2621        self.execute_plan_statement(PlanStatement::CommitTransaction)
2622    }
2623
2624    fn handle_rollback(
2625        &self,
2626        chain: bool,
2627        savepoint: Option<Ident>,
2628    ) -> SqlResult<RuntimeStatementResult<P>> {
2629        if chain {
2630            return Err(Error::InvalidArgumentError(
2631                "ROLLBACK AND [NO] CHAIN is not supported".into(),
2632            ));
2633        }
2634        if savepoint.is_some() {
2635            return Err(Error::InvalidArgumentError(
2636                "ROLLBACK TO SAVEPOINT is not supported".into(),
2637            ));
2638        }
2639
2640        self.execute_plan_statement(PlanStatement::RollbackTransaction)
2641    }
2642
2643    fn handle_set(&self, set_stmt: Set) -> SqlResult<RuntimeStatementResult<P>> {
2644        match set_stmt {
2645            Set::SingleAssignment {
2646                scope,
2647                hivevar,
2648                variable,
2649                values,
2650            } => {
2651                if scope.is_some() || hivevar {
2652                    return Err(Error::InvalidArgumentError(
2653                        "SET modifiers are not supported".into(),
2654                    ));
2655                }
2656
2657                let variable_name_raw = variable.to_string();
2658                let variable_name = variable_name_raw.to_ascii_lowercase();
2659
2660                match variable_name.as_str() {
2661                    "default_null_order" => {
2662                        if values.len() != 1 {
2663                            return Err(Error::InvalidArgumentError(
2664                                "SET default_null_order expects exactly one value".into(),
2665                            ));
2666                        }
2667
2668                        let value_expr = &values[0];
2669                        let normalized = match value_expr {
2670                            SqlExpr::Value(value_with_span) => value_with_span
2671                                .value
2672                                .clone()
2673                                .into_string()
2674                                .map(|s| s.to_ascii_lowercase()),
2675                            SqlExpr::Identifier(ident) => Some(ident.value.to_ascii_lowercase()),
2676                            _ => None,
2677                        };
2678
2679                        if !matches!(normalized.as_deref(), Some("nulls_first" | "nulls_last")) {
2680                            return Err(Error::InvalidArgumentError(format!(
2681                                "unsupported value for SET default_null_order: {value_expr:?}"
2682                            )));
2683                        }
2684
2685                        let use_nulls_first = matches!(normalized.as_deref(), Some("nulls_first"));
2686                        self.default_nulls_first
2687                            .store(use_nulls_first, AtomicOrdering::Relaxed);
2688
2689                        Ok(RuntimeStatementResult::NoOp)
2690                    }
2691                    "immediate_transaction_mode" => {
2692                        if values.len() != 1 {
2693                            return Err(Error::InvalidArgumentError(
2694                                "SET immediate_transaction_mode expects exactly one value".into(),
2695                            ));
2696                        }
2697                        let normalized = values[0].to_string().to_ascii_lowercase();
2698                        let enabled = match normalized.as_str() {
2699                            "true" | "on" | "1" => true,
2700                            "false" | "off" | "0" => false,
2701                            _ => {
2702                                return Err(Error::InvalidArgumentError(format!(
2703                                    "unsupported value for SET immediate_transaction_mode: {}",
2704                                    values[0]
2705                                )));
2706                            }
2707                        };
2708                        if !enabled {
2709                            tracing::warn!(
2710                                "SET immediate_transaction_mode=false has no effect; continuing with auto mode"
2711                            );
2712                        }
2713                        Ok(RuntimeStatementResult::NoOp)
2714                    }
2715                    _ => Err(Error::InvalidArgumentError(format!(
2716                        "unsupported SET variable: {variable_name_raw}"
2717                    ))),
2718                }
2719            }
2720            other => Err(Error::InvalidArgumentError(format!(
2721                "unsupported SQL SET statement: {other:?}",
2722            ))),
2723        }
2724    }
2725
2726    fn handle_pragma(
2727        &self,
2728        name: ObjectName,
2729        value: Option<Value>,
2730        is_eq: bool,
2731    ) -> SqlResult<RuntimeStatementResult<P>> {
2732        let (display, canonical) = canonical_object_name(&name)?;
2733        if value.is_some() || is_eq {
2734            return Err(Error::InvalidArgumentError(format!(
2735                "PRAGMA '{display}' does not accept a value"
2736            )));
2737        }
2738
2739        match canonical.as_str() {
2740            "enable_verification" | "disable_verification" => Ok(RuntimeStatementResult::NoOp),
2741            _ => Err(Error::InvalidArgumentError(format!(
2742                "unsupported PRAGMA '{}'",
2743                display
2744            ))),
2745        }
2746    }
2747}
2748
2749fn canonical_object_name(name: &ObjectName) -> SqlResult<(String, String)> {
2750    if name.0.is_empty() {
2751        return Err(Error::InvalidArgumentError(
2752            "object name must not be empty".into(),
2753        ));
2754    }
2755    let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
2756    for part in &name.0 {
2757        let ident = match part {
2758            ObjectNamePart::Identifier(ident) => ident,
2759            _ => {
2760                return Err(Error::InvalidArgumentError(
2761                    "object names using functions are not supported".into(),
2762                ));
2763            }
2764        };
2765        parts.push(ident.value.clone());
2766    }
2767    let display = parts.join(".");
2768    let canonical = display.to_ascii_lowercase();
2769    Ok((display, canonical))
2770}
2771
2772/// Parse an object name into optional schema and table name components.
2773///
2774/// Returns (schema_name, table_name) where schema_name is None if not qualified.
2775///
2776/// Examples:
2777/// - "users" -> (None, "users")
2778/// - "test.users" -> (Some("test"), "users")
2779/// - "catalog.test.users" -> Error (too many parts)
2780fn parse_schema_qualified_name(name: &ObjectName) -> SqlResult<(Option<String>, String)> {
2781    if name.0.is_empty() {
2782        return Err(Error::InvalidArgumentError(
2783            "object name must not be empty".into(),
2784        ));
2785    }
2786
2787    let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
2788    for part in &name.0 {
2789        let ident = match part {
2790            ObjectNamePart::Identifier(ident) => ident,
2791            _ => {
2792                return Err(Error::InvalidArgumentError(
2793                    "object names using functions are not supported".into(),
2794                ));
2795            }
2796        };
2797        parts.push(ident.value.clone());
2798    }
2799
2800    match parts.len() {
2801        1 => Ok((None, parts[0].clone())),
2802        2 => Ok((Some(parts[0].clone()), parts[1].clone())),
2803        _ => Err(Error::InvalidArgumentError(format!(
2804            "table name has too many parts: {}",
2805            name
2806        ))),
2807    }
2808}
2809
2810/// Extract column name from an index column specification (OrderBy expression).
2811///
2812/// This handles the common pattern of validating and extracting column names from
2813/// SQL index column definitions (PRIMARY KEY, UNIQUE, CREATE INDEX).
2814///
2815/// # Parameters
2816/// - `index_col`: The index column from sqlparser AST
2817/// - `context`: Description of where this column appears (e.g., "PRIMARY KEY", "UNIQUE constraint")
2818/// - `allow_sort_options`: If false, errors if any sort options are present; if true, validates them
2819/// - `allow_compound`: If true, allows compound identifiers and takes the last part; if false, only allows simple identifiers
2820///
2821/// # Returns
2822/// Column name as a String
2823fn extract_index_column_name(
2824    index_col: &sqlparser::ast::IndexColumn,
2825    context: &str,
2826    allow_sort_options: bool,
2827    allow_compound: bool,
2828) -> SqlResult<String> {
2829    use sqlparser::ast::Expr as SqlExpr;
2830
2831    // Check operator class
2832    if index_col.operator_class.is_some() {
2833        return Err(Error::InvalidArgumentError(format!(
2834            "{} operator classes are not supported",
2835            context
2836        )));
2837    }
2838
2839    let order_expr = &index_col.column;
2840
2841    // Validate sort options
2842    if allow_sort_options {
2843        // For CREATE INDEX: validate specific values are supported
2844        let ascending = order_expr.options.asc.unwrap_or(true);
2845        let nulls_first = order_expr.options.nulls_first.unwrap_or(false);
2846
2847        if !ascending {
2848            return Err(Error::InvalidArgumentError(format!(
2849                "{} DESC ordering is not supported",
2850                context
2851            )));
2852        }
2853        if nulls_first {
2854            return Err(Error::InvalidArgumentError(format!(
2855                "{} NULLS FIRST ordering is not supported",
2856                context
2857            )));
2858        }
2859    } else {
2860        // For constraints: no sort options allowed
2861        if order_expr.options.asc.is_some()
2862            || order_expr.options.nulls_first.is_some()
2863            || order_expr.with_fill.is_some()
2864        {
2865            return Err(Error::InvalidArgumentError(format!(
2866                "{} columns must be simple identifiers",
2867                context
2868            )));
2869        }
2870    }
2871
2872    // Extract column name from expression
2873    let column_name = match &order_expr.expr {
2874        SqlExpr::Identifier(ident) => ident.value.clone(),
2875        SqlExpr::CompoundIdentifier(parts) => {
2876            if allow_compound {
2877                // For CREATE INDEX: allow qualified names, take last part
2878                parts
2879                    .last()
2880                    .map(|ident| ident.value.clone())
2881                    .ok_or_else(|| {
2882                        Error::InvalidArgumentError(format!(
2883                            "invalid column reference in {}",
2884                            context
2885                        ))
2886                    })?
2887            } else if parts.len() == 1 {
2888                // For constraints: only allow single-part compound identifiers
2889                parts[0].value.clone()
2890            } else {
2891                return Err(Error::InvalidArgumentError(format!(
2892                    "{} columns must be column identifiers",
2893                    context
2894                )));
2895            }
2896        }
2897        other => {
2898            return Err(Error::InvalidArgumentError(format!(
2899                "{} only supports column references, found {:?}",
2900                context, other
2901            )));
2902        }
2903    };
2904
2905    Ok(column_name)
2906}
2907
2908fn validate_create_table_common(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
2909    if stmt.clone.is_some() || stmt.like.is_some() {
2910        return Err(Error::InvalidArgumentError(
2911            "CREATE TABLE LIKE/CLONE is not supported".into(),
2912        ));
2913    }
2914    if stmt.or_replace && stmt.if_not_exists {
2915        return Err(Error::InvalidArgumentError(
2916            "CREATE TABLE cannot combine OR REPLACE with IF NOT EXISTS".into(),
2917        ));
2918    }
2919    use sqlparser::ast::TableConstraint;
2920
2921    let mut seen_primary_key = false;
2922    for constraint in &stmt.constraints {
2923        match constraint {
2924            TableConstraint::PrimaryKey { .. } => {
2925                if seen_primary_key {
2926                    return Err(Error::InvalidArgumentError(
2927                        "multiple PRIMARY KEY constraints are not supported".into(),
2928                    ));
2929                }
2930                seen_primary_key = true;
2931            }
2932            TableConstraint::Unique { .. } => {
2933                // Detailed validation is performed later during plan construction.
2934            }
2935            TableConstraint::ForeignKey { .. } => {
2936                // Detailed validation is performed later during plan construction.
2937            }
2938            other => {
2939                return Err(Error::InvalidArgumentError(format!(
2940                    "table-level constraint {:?} is not supported",
2941                    other
2942                )));
2943            }
2944        }
2945    }
2946    Ok(())
2947}
2948
2949fn validate_check_constraint(
2950    check_expr: &sqlparser::ast::Expr,
2951    table_name: &str,
2952    column_names: &[&str],
2953) -> SqlResult<()> {
2954    use sqlparser::ast::Expr as SqlExpr;
2955
2956    let column_names_lower: HashSet<String> = column_names
2957        .iter()
2958        .map(|name| name.to_ascii_lowercase())
2959        .collect();
2960
2961    let mut stack: Vec<&SqlExpr> = vec![check_expr];
2962
2963    while let Some(expr) = stack.pop() {
2964        match expr {
2965            SqlExpr::Subquery(_) => {
2966                return Err(Error::InvalidArgumentError(
2967                    "Subqueries are not allowed in CHECK constraints".into(),
2968                ));
2969            }
2970            SqlExpr::Function(func) => {
2971                let func_name = func.name.to_string().to_uppercase();
2972                if matches!(func_name.as_str(), "SUM" | "AVG" | "COUNT" | "MIN" | "MAX") {
2973                    return Err(Error::InvalidArgumentError(
2974                        "Aggregate functions are not allowed in CHECK constraints".into(),
2975                    ));
2976                }
2977
2978                if let sqlparser::ast::FunctionArguments::List(list) = &func.args {
2979                    for arg in &list.args {
2980                        if let sqlparser::ast::FunctionArg::Unnamed(
2981                            sqlparser::ast::FunctionArgExpr::Expr(expr),
2982                        ) = arg
2983                        {
2984                            stack.push(expr);
2985                        }
2986                    }
2987                }
2988            }
2989            SqlExpr::Identifier(ident) => {
2990                if !column_names_lower.contains(&ident.value.to_ascii_lowercase()) {
2991                    return Err(Error::InvalidArgumentError(format!(
2992                        "Column '{}' referenced in CHECK constraint does not exist",
2993                        ident.value
2994                    )));
2995                }
2996            }
2997            SqlExpr::CompoundIdentifier(idents) => {
2998                if idents.len() == 2 {
2999                    let first = idents[0].value.as_str();
3000                    let second = &idents[1].value;
3001
3002                    if column_names_lower.contains(&first.to_ascii_lowercase()) {
3003                        continue;
3004                    }
3005
3006                    if !first.eq_ignore_ascii_case(table_name) {
3007                        return Err(Error::InvalidArgumentError(format!(
3008                            "CHECK constraint references column from different table '{}'",
3009                            first
3010                        )));
3011                    }
3012
3013                    if !column_names_lower.contains(&second.to_ascii_lowercase()) {
3014                        return Err(Error::InvalidArgumentError(format!(
3015                            "Column '{}' referenced in CHECK constraint does not exist",
3016                            second
3017                        )));
3018                    }
3019                } else if idents.len() == 3 {
3020                    let first = &idents[0].value;
3021                    let second = &idents[1].value;
3022                    let third = &idents[2].value;
3023
3024                    if first.eq_ignore_ascii_case(table_name) {
3025                        if !column_names_lower.contains(&second.to_ascii_lowercase()) {
3026                            return Err(Error::InvalidArgumentError(format!(
3027                                "Column '{}' referenced in CHECK constraint does not exist",
3028                                second
3029                            )));
3030                        }
3031                    } else if second.eq_ignore_ascii_case(table_name) {
3032                        if !column_names_lower.contains(&third.to_ascii_lowercase()) {
3033                            return Err(Error::InvalidArgumentError(format!(
3034                                "Column '{}' referenced in CHECK constraint does not exist",
3035                                third
3036                            )));
3037                        }
3038                    } else {
3039                        return Err(Error::InvalidArgumentError(format!(
3040                            "CHECK constraint references column from different table '{}'",
3041                            second
3042                        )));
3043                    }
3044                }
3045            }
3046            SqlExpr::BinaryOp { left, right, .. } => {
3047                stack.push(left);
3048                stack.push(right);
3049            }
3050            SqlExpr::UnaryOp { expr, .. } | SqlExpr::Nested(expr) => {
3051                stack.push(expr);
3052            }
3053            SqlExpr::Value(_) | SqlExpr::TypedString { .. } => {}
3054            _ => {}
3055        }
3056    }
3057
3058    Ok(())
3059}
3060
3061fn validate_create_table_definition(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
3062    for column in &stmt.columns {
3063        for ColumnOptionDef { option, .. } in &column.options {
3064            match option {
3065                ColumnOption::Null
3066                | ColumnOption::NotNull
3067                | ColumnOption::Unique { .. }
3068                | ColumnOption::Check(_)
3069                | ColumnOption::ForeignKey { .. } => {}
3070                ColumnOption::Default(_) => {
3071                    return Err(Error::InvalidArgumentError(format!(
3072                        "DEFAULT values are not supported for column '{}'",
3073                        column.name
3074                    )));
3075                }
3076                other => {
3077                    return Err(Error::InvalidArgumentError(format!(
3078                        "unsupported column option {:?} on '{}'",
3079                        other, column.name
3080                    )));
3081                }
3082            }
3083        }
3084    }
3085    Ok(())
3086}
3087
3088fn validate_create_table_as(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
3089    if !stmt.columns.is_empty() {
3090        return Err(Error::InvalidArgumentError(
3091            "CREATE TABLE AS SELECT does not support column definitions yet".into(),
3092        ));
3093    }
3094    Ok(())
3095}
3096
3097fn validate_simple_query(query: &Query) -> SqlResult<()> {
3098    if query.with.is_some() {
3099        return Err(Error::InvalidArgumentError(
3100            "WITH clauses are not supported".into(),
3101        ));
3102    }
3103    if let Some(limit_clause) = &query.limit_clause {
3104        match limit_clause {
3105            LimitClause::LimitOffset {
3106                offset: Some(_), ..
3107            }
3108            | LimitClause::OffsetCommaLimit { .. } => {
3109                return Err(Error::InvalidArgumentError(
3110                    "OFFSET clauses are not supported".into(),
3111                ));
3112            }
3113            LimitClause::LimitOffset { limit_by, .. } if !limit_by.is_empty() => {
3114                return Err(Error::InvalidArgumentError(
3115                    "LIMIT BY clauses are not supported".into(),
3116                ));
3117            }
3118            _ => {}
3119        }
3120    }
3121    if query.fetch.is_some() {
3122        return Err(Error::InvalidArgumentError(
3123            "FETCH clauses are not supported".into(),
3124        ));
3125    }
3126    Ok(())
3127}
3128
3129fn resolve_column_name(expr: &SqlExpr) -> SqlResult<String> {
3130    match expr {
3131        SqlExpr::Identifier(ident) => Ok(ident.value.clone()),
3132        SqlExpr::CompoundIdentifier(parts) => {
3133            if let Some(last) = parts.last() {
3134                Ok(last.value.clone())
3135            } else {
3136                Err(Error::InvalidArgumentError(
3137                    "empty column identifier".into(),
3138                ))
3139            }
3140        }
3141        _ => Err(Error::InvalidArgumentError(
3142            "aggregate arguments must be plain column identifiers".into(),
3143        )),
3144    }
3145}
3146
3147fn validate_projection_alias_qualifiers(
3148    projection_items: &[SelectItem],
3149    alias: &str,
3150) -> SqlResult<()> {
3151    let alias_lower = alias.to_ascii_lowercase();
3152    for item in projection_items {
3153        match item {
3154            SelectItem::UnnamedExpr(expr) | SelectItem::ExprWithAlias { expr, .. } => {
3155                if let SqlExpr::CompoundIdentifier(parts) = expr
3156                    && parts.len() >= 2
3157                    && let Some(first) = parts.first()
3158                    && !first.value.eq_ignore_ascii_case(&alias_lower)
3159                {
3160                    return Err(Error::InvalidArgumentError(format!(
3161                        "Binder Error: table '{}' not found",
3162                        first.value
3163                    )));
3164                }
3165            }
3166            _ => {}
3167        }
3168    }
3169    Ok(())
3170}
3171
3172/// Try to parse a function as an aggregate call for use in scalar expressions
3173/// Check if a scalar expression contains any aggregate functions
3174#[allow(dead_code)] // Utility function for future use
3175fn expr_contains_aggregate(expr: &llkv_expr::expr::ScalarExpr<String>) -> bool {
3176    match expr {
3177        llkv_expr::expr::ScalarExpr::Aggregate(_) => true,
3178        llkv_expr::expr::ScalarExpr::Binary { left, right, .. } => {
3179            expr_contains_aggregate(left) || expr_contains_aggregate(right)
3180        }
3181        llkv_expr::expr::ScalarExpr::GetField { base, .. } => expr_contains_aggregate(base),
3182        llkv_expr::expr::ScalarExpr::Column(_) | llkv_expr::expr::ScalarExpr::Literal(_) => false,
3183    }
3184}
3185
3186fn try_parse_aggregate_function(
3187    func: &sqlparser::ast::Function,
3188) -> SqlResult<Option<llkv_expr::expr::AggregateCall<String>>> {
3189    use sqlparser::ast::{FunctionArg, FunctionArgExpr, FunctionArguments, ObjectNamePart};
3190
3191    if func.uses_odbc_syntax {
3192        return Ok(None);
3193    }
3194    if !matches!(func.parameters, FunctionArguments::None) {
3195        return Ok(None);
3196    }
3197    if func.filter.is_some()
3198        || func.null_treatment.is_some()
3199        || func.over.is_some()
3200        || !func.within_group.is_empty()
3201    {
3202        return Ok(None);
3203    }
3204
3205    let func_name = if func.name.0.len() == 1 {
3206        match &func.name.0[0] {
3207            ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
3208            _ => return Ok(None),
3209        }
3210    } else {
3211        return Ok(None);
3212    };
3213
3214    let args_slice: &[FunctionArg] = match &func.args {
3215        FunctionArguments::List(list) => {
3216            if list.duplicate_treatment.is_some() || !list.clauses.is_empty() {
3217                return Ok(None);
3218            }
3219            &list.args
3220        }
3221        FunctionArguments::None => &[],
3222        FunctionArguments::Subquery(_) => return Ok(None),
3223    };
3224
3225    let agg_call = match func_name.as_str() {
3226        "count" => {
3227            if args_slice.len() != 1 {
3228                return Err(Error::InvalidArgumentError(
3229                    "COUNT accepts exactly one argument".into(),
3230                ));
3231            }
3232            match &args_slice[0] {
3233                FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
3234                    llkv_expr::expr::AggregateCall::CountStar
3235                }
3236                FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
3237                    let column = resolve_column_name(arg_expr)?;
3238                    llkv_expr::expr::AggregateCall::Count(column)
3239                }
3240                _ => {
3241                    return Err(Error::InvalidArgumentError(
3242                        "unsupported COUNT argument".into(),
3243                    ));
3244                }
3245            }
3246        }
3247        "sum" => {
3248            if args_slice.len() != 1 {
3249                return Err(Error::InvalidArgumentError(
3250                    "SUM accepts exactly one argument".into(),
3251                ));
3252            }
3253            let arg_expr = match &args_slice[0] {
3254                FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
3255                _ => {
3256                    return Err(Error::InvalidArgumentError(
3257                        "SUM requires a column argument".into(),
3258                    ));
3259                }
3260            };
3261
3262            // Check for COUNT(CASE ...) pattern
3263            if let Some(column) = parse_count_nulls_case(arg_expr)? {
3264                llkv_expr::expr::AggregateCall::CountNulls(column)
3265            } else {
3266                let column = resolve_column_name(arg_expr)?;
3267                llkv_expr::expr::AggregateCall::Sum(column)
3268            }
3269        }
3270        "min" => {
3271            if args_slice.len() != 1 {
3272                return Err(Error::InvalidArgumentError(
3273                    "MIN accepts exactly one argument".into(),
3274                ));
3275            }
3276            let arg_expr = match &args_slice[0] {
3277                FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
3278                _ => {
3279                    return Err(Error::InvalidArgumentError(
3280                        "MIN requires a column argument".into(),
3281                    ));
3282                }
3283            };
3284            let column = resolve_column_name(arg_expr)?;
3285            llkv_expr::expr::AggregateCall::Min(column)
3286        }
3287        "max" => {
3288            if args_slice.len() != 1 {
3289                return Err(Error::InvalidArgumentError(
3290                    "MAX accepts exactly one argument".into(),
3291                ));
3292            }
3293            let arg_expr = match &args_slice[0] {
3294                FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
3295                _ => {
3296                    return Err(Error::InvalidArgumentError(
3297                        "MAX requires a column argument".into(),
3298                    ));
3299                }
3300            };
3301            let column = resolve_column_name(arg_expr)?;
3302            llkv_expr::expr::AggregateCall::Max(column)
3303        }
3304        _ => return Ok(None),
3305    };
3306
3307    Ok(Some(agg_call))
3308}
3309
3310fn parse_count_nulls_case(expr: &SqlExpr) -> SqlResult<Option<String>> {
3311    let SqlExpr::Case {
3312        operand,
3313        conditions,
3314        else_result,
3315        ..
3316    } = expr
3317    else {
3318        return Ok(None);
3319    };
3320
3321    if operand.is_some() || conditions.len() != 1 {
3322        return Ok(None);
3323    }
3324
3325    let case_when = &conditions[0];
3326    if !is_integer_literal(&case_when.result, 1) {
3327        return Ok(None);
3328    }
3329
3330    let else_expr = match else_result {
3331        Some(expr) => expr.as_ref(),
3332        None => return Ok(None),
3333    };
3334    if !is_integer_literal(else_expr, 0) {
3335        return Ok(None);
3336    }
3337
3338    let inner = match &case_when.condition {
3339        SqlExpr::IsNull(inner) => inner.as_ref(),
3340        _ => return Ok(None),
3341    };
3342
3343    resolve_column_name(inner).map(Some)
3344}
3345
3346fn is_integer_literal(expr: &SqlExpr, expected: i64) -> bool {
3347    match expr {
3348        SqlExpr::Value(ValueWithSpan {
3349            value: Value::Number(text, _),
3350            ..
3351        }) => text.parse::<i64>() == Ok(expected),
3352        _ => false,
3353    }
3354}
3355
3356fn translate_condition_with_context(
3357    resolver: &IdentifierResolver<'_>,
3358    context: IdentifierContext,
3359    expr: &SqlExpr,
3360) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
3361    match expr {
3362        SqlExpr::IsNull(inner) => {
3363            let scalar = translate_scalar_with_context(resolver, context, inner)?;
3364            match scalar {
3365                llkv_expr::expr::ScalarExpr::Column(column) => {
3366                    Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
3367                        field_id: column,
3368                        op: llkv_expr::expr::Operator::IsNull,
3369                    }))
3370                }
3371                _ => Err(Error::InvalidArgumentError(
3372                    "IS NULL predicates currently support column references only".into(),
3373                )),
3374            }
3375        }
3376        SqlExpr::IsNotNull(inner) => {
3377            let scalar = translate_scalar_with_context(resolver, context, inner)?;
3378            match scalar {
3379                llkv_expr::expr::ScalarExpr::Column(column) => {
3380                    Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
3381                        field_id: column,
3382                        op: llkv_expr::expr::Operator::IsNotNull,
3383                    }))
3384                }
3385                _ => Err(Error::InvalidArgumentError(
3386                    "IS NOT NULL predicates currently support column references only".into(),
3387                )),
3388            }
3389        }
3390        SqlExpr::BinaryOp { left, op, right } => match op {
3391            BinaryOperator::And => Ok(llkv_expr::expr::Expr::And(vec![
3392                translate_condition_with_context(resolver, context, left)?,
3393                translate_condition_with_context(resolver, context, right)?,
3394            ])),
3395            BinaryOperator::Or => Ok(llkv_expr::expr::Expr::Or(vec![
3396                translate_condition_with_context(resolver, context, left)?,
3397                translate_condition_with_context(resolver, context, right)?,
3398            ])),
3399            BinaryOperator::Eq
3400            | BinaryOperator::NotEq
3401            | BinaryOperator::Lt
3402            | BinaryOperator::LtEq
3403            | BinaryOperator::Gt
3404            | BinaryOperator::GtEq => {
3405                translate_comparison_with_context(resolver, context, left, op.clone(), right)
3406            }
3407            other => Err(Error::InvalidArgumentError(format!(
3408                "unsupported binary operator in WHERE clause: {other:?}"
3409            ))),
3410        },
3411        SqlExpr::UnaryOp {
3412            op: UnaryOperator::Not,
3413            expr,
3414        } => Ok(llkv_expr::expr::Expr::not(
3415            translate_condition_with_context(resolver, context, expr)?,
3416        )),
3417        SqlExpr::Nested(inner) => translate_condition_with_context(resolver, context, inner),
3418        other => Err(Error::InvalidArgumentError(format!(
3419            "unsupported WHERE clause: {other:?}"
3420        ))),
3421    }
3422}
3423
3424fn translate_comparison_with_context(
3425    resolver: &IdentifierResolver<'_>,
3426    context: IdentifierContext,
3427    left: &SqlExpr,
3428    op: BinaryOperator,
3429    right: &SqlExpr,
3430) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
3431    let left_scalar = translate_scalar_with_context(resolver, context, left)?;
3432    let right_scalar = translate_scalar_with_context(resolver, context, right)?;
3433    let compare_op = match op {
3434        BinaryOperator::Eq => llkv_expr::expr::CompareOp::Eq,
3435        BinaryOperator::NotEq => llkv_expr::expr::CompareOp::NotEq,
3436        BinaryOperator::Lt => llkv_expr::expr::CompareOp::Lt,
3437        BinaryOperator::LtEq => llkv_expr::expr::CompareOp::LtEq,
3438        BinaryOperator::Gt => llkv_expr::expr::CompareOp::Gt,
3439        BinaryOperator::GtEq => llkv_expr::expr::CompareOp::GtEq,
3440        other => {
3441            return Err(Error::InvalidArgumentError(format!(
3442                "unsupported comparison operator: {other:?}"
3443            )));
3444        }
3445    };
3446
3447    if let (
3448        llkv_expr::expr::ScalarExpr::Column(column),
3449        llkv_expr::expr::ScalarExpr::Literal(literal),
3450    ) = (&left_scalar, &right_scalar)
3451        && let Some(op) = compare_op_to_filter_operator(compare_op, literal)
3452    {
3453        return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
3454            field_id: column.clone(),
3455            op,
3456        }));
3457    }
3458
3459    if let (
3460        llkv_expr::expr::ScalarExpr::Literal(literal),
3461        llkv_expr::expr::ScalarExpr::Column(column),
3462    ) = (&left_scalar, &right_scalar)
3463        && let Some(flipped) = flip_compare_op(compare_op)
3464        && let Some(op) = compare_op_to_filter_operator(flipped, literal)
3465    {
3466        return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
3467            field_id: column.clone(),
3468            op,
3469        }));
3470    }
3471
3472    Ok(llkv_expr::expr::Expr::Compare {
3473        left: left_scalar,
3474        op: compare_op,
3475        right: right_scalar,
3476    })
3477}
3478
3479fn compare_op_to_filter_operator(
3480    op: llkv_expr::expr::CompareOp,
3481    literal: &Literal,
3482) -> Option<llkv_expr::expr::Operator<'static>> {
3483    let lit = literal.clone();
3484    match op {
3485        llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::Operator::Equals(lit)),
3486        llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::Operator::LessThan(lit)),
3487        llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::Operator::LessThanOrEquals(lit)),
3488        llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::Operator::GreaterThan(lit)),
3489        llkv_expr::expr::CompareOp::GtEq => {
3490            Some(llkv_expr::expr::Operator::GreaterThanOrEquals(lit))
3491        }
3492        llkv_expr::expr::CompareOp::NotEq => None,
3493    }
3494}
3495
3496fn flip_compare_op(op: llkv_expr::expr::CompareOp) -> Option<llkv_expr::expr::CompareOp> {
3497    match op {
3498        llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::CompareOp::Eq),
3499        llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::CompareOp::Gt),
3500        llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::CompareOp::GtEq),
3501        llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::CompareOp::Lt),
3502        llkv_expr::expr::CompareOp::GtEq => Some(llkv_expr::expr::CompareOp::LtEq),
3503        llkv_expr::expr::CompareOp::NotEq => None,
3504    }
3505}
3506/// Translate scalar expression with knowledge of the FROM table context.
3507/// This allows us to properly distinguish schema.table.column from column.field.field.
3508fn translate_scalar_with_context(
3509    resolver: &IdentifierResolver<'_>,
3510    context: IdentifierContext,
3511    expr: &SqlExpr,
3512) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
3513    match expr {
3514        SqlExpr::Identifier(ident) => {
3515            let parts = vec![ident.value.clone()];
3516            let resolution = resolver.resolve(&parts, context)?;
3517            Ok(resolution.into_scalar_expr())
3518        }
3519        SqlExpr::CompoundIdentifier(idents) => {
3520            if idents.is_empty() {
3521                return Err(Error::InvalidArgumentError(
3522                    "invalid compound identifier".into(),
3523                ));
3524            }
3525
3526            let parts: Vec<String> = idents.iter().map(|ident| ident.value.clone()).collect();
3527            let resolution = resolver.resolve(&parts, context)?;
3528            Ok(resolution.into_scalar_expr())
3529        }
3530        _ => translate_scalar(expr),
3531    }
3532}
3533
3534fn translate_scalar(expr: &SqlExpr) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
3535    match expr {
3536        SqlExpr::Identifier(ident) => Ok(llkv_expr::expr::ScalarExpr::column(ident.value.clone())),
3537        SqlExpr::CompoundIdentifier(idents) => {
3538            if idents.is_empty() {
3539                return Err(Error::InvalidArgumentError(
3540                    "invalid compound identifier".into(),
3541                ));
3542            }
3543
3544            // Without table context, treat first part as column, rest as fields
3545            let column_name = idents[0].value.clone();
3546            let mut result = llkv_expr::expr::ScalarExpr::column(column_name);
3547
3548            for part in &idents[1..] {
3549                let field_name = part.value.clone();
3550                result = llkv_expr::expr::ScalarExpr::get_field(result, field_name);
3551            }
3552
3553            Ok(result)
3554        }
3555        SqlExpr::Value(value) => literal_from_value(value),
3556        SqlExpr::BinaryOp { left, op, right } => {
3557            let left_expr = translate_scalar(left)?;
3558            let right_expr = translate_scalar(right)?;
3559            let op = match op {
3560                BinaryOperator::Plus => llkv_expr::expr::BinaryOp::Add,
3561                BinaryOperator::Minus => llkv_expr::expr::BinaryOp::Subtract,
3562                BinaryOperator::Multiply => llkv_expr::expr::BinaryOp::Multiply,
3563                BinaryOperator::Divide => llkv_expr::expr::BinaryOp::Divide,
3564                BinaryOperator::Modulo => llkv_expr::expr::BinaryOp::Modulo,
3565                other => {
3566                    return Err(Error::InvalidArgumentError(format!(
3567                        "unsupported scalar binary operator: {other:?}"
3568                    )));
3569                }
3570            };
3571            Ok(llkv_expr::expr::ScalarExpr::binary(
3572                left_expr, op, right_expr,
3573            ))
3574        }
3575        SqlExpr::UnaryOp {
3576            op: UnaryOperator::Minus,
3577            expr,
3578        } => match translate_scalar(expr)? {
3579            llkv_expr::expr::ScalarExpr::Literal(lit) => match lit {
3580                Literal::Integer(v) => {
3581                    Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(-v)))
3582                }
3583                Literal::Float(v) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(-v))),
3584                Literal::Boolean(_) => Err(Error::InvalidArgumentError(
3585                    "cannot negate boolean literal".into(),
3586                )),
3587                Literal::String(_) => Err(Error::InvalidArgumentError(
3588                    "cannot negate string literal".into(),
3589                )),
3590                Literal::Struct(_) => Err(Error::InvalidArgumentError(
3591                    "cannot negate struct literal".into(),
3592                )),
3593                Literal::Null => Err(Error::InvalidArgumentError(
3594                    "cannot negate null literal".into(),
3595                )),
3596            },
3597            _ => Err(Error::InvalidArgumentError(
3598                "cannot negate non-literal expression".into(),
3599            )),
3600        },
3601        SqlExpr::UnaryOp {
3602            op: UnaryOperator::Plus,
3603            expr,
3604        } => translate_scalar(expr),
3605        SqlExpr::Nested(inner) => translate_scalar(inner),
3606        SqlExpr::Function(func) => {
3607            // Try to parse as an aggregate function
3608            if let Some(agg_call) = try_parse_aggregate_function(func)? {
3609                Ok(llkv_expr::expr::ScalarExpr::aggregate(agg_call))
3610            } else {
3611                Err(Error::InvalidArgumentError(format!(
3612                    "unsupported function in scalar expression: {:?}",
3613                    func.name
3614                )))
3615            }
3616        }
3617        SqlExpr::Dictionary(fields) => {
3618            // Dictionary literal like {'a': 1, 'b': 2} represents a STRUCT literal
3619            let mut struct_fields = Vec::new();
3620            for entry in fields {
3621                let key = entry.key.value.clone(); // Use .value to get the actual identifier value
3622                // Parse the value to a literal
3623                let value_expr = translate_scalar(&entry.value)?;
3624                // Extract the literal from the expression
3625                match value_expr {
3626                    llkv_expr::expr::ScalarExpr::Literal(lit) => {
3627                        struct_fields.push((key, Box::new(lit)));
3628                    }
3629                    _ => {
3630                        return Err(Error::InvalidArgumentError(
3631                            "Dictionary values must be literals".to_string(),
3632                        ));
3633                    }
3634                }
3635            }
3636            Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Struct(
3637                struct_fields,
3638            )))
3639        }
3640        other => Err(Error::InvalidArgumentError(format!(
3641            "unsupported scalar expression: {other:?}"
3642        ))),
3643    }
3644}
3645
3646fn literal_from_value(value: &ValueWithSpan) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
3647    match &value.value {
3648        Value::Number(text, _) => {
3649            if text.contains(['.', 'e', 'E']) {
3650                let parsed = text.parse::<f64>().map_err(|err| {
3651                    Error::InvalidArgumentError(format!("invalid float literal: {err}"))
3652                })?;
3653                Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(parsed)))
3654            } else {
3655                let parsed = text.parse::<i128>().map_err(|err| {
3656                    Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
3657                })?;
3658                Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(
3659                    parsed,
3660                )))
3661            }
3662        }
3663        Value::Boolean(value) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Boolean(
3664            *value,
3665        ))),
3666        Value::Null => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Null)),
3667        other => {
3668            if let Some(text) = other.clone().into_string() {
3669                Ok(llkv_expr::expr::ScalarExpr::literal(Literal::String(text)))
3670            } else {
3671                Err(Error::InvalidArgumentError(format!(
3672                    "unsupported literal: {other:?}"
3673                )))
3674            }
3675        }
3676    }
3677}
3678
3679fn resolve_assignment_column_name(target: &AssignmentTarget) -> SqlResult<String> {
3680    match target {
3681        AssignmentTarget::ColumnName(name) => {
3682            if name.0.len() != 1 {
3683                return Err(Error::InvalidArgumentError(
3684                    "qualified column names in UPDATE assignments are not supported yet".into(),
3685                ));
3686            }
3687            match &name.0[0] {
3688                ObjectNamePart::Identifier(ident) => Ok(ident.value.clone()),
3689                other => Err(Error::InvalidArgumentError(format!(
3690                    "unsupported column reference in UPDATE assignment: {other:?}"
3691                ))),
3692            }
3693        }
3694        AssignmentTarget::Tuple(_) => Err(Error::InvalidArgumentError(
3695            "tuple assignments are not supported yet".into(),
3696        )),
3697    }
3698}
3699
3700fn arrow_type_from_sql(data_type: &SqlDataType) -> SqlResult<arrow::datatypes::DataType> {
3701    use arrow::datatypes::DataType;
3702    match data_type {
3703        SqlDataType::Int(_)
3704        | SqlDataType::Integer(_)
3705        | SqlDataType::BigInt(_)
3706        | SqlDataType::SmallInt(_)
3707        | SqlDataType::TinyInt(_) => Ok(DataType::Int64),
3708        SqlDataType::Float(_)
3709        | SqlDataType::Real
3710        | SqlDataType::Double(_)
3711        | SqlDataType::DoublePrecision => Ok(DataType::Float64),
3712        SqlDataType::Text
3713        | SqlDataType::String(_)
3714        | SqlDataType::Varchar(_)
3715        | SqlDataType::Char(_)
3716        | SqlDataType::Uuid => Ok(DataType::Utf8),
3717        SqlDataType::Date => Ok(DataType::Date32),
3718        SqlDataType::Decimal(_) | SqlDataType::Numeric(_) => Ok(DataType::Float64),
3719        SqlDataType::Boolean => Ok(DataType::Boolean),
3720        SqlDataType::Custom(name, args) => {
3721            if name.0.len() == 1
3722                && let ObjectNamePart::Identifier(ident) = &name.0[0]
3723                && ident.value.eq_ignore_ascii_case("row")
3724            {
3725                return row_type_to_arrow(data_type, args);
3726            }
3727            Err(Error::InvalidArgumentError(format!(
3728                "unsupported SQL data type: {data_type:?}"
3729            )))
3730        }
3731        other => Err(Error::InvalidArgumentError(format!(
3732            "unsupported SQL data type: {other:?}"
3733        ))),
3734    }
3735}
3736
3737fn row_type_to_arrow(
3738    data_type: &SqlDataType,
3739    tokens: &[String],
3740) -> SqlResult<arrow::datatypes::DataType> {
3741    use arrow::datatypes::{DataType, Field, FieldRef, Fields};
3742
3743    let row_str = data_type.to_string();
3744    if tokens.is_empty() {
3745        return Err(Error::InvalidArgumentError(
3746            "ROW type must define at least one field".into(),
3747        ));
3748    }
3749
3750    let dialect = GenericDialect {};
3751    let field_definitions = resolve_row_field_types(tokens, &dialect).map_err(|err| {
3752        Error::InvalidArgumentError(format!("unable to parse ROW type '{row_str}': {err}"))
3753    })?;
3754
3755    let mut fields: Vec<FieldRef> = Vec::with_capacity(field_definitions.len());
3756    for (field_name, field_type) in field_definitions {
3757        let arrow_field_type = arrow_type_from_sql(&field_type)?;
3758        fields.push(Arc::new(Field::new(field_name, arrow_field_type, true)));
3759    }
3760
3761    let struct_fields: Fields = fields.into();
3762    Ok(DataType::Struct(struct_fields))
3763}
3764
3765fn resolve_row_field_types(
3766    tokens: &[String],
3767    dialect: &GenericDialect,
3768) -> SqlResult<Vec<(String, SqlDataType)>> {
3769    if tokens.is_empty() {
3770        return Err(Error::InvalidArgumentError(
3771            "ROW type must define at least one field".into(),
3772        ));
3773    }
3774
3775    let mut start = 0;
3776    let mut end = tokens.len();
3777    if tokens[start] == "(" {
3778        if end == 0 || tokens[end - 1] != ")" {
3779            return Err(Error::InvalidArgumentError(
3780                "ROW type is missing closing ')'".into(),
3781            ));
3782        }
3783        start += 1;
3784        end -= 1;
3785    } else if tokens[end - 1] == ")" {
3786        return Err(Error::InvalidArgumentError(
3787            "ROW type contains unmatched ')'".into(),
3788        ));
3789    }
3790
3791    let slice = &tokens[start..end];
3792    if slice.is_empty() {
3793        return Err(Error::InvalidArgumentError(
3794            "ROW type did not provide any field definitions".into(),
3795        ));
3796    }
3797
3798    let mut fields = Vec::new();
3799    let mut index = 0;
3800
3801    while index < slice.len() {
3802        if slice[index] == "," {
3803            index += 1;
3804            continue;
3805        }
3806
3807        let field_name = normalize_row_field_name(&slice[index])?;
3808        index += 1;
3809
3810        if index >= slice.len() {
3811            return Err(Error::InvalidArgumentError(format!(
3812                "ROW field '{field_name}' is missing a type specification"
3813            )));
3814        }
3815
3816        let mut last_success: Option<(usize, SqlDataType)> = None;
3817        let mut type_end = index;
3818
3819        while type_end <= slice.len() {
3820            let candidate = slice[index..type_end].join(" ");
3821            if candidate.trim().is_empty() {
3822                type_end += 1;
3823                continue;
3824            }
3825
3826            if let Ok(parsed_type) = parse_sql_data_type(&candidate, dialect) {
3827                last_success = Some((type_end, parsed_type));
3828            }
3829
3830            if type_end == slice.len() {
3831                break;
3832            }
3833
3834            if slice[type_end] == "," && last_success.is_some() {
3835                break;
3836            }
3837
3838            type_end += 1;
3839        }
3840
3841        let Some((next_index, data_type)) = last_success else {
3842            return Err(Error::InvalidArgumentError(format!(
3843                "failed to parse ROW field type for '{field_name}'"
3844            )));
3845        };
3846
3847        fields.push((field_name, data_type));
3848        index = next_index;
3849
3850        if index < slice.len() && slice[index] == "," {
3851            index += 1;
3852        }
3853    }
3854
3855    if fields.is_empty() {
3856        return Err(Error::InvalidArgumentError(
3857            "ROW type did not provide any field definitions".into(),
3858        ));
3859    }
3860
3861    Ok(fields)
3862}
3863
3864fn normalize_row_field_name(raw: &str) -> SqlResult<String> {
3865    let trimmed = raw.trim();
3866    if trimmed.is_empty() {
3867        return Err(Error::InvalidArgumentError(
3868            "ROW field name must not be empty".into(),
3869        ));
3870    }
3871
3872    if let Some(stripped) = trimmed.strip_prefix('"') {
3873        let without_end = stripped.strip_suffix('"').ok_or_else(|| {
3874            Error::InvalidArgumentError(format!("unterminated quoted ROW field name: {trimmed}"))
3875        })?;
3876        let name = without_end.replace("\"\"", "\"");
3877        return Ok(name);
3878    }
3879
3880    Ok(trimmed.to_string())
3881}
3882
3883fn parse_sql_data_type(type_str: &str, dialect: &GenericDialect) -> SqlResult<SqlDataType> {
3884    let trimmed = type_str.trim();
3885    let sql = format!("CREATE TABLE __row(__field {trimmed});");
3886    let statements = Parser::parse_sql(dialect, &sql).map_err(|err| {
3887        Error::InvalidArgumentError(format!("failed to parse ROW field type '{trimmed}': {err}"))
3888    })?;
3889
3890    let stmt = statements.into_iter().next().ok_or_else(|| {
3891        Error::InvalidArgumentError(format!(
3892            "ROW field type '{trimmed}' did not produce a statement"
3893        ))
3894    })?;
3895
3896    match stmt {
3897        Statement::CreateTable(table) => table
3898            .columns
3899            .first()
3900            .map(|col| col.data_type.clone())
3901            .ok_or_else(|| {
3902                Error::InvalidArgumentError(format!(
3903                    "ROW field type '{trimmed}' missing column definition"
3904                ))
3905            }),
3906        other => Err(Error::InvalidArgumentError(format!(
3907            "unexpected statement while parsing ROW field type: {other:?}"
3908        ))),
3909    }
3910}
3911
3912fn extract_constant_select_rows(select: &Select) -> SqlResult<Option<Vec<Vec<PlanValue>>>> {
3913    if !select.from.is_empty() {
3914        return Ok(None);
3915    }
3916
3917    if select.selection.is_some()
3918        || select.having.is_some()
3919        || !select.named_window.is_empty()
3920        || select.qualify.is_some()
3921        || select.distinct.is_some()
3922        || select.top.is_some()
3923        || select.into.is_some()
3924        || select.prewhere.is_some()
3925        || !select.lateral_views.is_empty()
3926        || select.value_table_mode.is_some()
3927        || !group_by_is_empty(&select.group_by)
3928    {
3929        return Err(Error::InvalidArgumentError(
3930            "constant SELECT statements do not support advanced clauses".into(),
3931        ));
3932    }
3933
3934    if select.projection.is_empty() {
3935        return Err(Error::InvalidArgumentError(
3936            "constant SELECT requires at least one projection".into(),
3937        ));
3938    }
3939
3940    let mut row: Vec<PlanValue> = Vec::with_capacity(select.projection.len());
3941    for item in &select.projection {
3942        let expr = match item {
3943            SelectItem::UnnamedExpr(expr) => expr,
3944            SelectItem::ExprWithAlias { expr, .. } => expr,
3945            other => {
3946                return Err(Error::InvalidArgumentError(format!(
3947                    "unsupported projection in constant SELECT: {other:?}"
3948                )));
3949            }
3950        };
3951
3952        let value = SqlValue::try_from_expr(expr)?;
3953        row.push(PlanValue::from(value));
3954    }
3955
3956    Ok(Some(vec![row]))
3957}
3958
3959fn extract_single_table(from: &[TableWithJoins]) -> SqlResult<(String, String)> {
3960    if from.len() != 1 {
3961        return Err(Error::InvalidArgumentError(
3962            "queries over multiple tables are not supported yet".into(),
3963        ));
3964    }
3965    let item = &from[0];
3966    if !item.joins.is_empty() {
3967        return Err(Error::InvalidArgumentError(
3968            "JOIN clauses are not supported yet".into(),
3969        ));
3970    }
3971    match &item.relation {
3972        TableFactor::Table { name, .. } => canonical_object_name(name),
3973        _ => Err(Error::InvalidArgumentError(
3974            "queries require a plain table name".into(),
3975        )),
3976    }
3977}
3978
3979/// Extract multiple tables from FROM clause for cross products
3980fn extract_tables(from: &[TableWithJoins]) -> SqlResult<Vec<llkv_plan::TableRef>> {
3981    let mut tables = Vec::new();
3982
3983    for item in from {
3984        // Check if this table has joins - we don't support JOIN yet
3985        if !item.joins.is_empty() {
3986            return Err(Error::InvalidArgumentError(
3987                "JOIN clauses are not supported yet".into(),
3988            ));
3989        }
3990
3991        // Extract table name
3992        match &item.relation {
3993            TableFactor::Table { name, .. } => {
3994                let (schema_opt, table) = parse_schema_qualified_name(name)?;
3995                let schema = schema_opt.unwrap_or_default();
3996                tables.push(llkv_plan::TableRef::new(schema, table));
3997            }
3998            _ => {
3999                return Err(Error::InvalidArgumentError(
4000                    "queries require a plain table name".into(),
4001                ));
4002            }
4003        }
4004    }
4005
4006    Ok(tables)
4007}
4008
4009fn group_by_is_empty(expr: &GroupByExpr) -> bool {
4010    matches!(
4011        expr,
4012        GroupByExpr::Expressions(exprs, modifiers)
4013            if exprs.is_empty() && modifiers.is_empty()
4014    )
4015}
4016
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int64Array, StringArray};
    use arrow::record_batch::RecordBatch;
    use llkv_storage::pager::MemPager;

    /// Flattens the first column of every batch into a list of optional strings.
    ///
    /// Panics when the first column of a batch is not a `StringArray`.
    fn extract_string_options(batches: &[RecordBatch]) -> Vec<Option<String>> {
        let mut values: Vec<Option<String>> = Vec::new();
        for batch in batches {
            let column = batch
                .column(0)
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("string column");
            values.extend(column.iter().map(|value| value.map(str::to_string)));
        }
        values
    }

    /// Creates a table, inserts two rows, and reads one back through a filter.
    #[test]
    fn create_insert_select_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        let result = engine
            .execute("CREATE TABLE people (id INT NOT NULL, name TEXT NOT NULL)")
            .expect("create table");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::CreateTable { .. }
        ));

        let result = engine
            .execute("INSERT INTO people (id, name) VALUES (1, 'alice'), (2, 'bob')")
            .expect("insert rows");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 2,
                ..
            }
        ));

        let mut result = engine
            .execute("SELECT name FROM people WHERE id = 2")
            .expect("select rows");
        let select_result = result.remove(0);
        let batches = match select_result {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };
        assert_eq!(batches.len(), 1);
        let column = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string column");
        assert_eq!(column.len(), 1);
        assert_eq!(column.value(0), "bob");
    }

    /// `INSERT ... SELECT <constant>` accepts both a literal and a typed NULL.
    #[test]
    fn insert_select_constant_including_null() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        engine
            .execute("CREATE TABLE integers(i INTEGER)")
            .expect("create table");

        let result = engine
            .execute("INSERT INTO integers SELECT 42")
            .expect("insert literal");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 1,
                ..
            }
        ));

        let result = engine
            .execute("INSERT INTO integers SELECT CAST(NULL AS VARCHAR)")
            .expect("insert null literal");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 1,
                ..
            }
        ));

        let mut result = engine
            .execute("SELECT * FROM integers")
            .expect("select rows");
        let select_result = result.remove(0);
        let batches = match select_result {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };

        let mut values: Vec<Option<i64>> = Vec::new();
        for batch in &batches {
            let column = batch
                .column(0)
                .as_any()
                .downcast_ref::<Int64Array>()
                .expect("int column");
            values.extend(column.iter());
        }

        assert_eq!(values, vec![Some(42), None]);
    }

    /// `UPDATE ... WHERE` only touches the matching row.
    #[test]
    fn update_with_where_clause_filters_rows() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        engine
            .execute("SET default_null_order='nulls_first'")
            .expect("set default null order");

        engine
            .execute("CREATE TABLE strings(a VARCHAR)")
            .expect("create table");

        engine
            .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
            .expect("insert seed rows");

        let result = engine
            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
            .expect("update rows");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Update {
                rows_updated: 1,
                ..
            }
        ));

        let mut result = engine
            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
            .expect("select rows");
        let select_result = result.remove(0);
        let batches = match select_result {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };

        let mut values = extract_string_options(&batches);

        // Sort locally (NULLs first, then numerically) so the assertion checks
        // the updated contents rather than the engine's output ordering.
        values.sort_by(|a, b| match (a, b) {
            (None, None) => std::cmp::Ordering::Equal,
            (None, Some(_)) => std::cmp::Ordering::Less,
            (Some(_), None) => std::cmp::Ordering::Greater,
            (Some(av), Some(bv)) => {
                let a_val = av.parse::<i64>().unwrap_or_default();
                let b_val = bv.parse::<i64>().unwrap_or_default();
                a_val.cmp(&b_val)
            }
        });

        assert_eq!(
            values,
            vec![None, Some("4".to_string()), Some("13".to_string())]
        );
    }

    /// `ORDER BY` places NULLs last by default and first once
    /// `default_null_order` is set to `nulls_first`.
    #[test]
    fn order_by_honors_configured_default_null_order() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        engine
            .execute("CREATE TABLE strings(a VARCHAR)")
            .expect("create table");
        engine
            .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
            .expect("insert values");
        engine
            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
            .expect("update value");

        let mut result = engine
            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
            .expect("select rows");
        let select_result = result.remove(0);
        let batches = match select_result {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };

        let values = extract_string_options(&batches);
        assert_eq!(
            values,
            vec![Some("4".to_string()), Some("13".to_string()), None]
        );

        assert!(!engine.default_nulls_first_for_tests());

        engine
            .execute("SET default_null_order='nulls_first'")
            .expect("set default null order");

        assert!(engine.default_nulls_first_for_tests());

        let mut result = engine
            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
            .expect("select rows");
        let select_result = result.remove(0);
        let batches = match select_result {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };

        let values = extract_string_options(&batches);
        assert_eq!(
            values,
            vec![None, Some("4".to_string()), Some("13".to_string())]
        );
    }

    /// A parsed `ROW(a INTEGER, b VARCHAR)` column type maps to a two-field
    /// Arrow struct of `Int64` and `Utf8`.
    #[test]
    fn arrow_type_from_row_returns_struct_fields() {
        let dialect = GenericDialect {};
        let statements = Parser::parse_sql(
            &dialect,
            "CREATE TABLE row_types(payload ROW(a INTEGER, b VARCHAR));",
        )
        .expect("parse ROW type definition");

        let data_type = match &statements[0] {
            Statement::CreateTable(stmt) => stmt.columns[0].data_type.clone(),
            other => panic!("unexpected statement: {other:?}"),
        };

        let arrow_type = arrow_type_from_sql(&data_type).expect("convert ROW type");
        match arrow_type {
            arrow::datatypes::DataType::Struct(fields) => {
                assert_eq!(fields.len(), 2, "unexpected field count");
                assert_eq!(fields[0].name(), "a");
                assert_eq!(fields[1].name(), "b");
                assert_eq!(fields[0].data_type(), &arrow::datatypes::DataType::Int64);
                assert_eq!(fields[1].data_type(), &arrow::datatypes::DataType::Utf8);
            }
            other => panic!("expected struct type, got {other:?}"),
        }
    }
}