llkv_sql/
sql_engine.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::{
3    Arc, OnceLock,
4    atomic::{AtomicBool, Ordering as AtomicOrdering},
5};
6
7use crate::SqlResult;
8use crate::SqlValue;
9use arrow::record_batch::RecordBatch;
10
11use llkv_executor::SelectExecution;
12use llkv_expr::literal::Literal;
13use llkv_plan::TransformFrame;
14use llkv_plan::validation::{
15    ensure_known_columns_case_insensitive, ensure_non_empty, ensure_unique_case_insensitive,
16};
17use llkv_result::Error;
18use llkv_runtime::TEMPORARY_NAMESPACE_ID;
19use llkv_runtime::{
20    AggregateExpr, AssignmentValue, ColumnAssignment, CreateIndexPlan, CreateTablePlan,
21    CreateTableSource, DeletePlan, ForeignKeyAction, ForeignKeySpec, IndexColumnPlan, InsertPlan,
22    InsertSource, MultiColumnUniqueSpec, OrderByPlan, OrderSortType, OrderTarget, PlanColumnSpec,
23    PlanStatement, PlanValue, RenameTablePlan, RuntimeContext, RuntimeEngine, RuntimeSession,
24    RuntimeStatementResult, SelectPlan, SelectProjection, TruncatePlan, UpdatePlan,
25    extract_rows_from_range,
26};
27use llkv_storage::pager::Pager;
28use llkv_table::CatalogDdl;
29use llkv_table::catalog::{IdentifierContext, IdentifierResolver};
30use regex::Regex;
31use simd_r_drive_entry_handle::EntryHandle;
32use sqlparser::ast::{
33    AlterColumnOperation, AlterTableOperation, Assignment, AssignmentTarget, BeginTransactionKind,
34    BinaryOperator, ColumnOption, ColumnOptionDef, ConstraintCharacteristics,
35    DataType as SqlDataType, Delete, Distinct, ExceptionWhen, Expr as SqlExpr, FromTable,
36    FunctionArg, FunctionArgExpr, FunctionArguments, GroupByExpr, Ident, JoinConstraint,
37    JoinOperator, LimitClause, NullsDistinctOption, ObjectName, ObjectNamePart, ObjectType,
38    OrderBy, OrderByKind, Query, ReferentialAction, SchemaName, Select, SelectItem,
39    SelectItemQualifiedWildcardKind, Set, SetExpr, SqlOption, Statement, TableConstraint,
40    TableFactor, TableObject, TableWithJoins, TransactionMode, TransactionModifier, UnaryOperator,
41    UpdateTableFromKind, Value, ValueWithSpan,
42};
43use sqlparser::dialect::GenericDialect;
44use sqlparser::parser::Parser;
45use sqlparser::tokenizer::Span;
46
// TODO: Extract to constants.rs
// TODO: Rename to SQL_PARSER_RECURSION_LIMIT
/// Maximum recursion depth allowed for the SQL parser.
///
/// The default in sqlparser is 50, which can be exceeded by deeply nested queries
/// (e.g., the SQLite test suite). Raising the limit to 200 admits more complex
/// expressions while still bounding recursion so the parser cannot overflow the stack.
const PARSER_RECURSION_LIMIT: usize = 200;
55
56/// SQL execution engine built on top of the LLKV runtime.
57///
58/// # Examples
59///
60/// ```
61/// use std::sync::Arc;
62///
63/// use arrow::array::StringArray;
64/// use llkv_sql::{RuntimeStatementResult, SqlEngine};
65/// use llkv_storage::pager::MemPager;
66///
67/// let engine = SqlEngine::new(Arc::new(MemPager::default()));
68///
69/// let setup = r#"
70///     CREATE TABLE users (id INT PRIMARY KEY, name TEXT);
71///     INSERT INTO users (id, name) VALUES (1, 'Ada');
72/// "#;
73///
74/// let results = engine.execute(setup).unwrap();
75/// assert_eq!(results.len(), 2);
76///
77/// assert!(matches!(
78///     results[0],
79///     RuntimeStatementResult::CreateTable { ref table_name } if table_name == "users"
80/// ));
81/// assert!(matches!(
82///     results[1],
83///     RuntimeStatementResult::Insert { rows_inserted, .. } if rows_inserted == 1
84/// ));
85///
86/// let batches = engine.sql("SELECT id, name FROM users ORDER BY id;").unwrap();
87/// assert_eq!(batches.len(), 1);
88///
89/// let batch = &batches[0];
90/// assert_eq!(batch.num_rows(), 1);
91/// assert_eq!(batch.schema().field(1).name(), "name");
92///
93/// let names = batch
94///     .column(1)
95///     .as_any()
96///     .downcast_ref::<StringArray>()
97///     .unwrap();
98///
99/// assert_eq!(names.value(0), "Ada");
100/// ```
101pub struct SqlEngine<P>
102where
103    P: Pager<Blob = EntryHandle> + Send + Sync,
104{
105    engine: RuntimeEngine<P>,
106    default_nulls_first: AtomicBool,
107}
108
/// Message text for conflicts caused by a table that another transaction dropped.
// NOTE(review): no use site is visible in this part of the file; presumably it is
// compared against or emitted in transaction-conflict errors. Confirm at the callers.
const DROPPED_TABLE_TRANSACTION_ERR: &str = "another transaction has dropped this table";
110
111impl<P> Clone for SqlEngine<P>
112where
113    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
114{
115    fn clone(&self) -> Self {
116        tracing::warn!(
117            "[SQL_ENGINE] SqlEngine::clone() called - will create new Engine with new session!"
118        );
119        // Create a new session from the same context
120        Self {
121            engine: self.engine.clone(),
122            default_nulls_first: AtomicBool::new(
123                self.default_nulls_first.load(AtomicOrdering::Relaxed),
124            ),
125        }
126    }
127}
128
129#[allow(dead_code)]
130impl<P> SqlEngine<P>
131where
132    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
133{
134    fn map_table_error(table_name: &str, err: Error) -> Error {
135        match err {
136            Error::NotFound => Self::table_not_found_error(table_name),
137            Error::InvalidArgumentError(msg) if msg.contains("unknown table") => {
138                Self::table_not_found_error(table_name)
139            }
140            other => other,
141        }
142    }
143
144    fn table_not_found_error(table_name: &str) -> Error {
145        Error::CatalogError(format!(
146            "Catalog Error: Table '{table_name}' does not exist"
147        ))
148    }
149
150    fn is_table_missing_error(err: &Error) -> bool {
151        match err {
152            Error::NotFound => true,
153            Error::CatalogError(msg) => {
154                msg.contains("Catalog Error: Table") || msg.contains("unknown table")
155            }
156            Error::InvalidArgumentError(msg) => {
157                msg.contains("Catalog Error: Table") || msg.contains("unknown table")
158            }
159            _ => false,
160        }
161    }
162
163    fn execute_plan_statement(
164        &self,
165        statement: PlanStatement,
166    ) -> SqlResult<RuntimeStatementResult<P>> {
167        let table = llkv_runtime::statement_table_name(&statement).map(str::to_string);
168        self.engine.execute_statement(statement).map_err(|err| {
169            if let Some(table_name) = table {
170                Self::map_table_error(&table_name, err)
171            } else {
172                err
173            }
174        })
175    }
176
177    pub fn new(pager: Arc<P>) -> Self {
178        let engine = RuntimeEngine::new(pager);
179        Self {
180            engine,
181            default_nulls_first: AtomicBool::new(false),
182        }
183    }
184
185    /// Preprocess SQL to handle qualified names in EXCLUDE clauses
186    /// Converts EXCLUDE (schema.table.col) to EXCLUDE ("schema.table.col")
187    /// Preprocess CREATE TYPE to CREATE DOMAIN for sqlparser compatibility.
188    ///
189    /// DuckDB uses `CREATE TYPE name AS basetype` for type aliases, but sqlparser
190    /// only supports the SQL standard `CREATE DOMAIN name AS basetype` syntax.
191    /// This method converts the DuckDB syntax to the standard syntax.
192    fn preprocess_create_type_syntax(sql: &str) -> String {
193        static CREATE_TYPE_REGEX: OnceLock<Regex> = OnceLock::new();
194        static DROP_TYPE_REGEX: OnceLock<Regex> = OnceLock::new();
195
196        // Match: CREATE TYPE name AS datatype
197        let create_re = CREATE_TYPE_REGEX.get_or_init(|| {
198            Regex::new(r"(?i)\bCREATE\s+TYPE\s+").expect("valid CREATE TYPE regex")
199        });
200
201        // Match: DROP TYPE [IF EXISTS] name
202        let drop_re = DROP_TYPE_REGEX
203            .get_or_init(|| Regex::new(r"(?i)\bDROP\s+TYPE\s+").expect("valid DROP TYPE regex"));
204
205        // First replace CREATE TYPE with CREATE DOMAIN
206        let sql = create_re.replace_all(sql, "CREATE DOMAIN ").to_string();
207
208        // Then replace DROP TYPE with DROP DOMAIN
209        drop_re.replace_all(&sql, "DROP DOMAIN ").to_string()
210    }
211
212    fn preprocess_exclude_syntax(sql: &str) -> String {
213        static EXCLUDE_REGEX: OnceLock<Regex> = OnceLock::new();
214
215        // Pattern to match EXCLUDE followed by qualified identifiers
216        // Matches: EXCLUDE (identifier.identifier.identifier)
217        let re = EXCLUDE_REGEX.get_or_init(|| {
218            Regex::new(
219                r"(?i)EXCLUDE\s*\(\s*([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)+)\s*\)",
220            )
221            .expect("valid EXCLUDE qualifier regex")
222        });
223
224        re.replace_all(sql, |caps: &regex::Captures| {
225            let qualified_name = &caps[1];
226            format!("EXCLUDE (\"{}\")", qualified_name)
227        })
228        .to_string()
229    }
230
231    /// Preprocess SQL to remove trailing commas in VALUES clauses.
232    /// DuckDB allows trailing commas like VALUES ('v2',) but sqlparser does not.
233    fn preprocess_trailing_commas_in_values(sql: &str) -> String {
234        static TRAILING_COMMA_REGEX: OnceLock<Regex> = OnceLock::new();
235
236        // Pattern to match trailing comma before closing paren in VALUES
237        // Matches: , followed by optional whitespace and )
238        let re = TRAILING_COMMA_REGEX
239            .get_or_init(|| Regex::new(r",(\s*)\)").expect("valid trailing comma regex"));
240
241        re.replace_all(sql, "$1)").to_string()
242    }
243
244    pub(crate) fn context_arc(&self) -> Arc<RuntimeContext<P>> {
245        self.engine.context()
246    }
247
248    pub fn with_context(context: Arc<RuntimeContext<P>>, default_nulls_first: bool) -> Self {
249        Self {
250            engine: RuntimeEngine::from_context(context),
251            default_nulls_first: AtomicBool::new(default_nulls_first),
252        }
253    }
254
255    #[cfg(test)]
256    fn default_nulls_first_for_tests(&self) -> bool {
257        self.default_nulls_first.load(AtomicOrdering::Relaxed)
258    }
259
260    fn has_active_transaction(&self) -> bool {
261        self.engine.session().has_active_transaction()
262    }
263
264    /// Get a reference to the underlying session (for advanced use like error handling in test harnesses).
265    pub fn session(&self) -> &RuntimeSession<P> {
266        self.engine.session()
267    }
268
269    /// Execute one or more SQL statements and return their raw [`RuntimeStatementResult`]s.
270    ///
271    /// This method is the general-purpose entry point for running SQL against the engine when
272    /// you need to mix statement types (e.g. `CREATE TABLE`, `INSERT`, `UPDATE`, `SELECT`) or
273    /// when you care about per-statement status information. Statements are executed in the order
274    /// they appear in the input string, and the results vector mirrors that ordering.
275    ///
276    /// For ad-hoc read queries where you only care about the resulting Arrow [`RecordBatch`]es,
277    /// prefer [`SqlEngine::sql`], which enforces a single `SELECT` statement and collects its
278    /// output for you. `execute` remains the right tool for schema migrations, transactional
279    /// scripts, or workflows that need to inspect the specific runtime response for each
280    /// statement.
281    pub fn execute(&self, sql: &str) -> SqlResult<Vec<RuntimeStatementResult<P>>> {
282        tracing::trace!("DEBUG SQL execute: {}", sql);
283
284        // Preprocess SQL to handle CREATE TYPE / DROP TYPE (DuckDB syntax)
285        let processed_sql = Self::preprocess_create_type_syntax(sql);
286
287        // Preprocess SQL to handle qualified names in EXCLUDE
288        // Replace EXCLUDE (schema.table.col) with EXCLUDE ("schema.table.col")
289        let processed_sql = Self::preprocess_exclude_syntax(&processed_sql);
290
291        // Preprocess SQL to remove trailing commas in VALUES clauses
292        // DuckDB allows VALUES ('v2',) but sqlparser does not
293        let processed_sql = Self::preprocess_trailing_commas_in_values(&processed_sql);
294
295        let dialect = GenericDialect {};
296        let statements = parse_sql_with_recursion_limit(&dialect, &processed_sql)
297            .map_err(|err| Error::InvalidArgumentError(format!("failed to parse SQL: {err}")))?;
298        tracing::trace!("DEBUG SQL execute: parsed {} statements", statements.len());
299
300        let mut results = Vec::with_capacity(statements.len());
301        for (i, statement) in statements.iter().enumerate() {
302            tracing::trace!("DEBUG SQL execute: processing statement {}", i);
303            results.push(self.execute_statement(statement.clone())?);
304            tracing::trace!("DEBUG SQL execute: statement {} completed", i);
305        }
306        tracing::trace!("DEBUG SQL execute completed successfully");
307        Ok(results)
308    }
309
310    /// Execute a single SELECT statement and return its results as Arrow [`RecordBatch`]es.
311    ///
312    /// The SQL passed to this method must contain exactly one statement, and that statement must
313    /// be a `SELECT`. Statements that modify data (e.g. `INSERT`) should be executed up front
314    /// using [`SqlEngine::execute`] before calling this helper.
315    ///
316    /// # Examples
317    ///
318    /// ```
319    /// use std::sync::Arc;
320    ///
321    /// use arrow::array::StringArray;
322    /// use llkv_sql::SqlEngine;
323    /// use llkv_storage::pager::MemPager;
324    ///
325    /// let engine = SqlEngine::new(Arc::new(MemPager::default()));
326    /// let _ = engine
327    ///     .execute(
328    ///         "CREATE TABLE users (id INT PRIMARY KEY, name TEXT);\n         \
329    ///          INSERT INTO users (id, name) VALUES (1, 'Ada');",
330    ///     )
331    ///     .unwrap();
332    ///
333    /// let batches = engine.sql("SELECT id, name FROM users ORDER BY id;").unwrap();
334    /// assert_eq!(batches.len(), 1);
335    ///
336    /// let batch = &batches[0];
337    /// assert_eq!(batch.num_rows(), 1);
338    ///
339    /// let names = batch
340    ///     .column(1)
341    ///     .as_any()
342    ///     .downcast_ref::<StringArray>()
343    ///     .unwrap();
344    /// assert_eq!(names.value(0), "Ada");
345    /// ```
346    pub fn sql(&self, sql: &str) -> SqlResult<Vec<RecordBatch>> {
347        let mut results = self.execute(sql)?;
348        if results.len() != 1 {
349            return Err(Error::InvalidArgumentError(
350                "SqlEngine::sql expects exactly one SQL statement".into(),
351            ));
352        }
353
354        match results.pop().expect("checked length above") {
355            RuntimeStatementResult::Select { execution, .. } => execution.collect(),
356            other => Err(Error::InvalidArgumentError(format!(
357                "SqlEngine::sql requires a SELECT statement, got {other:?}",
358            ))),
359        }
360    }
361
362    fn execute_statement(&self, statement: Statement) -> SqlResult<RuntimeStatementResult<P>> {
363        tracing::trace!(
364            "DEBUG SQL execute_statement: {:?}",
365            match &statement {
366                Statement::Insert(insert) =>
367                    format!("Insert(table={:?})", Self::table_name_from_insert(insert)),
368                Statement::Query(_) => "Query".to_string(),
369                Statement::StartTransaction { .. } => "StartTransaction".to_string(),
370                Statement::Commit { .. } => "Commit".to_string(),
371                Statement::Rollback { .. } => "Rollback".to_string(),
372                Statement::CreateTable(_) => "CreateTable".to_string(),
373                Statement::Update { .. } => "Update".to_string(),
374                Statement::Delete(_) => "Delete".to_string(),
375                other => format!("Other({:?})", other),
376            }
377        );
378        match statement {
379            Statement::StartTransaction {
380                modes,
381                begin,
382                transaction,
383                modifier,
384                statements,
385                exception,
386                has_end_keyword,
387            } => self.handle_start_transaction(
388                modes,
389                begin,
390                transaction,
391                modifier,
392                statements,
393                exception,
394                has_end_keyword,
395            ),
396            Statement::Commit {
397                chain,
398                end,
399                modifier,
400            } => self.handle_commit(chain, end, modifier),
401            Statement::Rollback { chain, savepoint } => self.handle_rollback(chain, savepoint),
402            other => self.execute_statement_non_transactional(other),
403        }
404    }
405
406    fn execute_statement_non_transactional(
407        &self,
408        statement: Statement,
409    ) -> SqlResult<RuntimeStatementResult<P>> {
410        tracing::trace!("DEBUG SQL execute_statement_non_transactional called");
411        match statement {
412            Statement::CreateTable(stmt) => {
413                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateTable");
414                self.handle_create_table(stmt)
415            }
416            Statement::CreateIndex(stmt) => {
417                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateIndex");
418                self.handle_create_index(stmt)
419            }
420            Statement::CreateSchema {
421                schema_name,
422                if_not_exists,
423                with,
424                options,
425                default_collate_spec,
426                clone,
427            } => {
428                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateSchema");
429                self.handle_create_schema(
430                    schema_name,
431                    if_not_exists,
432                    with,
433                    options,
434                    default_collate_spec,
435                    clone,
436                )
437            }
438            Statement::CreateView {
439                name,
440                columns,
441                query,
442                materialized,
443                or_replace,
444                or_alter,
445                options,
446                cluster_by,
447                comment,
448                with_no_schema_binding,
449                if_not_exists,
450                temporary,
451                to,
452                params,
453                secure,
454                name_before_not_exists,
455            } => {
456                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateView");
457                self.handle_create_view(
458                    name,
459                    columns,
460                    query,
461                    materialized,
462                    or_replace,
463                    or_alter,
464                    options,
465                    cluster_by,
466                    comment,
467                    with_no_schema_binding,
468                    if_not_exists,
469                    temporary,
470                    to,
471                    params,
472                    secure,
473                    name_before_not_exists,
474                )
475            }
476            Statement::CreateDomain(create_domain) => {
477                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateDomain");
478                self.handle_create_domain(create_domain)
479            }
480            Statement::DropDomain(drop_domain) => {
481                tracing::trace!("DEBUG SQL execute_statement_non_transactional: DropDomain");
482                self.handle_drop_domain(drop_domain)
483            }
484            Statement::Insert(stmt) => {
485                let table_name =
486                    Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
487                tracing::trace!(
488                    "DEBUG SQL execute_statement_non_transactional: Insert(table={})",
489                    table_name
490                );
491                self.handle_insert(stmt)
492            }
493            Statement::Query(query) => {
494                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Query");
495                self.handle_query(*query)
496            }
497            Statement::Update {
498                table,
499                assignments,
500                from,
501                selection,
502                returning,
503                ..
504            } => {
505                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Update");
506                self.handle_update(table, assignments, from, selection, returning)
507            }
508            Statement::Delete(delete) => {
509                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Delete");
510                self.handle_delete(delete)
511            }
512            Statement::Truncate {
513                ref table_names,
514                ref partitions,
515                table,
516                ref identity,
517                cascade,
518                ref on_cluster,
519            } => {
520                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Truncate");
521                self.handle_truncate(
522                    table_names,
523                    partitions,
524                    table,
525                    identity,
526                    cascade,
527                    on_cluster,
528                )
529            }
530            Statement::Drop {
531                object_type,
532                if_exists,
533                names,
534                cascade,
535                restrict,
536                purge,
537                temporary,
538                ..
539            } => {
540                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Drop");
541                self.handle_drop(
542                    object_type,
543                    if_exists,
544                    names,
545                    cascade,
546                    restrict,
547                    purge,
548                    temporary,
549                )
550            }
551            Statement::AlterTable {
552                name,
553                if_exists,
554                only,
555                operations,
556                ..
557            } => {
558                tracing::trace!("DEBUG SQL execute_statement_non_transactional: AlterTable");
559                self.handle_alter_table(name, if_exists, only, operations)
560            }
561            Statement::Set(set_stmt) => {
562                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Set");
563                self.handle_set(set_stmt)
564            }
565            Statement::Pragma { name, value, is_eq } => {
566                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Pragma");
567                self.handle_pragma(name, value, is_eq)
568            }
569            other => {
570                tracing::trace!(
571                    "DEBUG SQL execute_statement_non_transactional: Other({:?})",
572                    other
573                );
574                Err(Error::InvalidArgumentError(format!(
575                    "unsupported SQL statement: {other:?}"
576                )))
577            }
578        }
579    }
580
581    fn table_name_from_insert(insert: &sqlparser::ast::Insert) -> SqlResult<String> {
582        match &insert.table {
583            TableObject::TableName(name) => Self::object_name_to_string(name),
584            _ => Err(Error::InvalidArgumentError(
585                "INSERT requires a plain table name".into(),
586            )),
587        }
588    }
589
590    fn table_name_from_update(table: &TableWithJoins) -> SqlResult<Option<String>> {
591        if !table.joins.is_empty() {
592            return Err(Error::InvalidArgumentError(
593                "UPDATE with JOIN targets is not supported yet".into(),
594            ));
595        }
596        Self::table_with_joins_name(table)
597    }
598
599    fn table_name_from_delete(delete: &Delete) -> SqlResult<Option<String>> {
600        if !delete.tables.is_empty() {
601            return Err(Error::InvalidArgumentError(
602                "multi-table DELETE is not supported yet".into(),
603            ));
604        }
605        let from_tables = match &delete.from {
606            FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
607        };
608        if from_tables.is_empty() {
609            return Ok(None);
610        }
611        if from_tables.len() != 1 {
612            return Err(Error::InvalidArgumentError(
613                "DELETE over multiple tables is not supported yet".into(),
614            ));
615        }
616        Self::table_with_joins_name(&from_tables[0])
617    }
618
619    fn object_name_to_string(name: &ObjectName) -> SqlResult<String> {
620        let (display, _) = canonical_object_name(name)?;
621        Ok(display)
622    }
623
624    #[allow(dead_code)]
625    fn table_object_to_name(table: &TableObject) -> SqlResult<Option<String>> {
626        match table {
627            TableObject::TableName(name) => Ok(Some(Self::object_name_to_string(name)?)),
628            TableObject::TableFunction(_) => Ok(None),
629        }
630    }
631
632    fn table_with_joins_name(table: &TableWithJoins) -> SqlResult<Option<String>> {
633        match &table.relation {
634            TableFactor::Table { name, .. } => Ok(Some(Self::object_name_to_string(name)?)),
635            _ => Ok(None),
636        }
637    }
638
639    fn tables_in_query(query: &Query) -> SqlResult<Vec<String>> {
640        let mut tables = Vec::new();
641        if let sqlparser::ast::SetExpr::Select(select) = query.body.as_ref() {
642            for table in &select.from {
643                if let TableFactor::Table { name, .. } = &table.relation {
644                    tables.push(Self::object_name_to_string(name)?);
645                }
646            }
647        }
648        Ok(tables)
649    }
650
651    fn collect_known_columns(
652        &self,
653        display_name: &str,
654        canonical_name: &str,
655    ) -> SqlResult<HashSet<String>> {
656        let context = self.engine.context();
657
658        if context.is_table_marked_dropped(canonical_name) {
659            return Err(Self::table_not_found_error(display_name));
660        }
661
662        // First, check if the table was created in the current transaction
663        if let Some(specs) = self
664            .engine
665            .session()
666            .table_column_specs_from_transaction(canonical_name)
667        {
668            return Ok(specs
669                .into_iter()
670                .map(|spec| spec.name.to_ascii_lowercase())
671                .collect());
672        }
673
674        // Otherwise, look it up in the committed catalog
675        let (_, canonical_name) = llkv_table::canonical_table_name(display_name)
676            .map_err(|e| arrow::error::ArrowError::ExternalError(Box::new(e)))?;
677        match context.catalog().table_column_specs(&canonical_name) {
678            Ok(specs) => Ok(specs
679                .into_iter()
680                .map(|spec| spec.name.to_ascii_lowercase())
681                .collect()),
682            Err(err) => {
683                if !Self::is_table_missing_error(&err) {
684                    return Err(Self::map_table_error(display_name, err));
685                }
686
687                Ok(HashSet::new())
688            }
689        }
690    }
691
692    fn is_table_marked_dropped(&self, table_name: &str) -> SqlResult<bool> {
693        let canonical = table_name.to_ascii_lowercase();
694        Ok(self.engine.context().is_table_marked_dropped(&canonical))
695    }
696
697    fn handle_create_table(
698        &self,
699        mut stmt: sqlparser::ast::CreateTable,
700    ) -> SqlResult<RuntimeStatementResult<P>> {
701        validate_create_table_common(&stmt)?;
702
703        let (mut schema_name, table_name) = parse_schema_qualified_name(&stmt.name)?;
704
705        let namespace = if stmt.temporary {
706            if schema_name.is_some() {
707                return Err(Error::InvalidArgumentError(
708                    "temporary tables cannot specify an explicit schema".into(),
709                ));
710            }
711            schema_name = None;
712            Some(TEMPORARY_NAMESPACE_ID.to_string())
713        } else {
714            None
715        };
716
717        // Validate schema exists if specified
718        if let Some(ref schema) = schema_name {
719            let catalog = self.engine.context().table_catalog();
720            if !catalog.schema_exists(schema) {
721                return Err(Error::CatalogError(format!(
722                    "Schema '{}' does not exist",
723                    schema
724                )));
725            }
726        }
727
728        // Use full qualified name (schema.table or just table)
729        let display_name = match &schema_name {
730            Some(schema) => format!("{}.{}", schema, table_name),
731            None => table_name.clone(),
732        };
733        let canonical_name = display_name.to_ascii_lowercase();
734        tracing::trace!(
735            "\n=== HANDLE_CREATE_TABLE: table='{}' columns={} ===",
736            display_name,
737            stmt.columns.len()
738        );
739        if display_name.is_empty() {
740            return Err(Error::InvalidArgumentError(
741                "table name must not be empty".into(),
742            ));
743        }
744
745        if let Some(query) = stmt.query.take() {
746            validate_create_table_as(&stmt)?;
747            if let Some(result) = self.try_handle_range_ctas(
748                &display_name,
749                &canonical_name,
750                &query,
751                stmt.if_not_exists,
752                stmt.or_replace,
753                namespace.clone(),
754            )? {
755                return Ok(result);
756            }
757            return self.handle_create_table_as(
758                display_name,
759                canonical_name,
760                *query,
761                stmt.if_not_exists,
762                stmt.or_replace,
763                namespace.clone(),
764            );
765        }
766
767        if stmt.columns.is_empty() {
768            return Err(Error::InvalidArgumentError(
769                "CREATE TABLE requires at least one column".into(),
770            ));
771        }
772
773        validate_create_table_definition(&stmt)?;
774
775        let column_defs_ast = std::mem::take(&mut stmt.columns);
776        let constraints = std::mem::take(&mut stmt.constraints);
777
778        let column_names: Vec<String> = column_defs_ast
779            .iter()
780            .map(|column_def| column_def.name.value.clone())
781            .collect();
782        ensure_unique_case_insensitive(column_names.iter().map(|name| name.as_str()), |dup| {
783            format!(
784                "duplicate column name '{}' in table '{}'",
785                dup, display_name
786            )
787        })?;
788        let column_names_lower: HashSet<String> = column_names
789            .iter()
790            .map(|name| name.to_ascii_lowercase())
791            .collect();
792
793        let mut columns: Vec<PlanColumnSpec> = Vec::with_capacity(column_defs_ast.len());
794        let mut primary_key_columns: HashSet<String> = HashSet::new();
795        let mut foreign_keys: Vec<ForeignKeySpec> = Vec::new();
796        let mut multi_column_uniques: Vec<MultiColumnUniqueSpec> = Vec::new();
797
798        // Second pass: process columns including CHECK validation and column-level FKs
799        for column_def in column_defs_ast {
800            let is_nullable = column_def
801                .options
802                .iter()
803                .all(|opt| !matches!(opt.option, ColumnOption::NotNull));
804
805            let is_primary_key = column_def.options.iter().any(|opt| {
806                matches!(
807                    opt.option,
808                    ColumnOption::Unique {
809                        is_primary: true,
810                        characteristics: _
811                    }
812                )
813            });
814
815            let has_unique_constraint = column_def
816                .options
817                .iter()
818                .any(|opt| matches!(opt.option, ColumnOption::Unique { .. }));
819
820            // Extract CHECK constraint if present and validate it
821            let check_expr = column_def.options.iter().find_map(|opt| {
822                if let ColumnOption::Check(expr) = &opt.option {
823                    Some(expr)
824                } else {
825                    None
826                }
827            });
828
829            // Validate CHECK constraint if present (now we have all column names)
830            if let Some(check_expr) = check_expr {
831                let all_col_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
832                validate_check_constraint(check_expr, &display_name, &all_col_refs)?;
833            }
834
835            let check_expr_str = check_expr.map(|e| e.to_string());
836
837            // Extract column-level FOREIGN KEY (REFERENCES clause)
838            for opt in &column_def.options {
839                if let ColumnOption::ForeignKey {
840                    foreign_table,
841                    referred_columns,
842                    on_delete,
843                    on_update,
844                    characteristics,
845                } = &opt.option
846                {
847                    let spec = self.build_foreign_key_spec(
848                        &display_name,
849                        &canonical_name,
850                        vec![column_def.name.value.clone()],
851                        foreign_table,
852                        referred_columns,
853                        *on_delete,
854                        *on_update,
855                        characteristics,
856                        &column_names_lower,
857                        None,
858                    )?;
859                    foreign_keys.push(spec);
860                }
861            }
862
863            tracing::trace!(
864                "DEBUG CREATE TABLE column '{}' is_primary_key={} has_unique={} check_expr={:?}",
865                column_def.name.value,
866                is_primary_key,
867                has_unique_constraint,
868                check_expr_str
869            );
870
871            // Resolve custom type aliases to their base types
872            let resolved_data_type = self.engine.context().resolve_type(&column_def.data_type);
873
874            let mut column = PlanColumnSpec::new(
875                column_def.name.value.clone(),
876                arrow_type_from_sql(&resolved_data_type)?,
877                is_nullable,
878            );
879            tracing::trace!(
880                "DEBUG PlanColumnSpec after new(): primary_key={} unique={}",
881                column.primary_key,
882                column.unique
883            );
884
885            column = column
886                .with_primary_key(is_primary_key)
887                .with_unique(has_unique_constraint)
888                .with_check(check_expr_str);
889
890            if is_primary_key {
891                column.nullable = false;
892                primary_key_columns.insert(column.name.to_ascii_lowercase());
893            }
894            tracing::trace!(
895                "DEBUG PlanColumnSpec after with_primary_key({})/with_unique({}): primary_key={} unique={} check_expr={:?}",
896                is_primary_key,
897                has_unique_constraint,
898                column.primary_key,
899                column.unique,
900                column.check_expr
901            );
902
903            columns.push(column);
904        }
905
906        // Apply supported table-level constraints (e.g., PRIMARY KEY)
907        if !constraints.is_empty() {
908            let mut column_lookup: HashMap<String, usize> = HashMap::with_capacity(columns.len());
909            for (idx, column) in columns.iter().enumerate() {
910                column_lookup.insert(column.name.to_ascii_lowercase(), idx);
911            }
912
913            for constraint in constraints {
914                match constraint {
915                    TableConstraint::PrimaryKey {
916                        columns: constraint_columns,
917                        ..
918                    } => {
919                        if !primary_key_columns.is_empty() {
920                            return Err(Error::InvalidArgumentError(
921                                "multiple PRIMARY KEY constraints are not supported".into(),
922                            ));
923                        }
924
925                        ensure_non_empty(&constraint_columns, || {
926                            "PRIMARY KEY requires at least one column".into()
927                        })?;
928
929                        let mut pk_column_names: Vec<String> =
930                            Vec::with_capacity(constraint_columns.len());
931
932                        for index_col in &constraint_columns {
933                            let column_ident = extract_index_column_name(
934                                index_col,
935                                "PRIMARY KEY",
936                                false, // no sort options allowed
937                                false, // only simple identifiers
938                            )?;
939                            pk_column_names.push(column_ident);
940                        }
941
942                        ensure_unique_case_insensitive(
943                            pk_column_names.iter().map(|name| name.as_str()),
944                            |dup| format!("duplicate column '{}' in PRIMARY KEY constraint", dup),
945                        )?;
946
947                        ensure_known_columns_case_insensitive(
948                            pk_column_names.iter().map(|name| name.as_str()),
949                            &column_names_lower,
950                            |unknown| {
951                                format!("unknown column '{}' in PRIMARY KEY constraint", unknown)
952                            },
953                        )?;
954
955                        for column_ident in pk_column_names {
956                            let normalized = column_ident.to_ascii_lowercase();
957                            let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
958                                Error::InvalidArgumentError(format!(
959                                    "unknown column '{}' in PRIMARY KEY constraint",
960                                    column_ident
961                                ))
962                            })?;
963
964                            let column = columns.get_mut(idx).expect("column index valid");
965                            column.primary_key = true;
966                            column.unique = true;
967                            column.nullable = false;
968
969                            primary_key_columns.insert(normalized);
970                        }
971                    }
972                    TableConstraint::Unique {
973                        columns: constraint_columns,
974                        index_type,
975                        index_options,
976                        characteristics,
977                        nulls_distinct,
978                        name,
979                        ..
980                    } => {
981                        if !matches!(nulls_distinct, NullsDistinctOption::None) {
982                            return Err(Error::InvalidArgumentError(
983                                "UNIQUE constraints with NULLS DISTINCT/NOT DISTINCT are not supported yet".into(),
984                            ));
985                        }
986
987                        if index_type.is_some() {
988                            return Err(Error::InvalidArgumentError(
989                                "UNIQUE constraints with index types are not supported yet".into(),
990                            ));
991                        }
992
993                        if !index_options.is_empty() {
994                            return Err(Error::InvalidArgumentError(
995                                "UNIQUE constraints with index options are not supported yet"
996                                    .into(),
997                            ));
998                        }
999
1000                        if characteristics.is_some() {
1001                            return Err(Error::InvalidArgumentError(
1002                                "UNIQUE constraint characteristics are not supported yet".into(),
1003                            ));
1004                        }
1005
1006                        ensure_non_empty(&constraint_columns, || {
1007                            "UNIQUE constraint requires at least one column".into()
1008                        })?;
1009
1010                        let mut unique_column_names: Vec<String> =
1011                            Vec::with_capacity(constraint_columns.len());
1012
1013                        for index_column in &constraint_columns {
1014                            let column_ident = extract_index_column_name(
1015                                index_column,
1016                                "UNIQUE constraint",
1017                                false, // no sort options allowed
1018                                false, // only simple identifiers
1019                            )?;
1020                            unique_column_names.push(column_ident);
1021                        }
1022
1023                        ensure_unique_case_insensitive(
1024                            unique_column_names.iter().map(|name| name.as_str()),
1025                            |dup| format!("duplicate column '{}' in UNIQUE constraint", dup),
1026                        )?;
1027
1028                        ensure_known_columns_case_insensitive(
1029                            unique_column_names.iter().map(|name| name.as_str()),
1030                            &column_names_lower,
1031                            |unknown| format!("unknown column '{}' in UNIQUE constraint", unknown),
1032                        )?;
1033
1034                        if unique_column_names.len() > 1 {
1035                            // Multi-column UNIQUE constraint
1036                            multi_column_uniques.push(MultiColumnUniqueSpec {
1037                                name: name.map(|n| n.value),
1038                                columns: unique_column_names,
1039                            });
1040                        } else {
1041                            // Single-column UNIQUE constraint
1042                            let column_ident = unique_column_names
1043                                .into_iter()
1044                                .next()
1045                                .expect("unique constraint checked for emptiness");
1046                            let normalized = column_ident.to_ascii_lowercase();
1047                            let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
1048                                Error::InvalidArgumentError(format!(
1049                                    "unknown column '{}' in UNIQUE constraint",
1050                                    column_ident
1051                                ))
1052                            })?;
1053
1054                            let column = columns
1055                                .get_mut(idx)
1056                                .expect("column index from lookup must be valid");
1057                            column.unique = true;
1058                        }
1059                    }
1060                    TableConstraint::ForeignKey {
1061                        name,
1062                        index_name,
1063                        columns: fk_columns,
1064                        foreign_table,
1065                        referred_columns,
1066                        on_delete,
1067                        on_update,
1068                        characteristics,
1069                        ..
1070                    } => {
1071                        if index_name.is_some() {
1072                            return Err(Error::InvalidArgumentError(
1073                                "FOREIGN KEY index clauses are not supported yet".into(),
1074                            ));
1075                        }
1076
1077                        let referencing_columns: Vec<String> =
1078                            fk_columns.into_iter().map(|ident| ident.value).collect();
1079                        let spec = self.build_foreign_key_spec(
1080                            &display_name,
1081                            &canonical_name,
1082                            referencing_columns,
1083                            &foreign_table,
1084                            &referred_columns,
1085                            on_delete,
1086                            on_update,
1087                            &characteristics,
1088                            &column_names_lower,
1089                            name.map(|ident| ident.value),
1090                        )?;
1091
1092                        foreign_keys.push(spec);
1093                    }
1094                    unsupported => {
1095                        return Err(Error::InvalidArgumentError(format!(
1096                            "table-level constraint {:?} is not supported",
1097                            unsupported
1098                        )));
1099                    }
1100                }
1101            }
1102        }
1103
1104        let plan = CreateTablePlan {
1105            name: display_name,
1106            if_not_exists: stmt.if_not_exists,
1107            or_replace: stmt.or_replace,
1108            columns,
1109            source: None,
1110            namespace,
1111            foreign_keys,
1112            multi_column_uniques,
1113        };
1114        self.execute_plan_statement(PlanStatement::CreateTable(plan))
1115    }
1116
1117    fn handle_create_index(
1118        &self,
1119        stmt: sqlparser::ast::CreateIndex,
1120    ) -> SqlResult<RuntimeStatementResult<P>> {
1121        let sqlparser::ast::CreateIndex {
1122            name,
1123            table_name,
1124            using,
1125            columns,
1126            unique,
1127            concurrently,
1128            if_not_exists,
1129            include,
1130            nulls_distinct,
1131            with,
1132            predicate,
1133            index_options,
1134            alter_options,
1135            ..
1136        } = stmt;
1137
1138        if concurrently {
1139            return Err(Error::InvalidArgumentError(
1140                "CREATE INDEX CONCURRENTLY is not supported".into(),
1141            ));
1142        }
1143        if using.is_some() {
1144            return Err(Error::InvalidArgumentError(
1145                "CREATE INDEX USING clauses are not supported".into(),
1146            ));
1147        }
1148        if !include.is_empty() {
1149            return Err(Error::InvalidArgumentError(
1150                "CREATE INDEX INCLUDE columns are not supported".into(),
1151            ));
1152        }
1153        if nulls_distinct.is_some() {
1154            return Err(Error::InvalidArgumentError(
1155                "CREATE INDEX NULLS DISTINCT is not supported".into(),
1156            ));
1157        }
1158        if !with.is_empty() {
1159            return Err(Error::InvalidArgumentError(
1160                "CREATE INDEX WITH options are not supported".into(),
1161            ));
1162        }
1163        if predicate.is_some() {
1164            return Err(Error::InvalidArgumentError(
1165                "partial CREATE INDEX is not supported".into(),
1166            ));
1167        }
1168        if !index_options.is_empty() {
1169            return Err(Error::InvalidArgumentError(
1170                "CREATE INDEX options are not supported".into(),
1171            ));
1172        }
1173        if !alter_options.is_empty() {
1174            return Err(Error::InvalidArgumentError(
1175                "CREATE INDEX ALTER options are not supported".into(),
1176            ));
1177        }
1178        if columns.is_empty() {
1179            return Err(Error::InvalidArgumentError(
1180                "CREATE INDEX requires at least one column".into(),
1181            ));
1182        }
1183
1184        let (schema_name, base_table_name) = parse_schema_qualified_name(&table_name)?;
1185        if let Some(ref schema) = schema_name {
1186            let catalog = self.engine.context().table_catalog();
1187            if !catalog.schema_exists(schema) {
1188                return Err(Error::CatalogError(format!(
1189                    "Schema '{}' does not exist",
1190                    schema
1191                )));
1192            }
1193        }
1194
1195        let display_table_name = schema_name
1196            .as_ref()
1197            .map(|schema| format!("{}.{}", schema, base_table_name))
1198            .unwrap_or_else(|| base_table_name.clone());
1199        let canonical_table_name = display_table_name.to_ascii_lowercase();
1200
1201        let known_columns =
1202            self.collect_known_columns(&display_table_name, &canonical_table_name)?;
1203        let enforce_known_columns = !known_columns.is_empty();
1204
1205        let index_name = match name {
1206            Some(name_obj) => Some(Self::object_name_to_string(&name_obj)?),
1207            None => None,
1208        };
1209
1210        let mut index_columns: Vec<IndexColumnPlan> = Vec::with_capacity(columns.len());
1211        let mut seen_column_names: HashSet<String> = HashSet::new();
1212        for item in columns {
1213            // Check WITH FILL before calling helper (not part of standard validation)
1214            if item.column.with_fill.is_some() {
1215                return Err(Error::InvalidArgumentError(
1216                    "CREATE INDEX column WITH FILL is not supported".into(),
1217                ));
1218            }
1219
1220            let column_name = extract_index_column_name(
1221                &item,
1222                "CREATE INDEX",
1223                true, // allow and validate sort options
1224                true, // allow compound identifiers
1225            )?;
1226
1227            // Get sort options (already validated by helper)
1228            let order_expr = &item.column;
1229            let ascending = order_expr.options.asc.unwrap_or(true);
1230            let nulls_first = order_expr.options.nulls_first.unwrap_or(false);
1231
1232            let normalized = column_name.to_ascii_lowercase();
1233            if !seen_column_names.insert(normalized.clone()) {
1234                return Err(Error::InvalidArgumentError(format!(
1235                    "duplicate column '{}' in CREATE INDEX",
1236                    column_name
1237                )));
1238            }
1239
1240            if enforce_known_columns && !known_columns.contains(&normalized) {
1241                return Err(Error::InvalidArgumentError(format!(
1242                    "column '{}' does not exist in table '{}'",
1243                    column_name, display_table_name
1244                )));
1245            }
1246
1247            let column_plan = IndexColumnPlan::new(column_name).with_sort(ascending, nulls_first);
1248            index_columns.push(column_plan);
1249        }
1250
1251        let plan = CreateIndexPlan::new(display_table_name)
1252            .with_name(index_name)
1253            .with_unique(unique)
1254            .with_if_not_exists(if_not_exists)
1255            .with_columns(index_columns);
1256
1257        self.execute_plan_statement(PlanStatement::CreateIndex(plan))
1258    }
1259
1260    fn map_referential_action(
1261        action: Option<ReferentialAction>,
1262        kind: &str,
1263    ) -> SqlResult<ForeignKeyAction> {
1264        match action {
1265            None | Some(ReferentialAction::NoAction) => Ok(ForeignKeyAction::NoAction),
1266            Some(ReferentialAction::Restrict) => Ok(ForeignKeyAction::Restrict),
1267            Some(other) => Err(Error::InvalidArgumentError(format!(
1268                "FOREIGN KEY ON {kind} {:?} is not supported yet",
1269                other
1270            ))),
1271        }
1272    }
1273
1274    #[allow(clippy::too_many_arguments)]
1275    fn build_foreign_key_spec(
1276        &self,
1277        _referencing_display: &str,
1278        referencing_canonical: &str,
1279        referencing_columns: Vec<String>,
1280        foreign_table: &ObjectName,
1281        referenced_columns: &[Ident],
1282        on_delete: Option<ReferentialAction>,
1283        on_update: Option<ReferentialAction>,
1284        characteristics: &Option<ConstraintCharacteristics>,
1285        known_columns_lower: &HashSet<String>,
1286        name: Option<String>,
1287    ) -> SqlResult<ForeignKeySpec> {
1288        if characteristics.is_some() {
1289            return Err(Error::InvalidArgumentError(
1290                "FOREIGN KEY constraint characteristics are not supported yet".into(),
1291            ));
1292        }
1293
1294        ensure_non_empty(&referencing_columns, || {
1295            "FOREIGN KEY constraint requires at least one referencing column".into()
1296        })?;
1297        ensure_unique_case_insensitive(
1298            referencing_columns.iter().map(|name| name.as_str()),
1299            |dup| format!("duplicate column '{}' in FOREIGN KEY constraint", dup),
1300        )?;
1301        ensure_known_columns_case_insensitive(
1302            referencing_columns.iter().map(|name| name.as_str()),
1303            known_columns_lower,
1304            |unknown| format!("unknown column '{}' in FOREIGN KEY constraint", unknown),
1305        )?;
1306
1307        let referenced_columns_vec: Vec<String> = referenced_columns
1308            .iter()
1309            .map(|ident| ident.value.clone())
1310            .collect();
1311        ensure_unique_case_insensitive(
1312            referenced_columns_vec.iter().map(|name| name.as_str()),
1313            |dup| {
1314                format!(
1315                    "duplicate referenced column '{}' in FOREIGN KEY constraint",
1316                    dup
1317                )
1318            },
1319        )?;
1320
1321        if !referenced_columns_vec.is_empty()
1322            && referenced_columns_vec.len() != referencing_columns.len()
1323        {
1324            return Err(Error::InvalidArgumentError(
1325                "FOREIGN KEY referencing and referenced column counts must match".into(),
1326            ));
1327        }
1328
1329        let (referenced_display, referenced_canonical) = canonical_object_name(foreign_table)?;
1330
1331        // Check if the referenced table is a VIEW - VIEWs cannot be referenced by foreign keys
1332        let catalog = self.engine.context().table_catalog();
1333        if let Some(table_id) = catalog.table_id(&referenced_canonical) {
1334            let context = self.engine.context();
1335            if context.is_view(table_id)? {
1336                return Err(Error::CatalogError(format!(
1337                    "Binder Error: cannot reference a VIEW with a FOREIGN KEY: {}",
1338                    referenced_display
1339                )));
1340            }
1341        }
1342
1343        if referenced_canonical == referencing_canonical {
1344            ensure_known_columns_case_insensitive(
1345                referenced_columns_vec.iter().map(|name| name.as_str()),
1346                known_columns_lower,
1347                |unknown| {
1348                    format!(
1349                        "Binder Error: table '{}' does not have a column named '{}'",
1350                        referenced_display, unknown
1351                    )
1352                },
1353            )?;
1354        } else {
1355            let known_columns =
1356                self.collect_known_columns(&referenced_display, &referenced_canonical)?;
1357            if !known_columns.is_empty() {
1358                ensure_known_columns_case_insensitive(
1359                    referenced_columns_vec.iter().map(|name| name.as_str()),
1360                    &known_columns,
1361                    |unknown| {
1362                        format!(
1363                            "Binder Error: table '{}' does not have a column named '{}'",
1364                            referenced_display, unknown
1365                        )
1366                    },
1367                )?;
1368            }
1369        }
1370
1371        let on_delete_action = Self::map_referential_action(on_delete, "DELETE")?;
1372        let on_update_action = Self::map_referential_action(on_update, "UPDATE")?;
1373
1374        Ok(ForeignKeySpec {
1375            name,
1376            columns: referencing_columns,
1377            referenced_table: referenced_display,
1378            referenced_columns: referenced_columns_vec,
1379            on_delete: on_delete_action,
1380            on_update: on_update_action,
1381        })
1382    }
1383
1384    fn handle_create_schema(
1385        &self,
1386        schema_name: SchemaName,
1387        _if_not_exists: bool,
1388        with: Option<Vec<SqlOption>>,
1389        options: Option<Vec<SqlOption>>,
1390        default_collate_spec: Option<SqlExpr>,
1391        clone: Option<ObjectName>,
1392    ) -> SqlResult<RuntimeStatementResult<P>> {
1393        if clone.is_some() {
1394            return Err(Error::InvalidArgumentError(
1395                "CREATE SCHEMA ... CLONE is not supported".into(),
1396            ));
1397        }
1398        if with.as_ref().is_some_and(|opts| !opts.is_empty()) {
1399            return Err(Error::InvalidArgumentError(
1400                "CREATE SCHEMA ... WITH options are not supported".into(),
1401            ));
1402        }
1403        if options.as_ref().is_some_and(|opts| !opts.is_empty()) {
1404            return Err(Error::InvalidArgumentError(
1405                "CREATE SCHEMA options are not supported".into(),
1406            ));
1407        }
1408        if default_collate_spec.is_some() {
1409            return Err(Error::InvalidArgumentError(
1410                "CREATE SCHEMA DEFAULT COLLATE is not supported".into(),
1411            ));
1412        }
1413
1414        let schema_name = match schema_name {
1415            SchemaName::Simple(name) => name,
1416            _ => {
1417                return Err(Error::InvalidArgumentError(
1418                    "CREATE SCHEMA authorization is not supported".into(),
1419                ));
1420            }
1421        };
1422
1423        let (display_name, canonical) = canonical_object_name(&schema_name)?;
1424        if display_name.is_empty() {
1425            return Err(Error::InvalidArgumentError(
1426                "schema name must not be empty".into(),
1427            ));
1428        }
1429
1430        // Register schema in the catalog
1431        let catalog = self.engine.context().table_catalog();
1432
1433        if _if_not_exists && catalog.schema_exists(&canonical) {
1434            return Ok(RuntimeStatementResult::NoOp);
1435        }
1436
1437        catalog.register_schema(&canonical).map_err(|err| {
1438            Error::CatalogError(format!(
1439                "Failed to create schema '{}': {}",
1440                display_name, err
1441            ))
1442        })?;
1443
1444        Ok(RuntimeStatementResult::NoOp)
1445    }
1446
1447    #[allow(clippy::too_many_arguments)]
1448    fn handle_create_view(
1449        &self,
1450        name: ObjectName,
1451        _columns: Vec<sqlparser::ast::ViewColumnDef>,
1452        query: Box<sqlparser::ast::Query>,
1453        materialized: bool,
1454        or_replace: bool,
1455        or_alter: bool,
1456        _options: sqlparser::ast::CreateTableOptions,
1457        _cluster_by: Vec<sqlparser::ast::Ident>,
1458        _comment: Option<String>,
1459        _with_no_schema_binding: bool,
1460        if_not_exists: bool,
1461        temporary: bool,
1462        _to: Option<ObjectName>,
1463        _params: Option<sqlparser::ast::CreateViewParams>,
1464        _secure: bool,
1465        _name_before_not_exists: bool,
1466    ) -> SqlResult<RuntimeStatementResult<P>> {
1467        // Validate unsupported features
1468        if materialized {
1469            return Err(Error::InvalidArgumentError(
1470                "MATERIALIZED VIEWS are not supported".into(),
1471            ));
1472        }
1473        if or_replace {
1474            return Err(Error::InvalidArgumentError(
1475                "CREATE OR REPLACE VIEW is not supported".into(),
1476            ));
1477        }
1478        if or_alter {
1479            return Err(Error::InvalidArgumentError(
1480                "CREATE OR ALTER VIEW is not supported".into(),
1481            ));
1482        }
1483        if temporary {
1484            return Err(Error::InvalidArgumentError(
1485                "TEMPORARY VIEWS are not supported".into(),
1486            ));
1487        }
1488
1489        // Parse view name (same as table parsing)
1490        let (schema_name, view_name) = parse_schema_qualified_name(&name)?;
1491
1492        // Validate schema exists if specified
1493        if let Some(ref schema) = schema_name {
1494            let catalog = self.engine.context().table_catalog();
1495            if !catalog.schema_exists(schema) {
1496                return Err(Error::CatalogError(format!(
1497                    "Schema '{}' does not exist",
1498                    schema
1499                )));
1500            }
1501        }
1502
1503        // Use full qualified name (schema.view or just view)
1504        let display_name = match &schema_name {
1505            Some(schema) => format!("{}.{}", schema, view_name),
1506            None => view_name.clone(),
1507        };
1508        let canonical_name = display_name.to_ascii_lowercase();
1509
1510        // Check if view already exists
1511        let catalog = self.engine.context().table_catalog();
1512        if catalog.table_exists(&canonical_name) {
1513            if if_not_exists {
1514                return Ok(RuntimeStatementResult::NoOp);
1515            }
1516            return Err(Error::CatalogError(format!(
1517                "Table or view '{}' already exists",
1518                display_name
1519            )));
1520        }
1521
1522        // Convert query to SQL string for storage
1523        let view_definition = query.to_string();
1524
1525        // Create the view using the runtime context helper
1526        let context = self.engine.context();
1527        context.create_view(&display_name, view_definition)?;
1528
1529        tracing::debug!("Created view: {}", display_name);
1530        Ok(RuntimeStatementResult::NoOp)
1531    }
1532
1533    fn handle_create_domain(
1534        &self,
1535        create_domain: sqlparser::ast::CreateDomain,
1536    ) -> SqlResult<RuntimeStatementResult<P>> {
1537        use llkv_table::CustomTypeMeta;
1538        use std::time::{SystemTime, UNIX_EPOCH};
1539
1540        // Extract the type alias name
1541        let type_name = create_domain.name.to_string();
1542
1543        // Convert the data type to SQL string for persistence
1544        let base_type_sql = create_domain.data_type.to_string();
1545
1546        // Register the type alias in the runtime context (in-memory)
1547        self.engine
1548            .context()
1549            .register_type(type_name.clone(), create_domain.data_type.clone());
1550
1551        // Persist to catalog
1552        let context = self.engine.context();
1553        let catalog = llkv_table::SysCatalog::new(context.store());
1554
1555        let created_at_micros = SystemTime::now()
1556            .duration_since(UNIX_EPOCH)
1557            .unwrap_or_default()
1558            .as_micros() as u64;
1559
1560        let meta = CustomTypeMeta {
1561            name: type_name.clone(),
1562            base_type_sql,
1563            created_at_micros,
1564        };
1565
1566        catalog.put_custom_type_meta(&meta)?;
1567
1568        tracing::debug!("Created and persisted type alias: {}", type_name);
1569        Ok(RuntimeStatementResult::NoOp)
1570    }
1571
1572    fn handle_drop_domain(
1573        &self,
1574        drop_domain: sqlparser::ast::DropDomain,
1575    ) -> SqlResult<RuntimeStatementResult<P>> {
1576        let if_exists = drop_domain.if_exists;
1577        let type_name = drop_domain.name.to_string();
1578
1579        // Drop the type from the registry (in-memory)
1580        let result = self.engine.context().drop_type(&type_name);
1581
1582        if let Err(err) = result {
1583            if !if_exists {
1584                return Err(err);
1585            }
1586            // if_exists = true, so ignore the error
1587        } else {
1588            // Persist deletion to catalog
1589            let context = self.engine.context();
1590            let catalog = llkv_table::SysCatalog::new(context.store());
1591            catalog.delete_custom_type_meta(&type_name)?;
1592
1593            tracing::debug!("Dropped and removed from catalog type alias: {}", type_name);
1594        }
1595
1596        Ok(RuntimeStatementResult::NoOp)
1597    }
1598
1599    fn try_handle_range_ctas(
1600        &self,
1601        display_name: &str,
1602        _canonical_name: &str,
1603        query: &Query,
1604        if_not_exists: bool,
1605        or_replace: bool,
1606        namespace: Option<String>,
1607    ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
1608        let select = match query.body.as_ref() {
1609            SetExpr::Select(select) => select,
1610            _ => return Ok(None),
1611        };
1612        if select.from.len() != 1 {
1613            return Ok(None);
1614        }
1615        let table_with_joins = &select.from[0];
1616        if !table_with_joins.joins.is_empty() {
1617            return Ok(None);
1618        }
1619        let (range_size, range_alias) = match &table_with_joins.relation {
1620            TableFactor::Table {
1621                name,
1622                args: Some(args),
1623                alias,
1624                ..
1625            } => {
1626                let func_name = name.to_string().to_ascii_lowercase();
1627                if func_name != "range" {
1628                    return Ok(None);
1629                }
1630                if args.args.len() != 1 {
1631                    return Err(Error::InvalidArgumentError(
1632                        "range table function expects a single argument".into(),
1633                    ));
1634                }
1635                let size_expr = &args.args[0];
1636                let range_size = match size_expr {
1637                    FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
1638                        match &value.value {
1639                            Value::Number(raw, _) => raw.parse::<i64>().map_err(|e| {
1640                                Error::InvalidArgumentError(format!(
1641                                    "invalid range size literal {}: {}",
1642                                    raw, e
1643                                ))
1644                            })?,
1645                            other => {
1646                                return Err(Error::InvalidArgumentError(format!(
1647                                    "unsupported range size value: {:?}",
1648                                    other
1649                                )));
1650                            }
1651                        }
1652                    }
1653                    _ => {
1654                        return Err(Error::InvalidArgumentError(
1655                            "unsupported range argument".into(),
1656                        ));
1657                    }
1658                };
1659                (range_size, alias.as_ref().map(|a| a.name.value.clone()))
1660            }
1661            _ => return Ok(None),
1662        };
1663
1664        if range_size < 0 {
1665            return Err(Error::InvalidArgumentError(
1666                "range size must be non-negative".into(),
1667            ));
1668        }
1669
1670        if select.projection.is_empty() {
1671            return Err(Error::InvalidArgumentError(
1672                "CREATE TABLE AS SELECT requires at least one projected column".into(),
1673            ));
1674        }
1675
1676        let mut column_specs = Vec::with_capacity(select.projection.len());
1677        let mut column_names = Vec::with_capacity(select.projection.len());
1678        let mut row_template = Vec::with_capacity(select.projection.len());
1679        for item in &select.projection {
1680            match item {
1681                SelectItem::ExprWithAlias { expr, alias } => {
1682                    let (value, data_type) = match expr {
1683                        SqlExpr::Value(value_with_span) => match &value_with_span.value {
1684                            Value::Number(raw, _) => {
1685                                let parsed = raw.parse::<i64>().map_err(|e| {
1686                                    Error::InvalidArgumentError(format!(
1687                                        "invalid numeric literal {}: {}",
1688                                        raw, e
1689                                    ))
1690                                })?;
1691                                (
1692                                    PlanValue::Integer(parsed),
1693                                    arrow::datatypes::DataType::Int64,
1694                                )
1695                            }
1696                            Value::SingleQuotedString(s) => (
1697                                PlanValue::String(s.clone()),
1698                                arrow::datatypes::DataType::Utf8,
1699                            ),
1700                            other => {
1701                                return Err(Error::InvalidArgumentError(format!(
1702                                    "unsupported SELECT expression in range CTAS: {:?}",
1703                                    other
1704                                )));
1705                            }
1706                        },
1707                        SqlExpr::Identifier(ident) => {
1708                            let ident_lower = ident.value.to_ascii_lowercase();
1709                            if range_alias
1710                                .as_ref()
1711                                .map(|a| a.eq_ignore_ascii_case(&ident_lower))
1712                                .unwrap_or(false)
1713                                || ident_lower == "range"
1714                            {
1715                                return Err(Error::InvalidArgumentError(
1716                                    "range() table function columns are not supported yet".into(),
1717                                ));
1718                            }
1719                            return Err(Error::InvalidArgumentError(format!(
1720                                "unsupported identifier '{}' in range CTAS projection",
1721                                ident.value
1722                            )));
1723                        }
1724                        other => {
1725                            return Err(Error::InvalidArgumentError(format!(
1726                                "unsupported SELECT expression in range CTAS: {:?}",
1727                                other
1728                            )));
1729                        }
1730                    };
1731                    let column_name = alias.value.clone();
1732                    column_specs.push(PlanColumnSpec::new(column_name.clone(), data_type, true));
1733                    column_names.push(column_name);
1734                    row_template.push(value);
1735                }
1736                other => {
1737                    return Err(Error::InvalidArgumentError(format!(
1738                        "unsupported projection {:?} in range CTAS",
1739                        other
1740                    )));
1741                }
1742            }
1743        }
1744
1745        let plan = CreateTablePlan {
1746            name: display_name.to_string(),
1747            if_not_exists,
1748            or_replace,
1749            columns: column_specs,
1750            source: None,
1751            namespace,
1752            foreign_keys: Vec::new(),
1753            multi_column_uniques: Vec::new(),
1754        };
1755        let create_result = self.execute_plan_statement(PlanStatement::CreateTable(plan))?;
1756
1757        let row_count = range_size
1758            .try_into()
1759            .map_err(|_| Error::InvalidArgumentError("range size exceeds usize".into()))?;
1760        if row_count > 0 {
1761            let rows = vec![row_template; row_count];
1762            let insert_plan = InsertPlan {
1763                table: display_name.to_string(),
1764                columns: column_names,
1765                source: InsertSource::Rows(rows),
1766            };
1767            self.execute_plan_statement(PlanStatement::Insert(insert_plan))?;
1768        }
1769
1770        Ok(Some(create_result))
1771    }
1772
1773    // TODO: Refactor into runtime or executor layer?
1774    // NOTE: PRAGMA handling lives in the SQL layer until a shared runtime hook exists.
1775    /// Try to handle pragma_table_info('table_name') table function
1776    fn try_handle_pragma_table_info(
1777        &self,
1778        query: &Query,
1779    ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
1780        let select = match query.body.as_ref() {
1781            SetExpr::Select(select) => select,
1782            _ => return Ok(None),
1783        };
1784
1785        if select.from.len() != 1 {
1786            return Ok(None);
1787        }
1788
1789        let table_with_joins = &select.from[0];
1790        if !table_with_joins.joins.is_empty() {
1791            return Ok(None);
1792        }
1793
1794        // Check if this is pragma_table_info function call
1795        let table_name = match &table_with_joins.relation {
1796            TableFactor::Table {
1797                name,
1798                args: Some(args),
1799                ..
1800            } => {
1801                let func_name = name.to_string().to_ascii_lowercase();
1802                if func_name != "pragma_table_info" {
1803                    return Ok(None);
1804                }
1805
1806                // Extract table name from argument
1807                if args.args.len() != 1 {
1808                    return Err(Error::InvalidArgumentError(
1809                        "pragma_table_info expects exactly one argument".into(),
1810                    ));
1811                }
1812
1813                match &args.args[0] {
1814                    FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
1815                        match &value.value {
1816                            Value::SingleQuotedString(s) => s.clone(),
1817                            Value::DoubleQuotedString(s) => s.clone(),
1818                            _ => {
1819                                return Err(Error::InvalidArgumentError(
1820                                    "pragma_table_info argument must be a string".into(),
1821                                ));
1822                            }
1823                        }
1824                    }
1825                    _ => {
1826                        return Err(Error::InvalidArgumentError(
1827                            "pragma_table_info argument must be a string literal".into(),
1828                        ));
1829                    }
1830                }
1831            }
1832            _ => return Ok(None),
1833        };
1834
1835        // Get table column specs from runtime context
1836        let context = self.engine.context();
1837        let (_, canonical_name) = llkv_table::canonical_table_name(&table_name)?;
1838        let columns = context.catalog().table_column_specs(&canonical_name)?;
1839
1840        // Build RecordBatch with table column information
1841        use arrow::array::{BooleanArray, Int32Array, StringArray};
1842        use arrow::datatypes::{DataType, Field, Schema};
1843
1844        let mut cid_values = Vec::new();
1845        let mut name_values = Vec::new();
1846        let mut type_values = Vec::new();
1847        let mut notnull_values = Vec::new();
1848        let mut dflt_value_values: Vec<Option<String>> = Vec::new();
1849        let mut pk_values = Vec::new();
1850
1851        for (idx, col) in columns.iter().enumerate() {
1852            cid_values.push(idx as i32);
1853            name_values.push(col.name.clone());
1854            type_values.push(format!("{:?}", col.data_type)); // Simple type representation
1855            notnull_values.push(!col.nullable);
1856            dflt_value_values.push(None); // We don't track default values yet
1857            pk_values.push(col.primary_key);
1858        }
1859
1860        let schema = Arc::new(Schema::new(vec![
1861            Field::new("cid", DataType::Int32, false),
1862            Field::new("name", DataType::Utf8, false),
1863            Field::new("type", DataType::Utf8, false),
1864            Field::new("notnull", DataType::Boolean, false),
1865            Field::new("dflt_value", DataType::Utf8, true),
1866            Field::new("pk", DataType::Boolean, false),
1867        ]));
1868
1869        use arrow::array::ArrayRef;
1870        let mut batch = RecordBatch::try_new(
1871            Arc::clone(&schema),
1872            vec![
1873                Arc::new(Int32Array::from(cid_values)) as ArrayRef,
1874                Arc::new(StringArray::from(name_values)) as ArrayRef,
1875                Arc::new(StringArray::from(type_values)) as ArrayRef,
1876                Arc::new(BooleanArray::from(notnull_values)) as ArrayRef,
1877                Arc::new(StringArray::from(dflt_value_values)) as ArrayRef,
1878                Arc::new(BooleanArray::from(pk_values)) as ArrayRef,
1879            ],
1880        )
1881        .map_err(|e| Error::Internal(format!("failed to create pragma_table_info batch: {}", e)))?;
1882
1883        // Apply SELECT projections: extract only requested columns
1884        let projection_indices: Vec<usize> = select
1885            .projection
1886            .iter()
1887            .filter_map(|item| {
1888                match item {
1889                    SelectItem::UnnamedExpr(SqlExpr::Identifier(ident)) => {
1890                        schema.index_of(&ident.value).ok()
1891                    }
1892                    SelectItem::ExprWithAlias { expr, .. } => {
1893                        if let SqlExpr::Identifier(ident) = expr {
1894                            schema.index_of(&ident.value).ok()
1895                        } else {
1896                            None
1897                        }
1898                    }
1899                    SelectItem::Wildcard(_) => None, // Handle * separately
1900                    _ => None,
1901                }
1902            })
1903            .collect();
1904
1905        // Apply projections if not SELECT *
1906        let projected_schema;
1907        if !projection_indices.is_empty() {
1908            let projected_fields: Vec<Field> = projection_indices
1909                .iter()
1910                .map(|&idx| schema.field(idx).clone())
1911                .collect();
1912            projected_schema = Arc::new(Schema::new(projected_fields));
1913
1914            let projected_columns: Vec<ArrayRef> = projection_indices
1915                .iter()
1916                .map(|&idx| Arc::clone(batch.column(idx)))
1917                .collect();
1918
1919            batch = RecordBatch::try_new(Arc::clone(&projected_schema), projected_columns)
1920                .map_err(|e| Error::Internal(format!("failed to project columns: {}", e)))?;
1921        } else {
1922            // SELECT * or complex projections - use original schema
1923            projected_schema = schema;
1924        }
1925
1926        // Apply ORDER BY using Arrow compute kernels
1927        if let Some(order_by) = &query.order_by {
1928            use arrow::compute::SortColumn;
1929            use arrow::compute::lexsort_to_indices;
1930            use sqlparser::ast::OrderByKind;
1931
1932            let exprs = match &order_by.kind {
1933                OrderByKind::Expressions(exprs) => exprs,
1934                _ => {
1935                    return Err(Error::InvalidArgumentError(
1936                        "unsupported ORDER BY clause".into(),
1937                    ));
1938                }
1939            };
1940
1941            let mut sort_columns = Vec::new();
1942            for order_expr in exprs {
1943                if let SqlExpr::Identifier(ident) = &order_expr.expr
1944                    && let Ok(col_idx) = projected_schema.index_of(&ident.value)
1945                {
1946                    let options = arrow::compute::SortOptions {
1947                        descending: !order_expr.options.asc.unwrap_or(true),
1948                        nulls_first: order_expr.options.nulls_first.unwrap_or(false),
1949                    };
1950                    sort_columns.push(SortColumn {
1951                        values: Arc::clone(batch.column(col_idx)),
1952                        options: Some(options),
1953                    });
1954                }
1955            }
1956
1957            if !sort_columns.is_empty() {
1958                let indices = lexsort_to_indices(&sort_columns, None)
1959                    .map_err(|e| Error::Internal(format!("failed to sort: {}", e)))?;
1960
1961                use arrow::compute::take;
1962                let sorted_columns: Result<Vec<ArrayRef>, _> = batch
1963                    .columns()
1964                    .iter()
1965                    .map(|col| take(col.as_ref(), &indices, None))
1966                    .collect();
1967
1968                batch = RecordBatch::try_new(
1969                    Arc::clone(&projected_schema),
1970                    sorted_columns
1971                        .map_err(|e| Error::Internal(format!("failed to apply sort: {}", e)))?,
1972                )
1973                .map_err(|e| Error::Internal(format!("failed to create sorted batch: {}", e)))?;
1974            }
1975        }
1976
1977        let execution = SelectExecution::new_single_batch(
1978            table_name.clone(),
1979            Arc::clone(&projected_schema),
1980            batch,
1981        );
1982
1983        Ok(Some(RuntimeStatementResult::Select {
1984            table_name,
1985            schema: projected_schema,
1986            execution: Box::new(execution),
1987        }))
1988    }
1989
1990    fn handle_create_table_as(
1991        &self,
1992        display_name: String,
1993        _canonical_name: String,
1994        query: Query,
1995        if_not_exists: bool,
1996        or_replace: bool,
1997        namespace: Option<String>,
1998    ) -> SqlResult<RuntimeStatementResult<P>> {
1999        // Check if this is a SELECT from VALUES in a derived table
2000        // Pattern: SELECT * FROM (VALUES ...) alias(col1, col2, ...)
2001        if let SetExpr::Select(select) = query.body.as_ref()
2002            && let Some((rows, column_names)) = extract_values_from_derived_table(&select.from)?
2003        {
2004            // Convert VALUES rows to Arrow RecordBatches
2005            return self.handle_create_table_from_values(
2006                display_name,
2007                rows,
2008                column_names,
2009                if_not_exists,
2010                or_replace,
2011                namespace,
2012            );
2013        }
2014
2015        // Regular CTAS with SELECT from existing tables
2016        let select_plan = self.build_select_plan(query)?;
2017
2018        if select_plan.projections.is_empty() && select_plan.aggregates.is_empty() {
2019            return Err(Error::InvalidArgumentError(
2020                "CREATE TABLE AS SELECT requires at least one projected column".into(),
2021            ));
2022        }
2023
2024        let plan = CreateTablePlan {
2025            name: display_name,
2026            if_not_exists,
2027            or_replace,
2028            columns: Vec::new(),
2029            source: Some(CreateTableSource::Select {
2030                plan: Box::new(select_plan),
2031            }),
2032            namespace,
2033            foreign_keys: Vec::new(),
2034            multi_column_uniques: Vec::new(),
2035        };
2036        self.execute_plan_statement(PlanStatement::CreateTable(plan))
2037    }
2038
2039    fn handle_create_table_from_values(
2040        &self,
2041        display_name: String,
2042        rows: Vec<Vec<PlanValue>>,
2043        column_names: Vec<String>,
2044        if_not_exists: bool,
2045        or_replace: bool,
2046        namespace: Option<String>,
2047    ) -> SqlResult<RuntimeStatementResult<P>> {
2048        use arrow::array::{ArrayRef, Float64Builder, Int64Builder, StringBuilder};
2049        use arrow::datatypes::{DataType, Field, Schema};
2050        use arrow::record_batch::RecordBatch;
2051        use std::sync::Arc;
2052
2053        if rows.is_empty() {
2054            return Err(Error::InvalidArgumentError(
2055                "VALUES must have at least one row".into(),
2056            ));
2057        }
2058
2059        let num_cols = column_names.len();
2060
2061        // Infer schema from first row
2062        let first_row = &rows[0];
2063        if first_row.len() != num_cols {
2064            return Err(Error::InvalidArgumentError(
2065                "VALUES row column count mismatch".into(),
2066            ));
2067        }
2068
2069        let mut fields = Vec::with_capacity(num_cols);
2070        let mut column_types = Vec::with_capacity(num_cols);
2071
2072        for (idx, value) in first_row.iter().enumerate() {
2073            let (data_type, nullable) = match value {
2074                PlanValue::Integer(_) => (DataType::Int64, false),
2075                PlanValue::Float(_) => (DataType::Float64, false),
2076                PlanValue::String(_) => (DataType::Utf8, false),
2077                PlanValue::Null => (DataType::Utf8, true), // Default NULL to string type
2078                _ => {
2079                    return Err(Error::InvalidArgumentError(format!(
2080                        "unsupported value type in VALUES for column '{}'",
2081                        column_names.get(idx).unwrap_or(&format!("column{}", idx))
2082                    )));
2083                }
2084            };
2085
2086            column_types.push(data_type.clone());
2087            fields.push(Field::new(&column_names[idx], data_type, nullable));
2088        }
2089
2090        let schema = Arc::new(Schema::new(fields));
2091
2092        // Build Arrow arrays for each column
2093        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(num_cols);
2094
2095        for col_idx in 0..num_cols {
2096            let col_type = &column_types[col_idx];
2097
2098            match col_type {
2099                DataType::Int64 => {
2100                    let mut builder = Int64Builder::with_capacity(rows.len());
2101                    for row in &rows {
2102                        match &row[col_idx] {
2103                            PlanValue::Integer(v) => builder.append_value(*v),
2104                            PlanValue::Null => builder.append_null(),
2105                            other => {
2106                                return Err(Error::InvalidArgumentError(format!(
2107                                    "type mismatch in VALUES: expected Integer, got {:?}",
2108                                    other
2109                                )));
2110                            }
2111                        }
2112                    }
2113                    arrays.push(Arc::new(builder.finish()) as ArrayRef);
2114                }
2115                DataType::Float64 => {
2116                    let mut builder = Float64Builder::with_capacity(rows.len());
2117                    for row in &rows {
2118                        match &row[col_idx] {
2119                            PlanValue::Float(v) => builder.append_value(*v),
2120                            PlanValue::Null => builder.append_null(),
2121                            other => {
2122                                return Err(Error::InvalidArgumentError(format!(
2123                                    "type mismatch in VALUES: expected Float, got {:?}",
2124                                    other
2125                                )));
2126                            }
2127                        }
2128                    }
2129                    arrays.push(Arc::new(builder.finish()) as ArrayRef);
2130                }
2131                DataType::Utf8 => {
2132                    let mut builder = StringBuilder::with_capacity(rows.len(), 1024);
2133                    for row in &rows {
2134                        match &row[col_idx] {
2135                            PlanValue::String(v) => builder.append_value(v),
2136                            PlanValue::Null => builder.append_null(),
2137                            other => {
2138                                return Err(Error::InvalidArgumentError(format!(
2139                                    "type mismatch in VALUES: expected String, got {:?}",
2140                                    other
2141                                )));
2142                            }
2143                        }
2144                    }
2145                    arrays.push(Arc::new(builder.finish()) as ArrayRef);
2146                }
2147                other => {
2148                    return Err(Error::InvalidArgumentError(format!(
2149                        "unsupported column type in VALUES: {:?}",
2150                        other
2151                    )));
2152                }
2153            }
2154        }
2155
2156        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays).map_err(|e| {
2157            Error::Internal(format!("failed to create RecordBatch from VALUES: {}", e))
2158        })?;
2159
2160        let plan = CreateTablePlan {
2161            name: display_name.clone(),
2162            if_not_exists,
2163            or_replace,
2164            columns: Vec::new(),
2165            source: Some(CreateTableSource::Batches {
2166                schema: Arc::clone(&schema),
2167                batches: vec![batch],
2168            }),
2169            namespace,
2170            foreign_keys: Vec::new(),
2171            multi_column_uniques: Vec::new(),
2172        };
2173
2174        self.execute_plan_statement(PlanStatement::CreateTable(plan))
2175    }
2176
2177    fn handle_insert(&self, stmt: sqlparser::ast::Insert) -> SqlResult<RuntimeStatementResult<P>> {
2178        let table_name_debug =
2179            Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
2180        tracing::trace!(
2181            "DEBUG SQL handle_insert called for table={}",
2182            table_name_debug
2183        );
2184        if !self.engine.session().has_active_transaction()
2185            && self.is_table_marked_dropped(&table_name_debug)?
2186        {
2187            return Err(Error::TransactionContextError(
2188                DROPPED_TABLE_TRANSACTION_ERR.into(),
2189            ));
2190        }
2191        if stmt.replace_into || stmt.ignore || stmt.or.is_some() {
2192            return Err(Error::InvalidArgumentError(
2193                "non-standard INSERT forms are not supported".into(),
2194            ));
2195        }
2196        if stmt.overwrite {
2197            return Err(Error::InvalidArgumentError(
2198                "INSERT OVERWRITE is not supported".into(),
2199            ));
2200        }
2201        if !stmt.assignments.is_empty() {
2202            return Err(Error::InvalidArgumentError(
2203                "INSERT ... SET is not supported".into(),
2204            ));
2205        }
2206        if stmt.partitioned.is_some() || !stmt.after_columns.is_empty() {
2207            return Err(Error::InvalidArgumentError(
2208                "partitioned INSERT is not supported".into(),
2209            ));
2210        }
2211        if stmt.returning.is_some() {
2212            return Err(Error::InvalidArgumentError(
2213                "INSERT ... RETURNING is not supported".into(),
2214            ));
2215        }
2216        if stmt.format_clause.is_some() || stmt.settings.is_some() {
2217            return Err(Error::InvalidArgumentError(
2218                "INSERT with FORMAT or SETTINGS is not supported".into(),
2219            ));
2220        }
2221
2222        let (display_name, _canonical_name) = match &stmt.table {
2223            TableObject::TableName(name) => canonical_object_name(name)?,
2224            _ => {
2225                return Err(Error::InvalidArgumentError(
2226                    "INSERT requires a plain table name".into(),
2227                ));
2228            }
2229        };
2230
2231        let columns: Vec<String> = stmt
2232            .columns
2233            .iter()
2234            .map(|ident| ident.value.clone())
2235            .collect();
2236        let source_expr = stmt
2237            .source
2238            .as_ref()
2239            .ok_or_else(|| Error::InvalidArgumentError("INSERT requires a VALUES clause".into()))?;
2240        validate_simple_query(source_expr)?;
2241
2242        let insert_source = match source_expr.body.as_ref() {
2243            SetExpr::Values(values) => {
2244                if values.rows.is_empty() {
2245                    return Err(Error::InvalidArgumentError(
2246                        "INSERT VALUES list must contain at least one row".into(),
2247                    ));
2248                }
2249                let mut rows: Vec<Vec<SqlValue>> = Vec::with_capacity(values.rows.len());
2250                for row in &values.rows {
2251                    let mut converted = Vec::with_capacity(row.len());
2252                    for expr in row {
2253                        converted.push(SqlValue::try_from_expr(expr)?);
2254                    }
2255                    rows.push(converted);
2256                }
2257                InsertSource::Rows(
2258                    rows.into_iter()
2259                        .map(|row| row.into_iter().map(PlanValue::from).collect())
2260                        .collect(),
2261                )
2262            }
2263            SetExpr::Select(select) => {
2264                if let Some(rows) = extract_constant_select_rows(select.as_ref())? {
2265                    InsertSource::Rows(rows)
2266                } else if let Some(range_rows) = extract_rows_from_range(select.as_ref())? {
2267                    InsertSource::Rows(range_rows.into_rows())
2268                } else {
2269                    let select_plan = self.build_select_plan((**source_expr).clone())?;
2270                    InsertSource::Select {
2271                        plan: Box::new(select_plan),
2272                    }
2273                }
2274            }
2275            _ => {
2276                return Err(Error::InvalidArgumentError(
2277                    "unsupported INSERT source".into(),
2278                ));
2279            }
2280        };
2281
2282        let plan = InsertPlan {
2283            table: display_name.clone(),
2284            columns,
2285            source: insert_source,
2286        };
2287        tracing::trace!(
2288            "DEBUG SQL handle_insert: about to execute insert for table={}",
2289            display_name
2290        );
2291        self.execute_plan_statement(PlanStatement::Insert(plan))
2292    }
2293
2294    fn handle_update(
2295        &self,
2296        table: TableWithJoins,
2297        assignments: Vec<Assignment>,
2298        from: Option<UpdateTableFromKind>,
2299        selection: Option<SqlExpr>,
2300        returning: Option<Vec<SelectItem>>,
2301    ) -> SqlResult<RuntimeStatementResult<P>> {
2302        if from.is_some() {
2303            return Err(Error::InvalidArgumentError(
2304                "UPDATE ... FROM is not supported yet".into(),
2305            ));
2306        }
2307        if returning.is_some() {
2308            return Err(Error::InvalidArgumentError(
2309                "UPDATE ... RETURNING is not supported".into(),
2310            ));
2311        }
2312        if assignments.is_empty() {
2313            return Err(Error::InvalidArgumentError(
2314                "UPDATE requires at least one assignment".into(),
2315            ));
2316        }
2317
2318        let (display_name, canonical_name) = extract_single_table(std::slice::from_ref(&table))?;
2319
2320        if !self.engine.session().has_active_transaction()
2321            && self
2322                .engine
2323                .context()
2324                .is_table_marked_dropped(&canonical_name)
2325        {
2326            return Err(Error::TransactionContextError(
2327                DROPPED_TABLE_TRANSACTION_ERR.into(),
2328            ));
2329        }
2330
2331        let catalog = self.engine.context().table_catalog();
2332        let resolver = catalog.identifier_resolver();
2333        let table_id = catalog.table_id(&canonical_name);
2334
2335        let mut column_assignments = Vec::with_capacity(assignments.len());
2336        let mut seen: HashMap<String, ()> = HashMap::new();
2337        for assignment in assignments {
2338            let column_name = resolve_assignment_column_name(&assignment.target)?;
2339            let normalized = column_name.to_ascii_lowercase();
2340            if seen.insert(normalized, ()).is_some() {
2341                return Err(Error::InvalidArgumentError(format!(
2342                    "duplicate column '{}' in UPDATE assignments",
2343                    column_name
2344                )));
2345            }
2346            let value = match SqlValue::try_from_expr(&assignment.value) {
2347                Ok(literal) => AssignmentValue::Literal(PlanValue::from(literal)),
2348                Err(Error::InvalidArgumentError(msg))
2349                    if msg.contains("unsupported literal expression") =>
2350                {
2351                    let translated = translate_scalar_with_context(
2352                        &resolver,
2353                        IdentifierContext::new(table_id),
2354                        &assignment.value,
2355                    )?;
2356                    AssignmentValue::Expression(translated)
2357                }
2358                Err(err) => return Err(err),
2359            };
2360            column_assignments.push(ColumnAssignment {
2361                column: column_name,
2362                value,
2363            });
2364        }
2365
2366        let filter = match selection {
2367            Some(expr) => {
2368                let materialized_expr = self.materialize_in_subquery(expr)?;
2369                Some(translate_condition_with_context(
2370                    &resolver,
2371                    IdentifierContext::new(table_id),
2372                    &materialized_expr,
2373                )?)
2374            }
2375            None => None,
2376        };
2377
2378        let plan = UpdatePlan {
2379            table: display_name.clone(),
2380            assignments: column_assignments,
2381            filter,
2382        };
2383        self.execute_plan_statement(PlanStatement::Update(plan))
2384    }
2385
2386    #[allow(clippy::collapsible_if)]
2387    fn handle_delete(&self, delete: Delete) -> SqlResult<RuntimeStatementResult<P>> {
2388        let Delete {
2389            tables,
2390            from,
2391            using,
2392            selection,
2393            returning,
2394            order_by,
2395            limit,
2396        } = delete;
2397
2398        if !tables.is_empty() {
2399            return Err(Error::InvalidArgumentError(
2400                "multi-table DELETE is not supported yet".into(),
2401            ));
2402        }
2403        if let Some(using_tables) = using {
2404            if !using_tables.is_empty() {
2405                return Err(Error::InvalidArgumentError(
2406                    "DELETE ... USING is not supported yet".into(),
2407                ));
2408            }
2409        }
2410        if returning.is_some() {
2411            return Err(Error::InvalidArgumentError(
2412                "DELETE ... RETURNING is not supported".into(),
2413            ));
2414        }
2415        if !order_by.is_empty() {
2416            return Err(Error::InvalidArgumentError(
2417                "DELETE ... ORDER BY is not supported yet".into(),
2418            ));
2419        }
2420        if limit.is_some() {
2421            return Err(Error::InvalidArgumentError(
2422                "DELETE ... LIMIT is not supported yet".into(),
2423            ));
2424        }
2425
2426        let from_tables = match from {
2427            FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
2428        };
2429        let (display_name, canonical_name) = extract_single_table(&from_tables)?;
2430
2431        if !self.engine.session().has_active_transaction()
2432            && self
2433                .engine
2434                .context()
2435                .is_table_marked_dropped(&canonical_name)
2436        {
2437            return Err(Error::TransactionContextError(
2438                DROPPED_TABLE_TRANSACTION_ERR.into(),
2439            ));
2440        }
2441
2442        let catalog = self.engine.context().table_catalog();
2443        let resolver = catalog.identifier_resolver();
2444        let table_id = catalog.table_id(&canonical_name);
2445
2446        let filter = selection
2447            .map(|expr| {
2448                let materialized_expr = self.materialize_in_subquery(expr)?;
2449                translate_condition_with_context(
2450                    &resolver,
2451                    IdentifierContext::new(table_id),
2452                    &materialized_expr,
2453                )
2454            })
2455            .transpose()?;
2456
2457        let plan = DeletePlan {
2458            table: display_name.clone(),
2459            filter,
2460        };
2461        self.execute_plan_statement(PlanStatement::Delete(plan))
2462    }
2463
2464    fn handle_truncate(
2465        &self,
2466        table_names: &[sqlparser::ast::TruncateTableTarget],
2467        partitions: &Option<Vec<SqlExpr>>,
2468        _table: bool, // boolean field in sqlparser, not the table name
2469        identity: &Option<sqlparser::ast::TruncateIdentityOption>,
2470        cascade: Option<sqlparser::ast::CascadeOption>,
2471        on_cluster: &Option<Ident>,
2472    ) -> SqlResult<RuntimeStatementResult<P>> {
2473        // Validate unsupported features
2474        if table_names.len() > 1 {
2475            return Err(Error::InvalidArgumentError(
2476                "TRUNCATE with multiple tables is not supported yet".into(),
2477            ));
2478        }
2479        if partitions.is_some() {
2480            return Err(Error::InvalidArgumentError(
2481                "TRUNCATE ... PARTITION is not supported".into(),
2482            ));
2483        }
2484        if identity.is_some() {
2485            return Err(Error::InvalidArgumentError(
2486                "TRUNCATE ... RESTART/CONTINUE IDENTITY is not supported".into(),
2487            ));
2488        }
2489        use sqlparser::ast::CascadeOption;
2490        if matches!(cascade, Some(CascadeOption::Cascade)) {
2491            return Err(Error::InvalidArgumentError(
2492                "TRUNCATE ... CASCADE is not supported".into(),
2493            ));
2494        }
2495        if on_cluster.is_some() {
2496            return Err(Error::InvalidArgumentError(
2497                "TRUNCATE ... ON CLUSTER is not supported".into(),
2498            ));
2499        }
2500
2501        // Extract table name from table_names
2502        let table_name = if let Some(target) = table_names.first() {
2503            // TruncateTableTarget has a name field
2504            let table_obj = &target.name;
2505            let display_name = table_obj.to_string();
2506            let canonical_name = display_name.to_ascii_lowercase();
2507
2508            // Check if table is dropped in transaction
2509            if !self.engine.session().has_active_transaction()
2510                && self
2511                    .engine
2512                    .context()
2513                    .is_table_marked_dropped(&canonical_name)
2514            {
2515                return Err(Error::TransactionContextError(
2516                    DROPPED_TABLE_TRANSACTION_ERR.into(),
2517                ));
2518            }
2519
2520            display_name
2521        } else {
2522            return Err(Error::InvalidArgumentError(
2523                "TRUNCATE requires a table name".into(),
2524            ));
2525        };
2526
2527        let plan = TruncatePlan {
2528            table: table_name.clone(),
2529        };
2530        self.execute_plan_statement(PlanStatement::Truncate(plan))
2531    }
2532
2533    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors SQL grammar; keep grouped until command builder is introduced.
2534    fn handle_drop(
2535        &self,
2536        object_type: ObjectType,
2537        if_exists: bool,
2538        names: Vec<ObjectName>,
2539        cascade: bool,
2540        restrict: bool,
2541        purge: bool,
2542        temporary: bool,
2543    ) -> SqlResult<RuntimeStatementResult<P>> {
2544        if purge || temporary {
2545            return Err(Error::InvalidArgumentError(
2546                "DROP purge/temporary options are not supported".into(),
2547            ));
2548        }
2549
2550        match object_type {
2551            ObjectType::Table => {
2552                if cascade || restrict {
2553                    return Err(Error::InvalidArgumentError(
2554                        "DROP TABLE CASCADE/RESTRICT is not supported".into(),
2555                    ));
2556                }
2557
2558                for name in names {
2559                    let table_name = Self::object_name_to_string(&name)?;
2560                    let mut plan = llkv_plan::DropTablePlan::new(table_name.clone());
2561                    plan.if_exists = if_exists;
2562
2563                    self.execute_plan_statement(llkv_plan::PlanStatement::DropTable(plan))
2564                        .map_err(|err| Self::map_table_error(&table_name, err))?;
2565                }
2566
2567                Ok(RuntimeStatementResult::NoOp)
2568            }
2569            ObjectType::Index => {
2570                if cascade || restrict {
2571                    return Err(Error::InvalidArgumentError(
2572                        "DROP INDEX CASCADE/RESTRICT is not supported".into(),
2573                    ));
2574                }
2575
2576                for name in names {
2577                    let index_name = Self::object_name_to_string(&name)?;
2578                    let plan = llkv_plan::DropIndexPlan::new(index_name).if_exists(if_exists);
2579                    self.execute_plan_statement(llkv_plan::PlanStatement::DropIndex(plan))?;
2580                }
2581
2582                Ok(RuntimeStatementResult::NoOp)
2583            }
2584            ObjectType::Schema => {
2585                if restrict {
2586                    return Err(Error::InvalidArgumentError(
2587                        "DROP SCHEMA RESTRICT is not supported".into(),
2588                    ));
2589                }
2590
2591                let catalog = self.engine.context().table_catalog();
2592
2593                for name in names {
2594                    let (display_name, canonical_name) = canonical_object_name(&name)?;
2595
2596                    if !catalog.schema_exists(&canonical_name) {
2597                        if if_exists {
2598                            continue;
2599                        }
2600                        return Err(Error::CatalogError(format!(
2601                            "Schema '{}' does not exist",
2602                            display_name
2603                        )));
2604                    }
2605
2606                    if cascade {
2607                        // Drop all tables in this schema
2608                        let all_tables = catalog.table_names();
2609                        let schema_prefix = format!("{}.", canonical_name);
2610
2611                        for table in all_tables {
2612                            if table.to_ascii_lowercase().starts_with(&schema_prefix) {
2613                                let mut plan = llkv_plan::DropTablePlan::new(table.clone());
2614                                plan.if_exists = false;
2615                                self.execute_plan_statement(llkv_plan::PlanStatement::DropTable(
2616                                    plan,
2617                                ))?;
2618                            }
2619                        }
2620                    } else {
2621                        // Check if schema has any tables
2622                        let all_tables = catalog.table_names();
2623                        let schema_prefix = format!("{}.", canonical_name);
2624                        let has_tables = all_tables
2625                            .iter()
2626                            .any(|t| t.to_ascii_lowercase().starts_with(&schema_prefix));
2627
2628                        if has_tables {
2629                            return Err(Error::CatalogError(format!(
2630                                "Schema '{}' is not empty. Use CASCADE to drop schema and all its tables",
2631                                display_name
2632                            )));
2633                        }
2634                    }
2635
2636                    // Drop the schema
2637                    if !catalog.unregister_schema(&canonical_name) && !if_exists {
2638                        return Err(Error::CatalogError(format!(
2639                            "Schema '{}' does not exist",
2640                            display_name
2641                        )));
2642                    }
2643                }
2644
2645                Ok(RuntimeStatementResult::NoOp)
2646            }
2647            _ => Err(Error::InvalidArgumentError(format!(
2648                "DROP {} is not supported",
2649                object_type
2650            ))),
2651        }
2652    }
2653
2654    fn handle_alter_table(
2655        &self,
2656        name: ObjectName,
2657        if_exists: bool,
2658        only: bool,
2659        operations: Vec<AlterTableOperation>,
2660    ) -> SqlResult<RuntimeStatementResult<P>> {
2661        if only {
2662            return Err(Error::InvalidArgumentError(
2663                "ALTER TABLE ONLY is not supported yet".into(),
2664            ));
2665        }
2666
2667        if operations.is_empty() {
2668            return Ok(RuntimeStatementResult::NoOp);
2669        }
2670
2671        if operations.len() != 1 {
2672            return Err(Error::InvalidArgumentError(
2673                "ALTER TABLE currently supports exactly one operation".into(),
2674            ));
2675        }
2676
2677        let operation = operations.into_iter().next().expect("checked length");
2678        match operation {
2679            AlterTableOperation::RenameTable { table_name } => {
2680                let new_name = table_name.to_string();
2681                self.handle_alter_table_rename(name, new_name, if_exists)
2682            }
2683            AlterTableOperation::RenameColumn {
2684                old_column_name,
2685                new_column_name,
2686            } => {
2687                let plan = llkv_plan::AlterTablePlan {
2688                    table_name: name.to_string(),
2689                    if_exists,
2690                    operation: llkv_plan::AlterTableOperation::RenameColumn {
2691                        old_column_name: old_column_name.to_string(),
2692                        new_column_name: new_column_name.to_string(),
2693                    },
2694                };
2695                self.execute_plan_statement(PlanStatement::AlterTable(plan))
2696            }
2697            AlterTableOperation::AlterColumn { column_name, op } => {
2698                // Only support SET DATA TYPE for now
2699                if let AlterColumnOperation::SetDataType {
2700                    data_type,
2701                    using,
2702                    had_set: _,
2703                } = op
2704                {
2705                    if using.is_some() {
2706                        return Err(Error::InvalidArgumentError(
2707                            "ALTER COLUMN SET DATA TYPE USING clause is not yet supported".into(),
2708                        ));
2709                    }
2710
2711                    let plan = llkv_plan::AlterTablePlan {
2712                        table_name: name.to_string(),
2713                        if_exists,
2714                        operation: llkv_plan::AlterTableOperation::SetColumnDataType {
2715                            column_name: column_name.to_string(),
2716                            new_data_type: data_type.to_string(),
2717                        },
2718                    };
2719                    self.execute_plan_statement(PlanStatement::AlterTable(plan))
2720                } else {
2721                    Err(Error::InvalidArgumentError(format!(
2722                        "unsupported ALTER COLUMN operation: {:?}",
2723                        op
2724                    )))
2725                }
2726            }
2727            AlterTableOperation::DropColumn {
2728                has_column_keyword: _,
2729                column_names,
2730                if_exists: column_if_exists,
2731                drop_behavior,
2732            } => {
2733                if column_names.len() != 1 {
2734                    return Err(Error::InvalidArgumentError(
2735                        "DROP COLUMN currently supports dropping one column at a time".into(),
2736                    ));
2737                }
2738
2739                let column_name = column_names.into_iter().next().unwrap().to_string();
2740                let cascade = matches!(drop_behavior, Some(sqlparser::ast::DropBehavior::Cascade));
2741
2742                let plan = llkv_plan::AlterTablePlan {
2743                    table_name: name.to_string(),
2744                    if_exists,
2745                    operation: llkv_plan::AlterTableOperation::DropColumn {
2746                        column_name,
2747                        if_exists: column_if_exists,
2748                        cascade,
2749                    },
2750                };
2751                self.execute_plan_statement(PlanStatement::AlterTable(plan))
2752            }
2753            other => Err(Error::InvalidArgumentError(format!(
2754                "unsupported ALTER TABLE operation: {:?}",
2755                other
2756            ))),
2757        }
2758    }
2759
2760    fn handle_alter_table_rename(
2761        &self,
2762        original_name: ObjectName,
2763        new_table_name: String,
2764        if_exists: bool,
2765    ) -> SqlResult<RuntimeStatementResult<P>> {
2766        let (schema_opt, table_name) = parse_schema_qualified_name(&original_name)?;
2767
2768        let new_table_name_clean = new_table_name.trim();
2769
2770        if new_table_name_clean.is_empty() {
2771            return Err(Error::InvalidArgumentError(
2772                "ALTER TABLE RENAME requires a non-empty table name".into(),
2773            ));
2774        }
2775
2776        let (raw_new_schema_opt, raw_new_table) =
2777            if let Some((schema_part, table_part)) = new_table_name_clean.split_once('.') {
2778                (
2779                    Some(schema_part.trim().to_string()),
2780                    table_part.trim().to_string(),
2781                )
2782            } else {
2783                (None, new_table_name_clean.to_string())
2784            };
2785
2786        if schema_opt.is_none() && raw_new_schema_opt.is_some() {
2787            return Err(Error::InvalidArgumentError(
2788                "ALTER TABLE RENAME cannot add a schema qualifier".into(),
2789            ));
2790        }
2791
2792        let new_table_trimmed = raw_new_table.trim_matches('"');
2793        if new_table_trimmed.is_empty() {
2794            return Err(Error::InvalidArgumentError(
2795                "ALTER TABLE RENAME requires a non-empty table name".into(),
2796            ));
2797        }
2798
2799        if let (Some(existing_schema), Some(new_schema_raw)) =
2800            (schema_opt.as_ref(), raw_new_schema_opt.as_ref())
2801        {
2802            let new_schema_trimmed = new_schema_raw.trim_matches('"');
2803            if !existing_schema.eq_ignore_ascii_case(new_schema_trimmed) {
2804                return Err(Error::InvalidArgumentError(
2805                    "ALTER TABLE RENAME cannot change table schema".into(),
2806                ));
2807            }
2808        }
2809
2810        let new_table_display = raw_new_table;
2811        let new_schema_opt = raw_new_schema_opt;
2812
2813        fn join_schema_table(schema: &str, table: &str) -> String {
2814            let mut qualified = String::with_capacity(schema.len() + table.len() + 1);
2815            qualified.push_str(schema);
2816            qualified.push('.');
2817            qualified.push_str(table);
2818            qualified
2819        }
2820
2821        let current_display = schema_opt
2822            .as_ref()
2823            .map(|schema| join_schema_table(schema, &table_name))
2824            .unwrap_or_else(|| table_name.clone());
2825
2826        let new_display = if let Some(new_schema_raw) = new_schema_opt.clone() {
2827            join_schema_table(&new_schema_raw, &new_table_display)
2828        } else if let Some(schema) = schema_opt.as_ref() {
2829            join_schema_table(schema, &new_table_display)
2830        } else {
2831            new_table_display.clone()
2832        };
2833
2834        let plan = RenameTablePlan::new(&current_display, &new_display).if_exists(if_exists);
2835
2836        match CatalogDdl::rename_table(self.engine.session(), plan) {
2837            Ok(()) => Ok(RuntimeStatementResult::NoOp),
2838            Err(err) => Err(Self::map_table_error(&current_display, err)),
2839        }
2840    }
2841
2842    /// Materialize IN (SELECT ...) subqueries by executing them and converting to IN lists.
2843    ///
2844    /// This preprocesses SQL expressions to execute subqueries and replace them with
2845    /// their materialized results before translation to the execution plan.
2846    ///
2847    /// Uses iterative traversal with an explicit work stack to handle deeply nested
2848    /// expressions without stack overflow.
2849    fn materialize_in_subquery(&self, root_expr: SqlExpr) -> SqlResult<SqlExpr> {
2850        // Stack-based iterative traversal to avoid recursion
2851        enum WorkItem {
2852            Process(Box<SqlExpr>),
2853            BuildBinaryOp {
2854                op: BinaryOperator,
2855                left: Box<SqlExpr>,
2856                right_done: bool,
2857            },
2858            BuildUnaryOp {
2859                op: UnaryOperator,
2860            },
2861            BuildNested,
2862            BuildIsNull,
2863            BuildIsNotNull,
2864            FinishBetween {
2865                negated: bool,
2866            },
2867        }
2868
2869        let mut work_stack: Vec<WorkItem> = vec![WorkItem::Process(Box::new(root_expr))];
2870        let mut result_stack: Vec<SqlExpr> = Vec::new();
2871
2872        while let Some(item) = work_stack.pop() {
2873            match item {
2874                WorkItem::Process(expr) => {
2875                    match *expr {
2876                        SqlExpr::InSubquery {
2877                            expr: left_expr,
2878                            subquery,
2879                            negated,
2880                        } => {
2881                            // Execute the subquery
2882                            let result = self.handle_query(*subquery)?;
2883
2884                            // Extract values from first column
2885                            let values = match result {
2886                                RuntimeStatementResult::Select { execution, .. } => {
2887                                    let batches = execution.collect()?;
2888                                    let mut collected_values = Vec::new();
2889
2890                                    for batch in batches {
2891                                        if batch.num_columns() == 0 {
2892                                            continue;
2893                                        }
2894                                        let column = batch.column(0);
2895
2896                                        for row_idx in 0..column.len() {
2897                                            use arrow::datatypes::DataType;
2898                                            let value = if column.is_null(row_idx) {
2899                                                Value::Null
2900                                            } else {
2901                                                match column.data_type() {
2902                                                    DataType::Int64 => {
2903                                                        let arr = column
2904                                                        .as_any()
2905                                                        .downcast_ref::<arrow::array::Int64Array>()
2906                                                        .unwrap();
2907                                                        Value::Number(
2908                                                            arr.value(row_idx).to_string(),
2909                                                            false,
2910                                                        )
2911                                                    }
2912                                                    DataType::Float64 => {
2913                                                        let arr = column
2914                                                        .as_any()
2915                                                        .downcast_ref::<arrow::array::Float64Array>()
2916                                                        .unwrap();
2917                                                        Value::Number(
2918                                                            arr.value(row_idx).to_string(),
2919                                                            false,
2920                                                        )
2921                                                    }
2922                                                    DataType::Utf8 => {
2923                                                        let arr = column
2924                                                        .as_any()
2925                                                        .downcast_ref::<arrow::array::StringArray>()
2926                                                        .unwrap();
2927                                                        Value::SingleQuotedString(
2928                                                            arr.value(row_idx).to_string(),
2929                                                        )
2930                                                    }
2931                                                    DataType::Boolean => {
2932                                                        let arr = column
2933                                                        .as_any()
2934                                                        .downcast_ref::<arrow::array::BooleanArray>()
2935                                                        .unwrap();
2936                                                        Value::Boolean(arr.value(row_idx))
2937                                                    }
2938                                                    other => {
2939                                                        return Err(Error::InvalidArgumentError(
2940                                                            format!(
2941                                                                "unsupported data type in IN subquery: {other:?}"
2942                                                            ),
2943                                                        ));
2944                                                    }
2945                                                }
2946                                            };
2947                                            collected_values.push(ValueWithSpan {
2948                                                value,
2949                                                span: Span::empty(),
2950                                            });
2951                                        }
2952                                    }
2953
2954                                    collected_values
2955                                }
2956                                _ => {
2957                                    return Err(Error::InvalidArgumentError(
2958                                        "IN subquery must be a SELECT statement".into(),
2959                                    ));
2960                                }
2961                            };
2962
2963                            // Convert to IN list with materialized values
2964                            result_stack.push(SqlExpr::InList {
2965                                expr: left_expr,
2966                                list: values.into_iter().map(SqlExpr::Value).collect(),
2967                                negated,
2968                            });
2969                        }
2970                        SqlExpr::BinaryOp { left, op, right } => {
2971                            // Push builder, then schedule the left operand followed by the right.
2972                            // Because the work stack is LIFO, the right-hand side executes first,
2973                            // leaving the left result on top for the builder to consume without
2974                            // swapping operand order.
2975                            work_stack.push(WorkItem::BuildBinaryOp {
2976                                op,
2977                                left: left.clone(),
2978                                right_done: false,
2979                            });
2980                            // Evaluate the right-hand side first so the left result remains on top
2981                            // of the result stack when the builder consumes it. This preserves the
2982                            // original operand ordering when reconstructing the expression tree.
2983                            work_stack.push(WorkItem::Process(left));
2984                            work_stack.push(WorkItem::Process(right));
2985                        }
2986                        SqlExpr::UnaryOp { op, expr } => {
2987                            work_stack.push(WorkItem::BuildUnaryOp { op });
2988                            work_stack.push(WorkItem::Process(expr));
2989                        }
2990                        SqlExpr::Nested(inner) => {
2991                            work_stack.push(WorkItem::BuildNested);
2992                            work_stack.push(WorkItem::Process(inner));
2993                        }
2994                        SqlExpr::IsNull(inner) => {
2995                            work_stack.push(WorkItem::BuildIsNull);
2996                            work_stack.push(WorkItem::Process(inner));
2997                        }
2998                        SqlExpr::IsNotNull(inner) => {
2999                            work_stack.push(WorkItem::BuildIsNotNull);
3000                            work_stack.push(WorkItem::Process(inner));
3001                        }
3002                        SqlExpr::Between {
3003                            expr,
3004                            negated,
3005                            low,
3006                            high,
3007                        } => {
3008                            work_stack.push(WorkItem::FinishBetween { negated });
3009                            work_stack.push(WorkItem::Process(high));
3010                            work_stack.push(WorkItem::Process(low));
3011                            work_stack.push(WorkItem::Process(expr));
3012                        }
3013                        // All other expressions: push as-is to result stack
3014                        other => {
3015                            result_stack.push(other);
3016                        }
3017                    }
3018                }
3019                WorkItem::BuildBinaryOp {
3020                    op,
3021                    left,
3022                    right_done,
3023                } => {
3024                    if !right_done {
3025                        // Left done, now mark that right will be done next
3026                        let left_result = result_stack.pop().unwrap();
3027                        work_stack.push(WorkItem::BuildBinaryOp {
3028                            op,
3029                            left: Box::new(left_result),
3030                            right_done: true,
3031                        });
3032                    } else {
3033                        // Both done, build the BinaryOp
3034                        let right_result = result_stack.pop().unwrap();
3035                        let left_result = *left;
3036                        result_stack.push(SqlExpr::BinaryOp {
3037                            left: Box::new(left_result),
3038                            op,
3039                            right: Box::new(right_result),
3040                        });
3041                    }
3042                }
3043                WorkItem::BuildUnaryOp { op } => {
3044                    let inner = result_stack.pop().unwrap();
3045                    result_stack.push(SqlExpr::UnaryOp {
3046                        op,
3047                        expr: Box::new(inner),
3048                    });
3049                }
3050                WorkItem::BuildNested => {
3051                    let inner = result_stack.pop().unwrap();
3052                    result_stack.push(SqlExpr::Nested(Box::new(inner)));
3053                }
3054                WorkItem::BuildIsNull => {
3055                    let inner = result_stack.pop().unwrap();
3056                    result_stack.push(SqlExpr::IsNull(Box::new(inner)));
3057                }
3058                WorkItem::BuildIsNotNull => {
3059                    let inner = result_stack.pop().unwrap();
3060                    result_stack.push(SqlExpr::IsNotNull(Box::new(inner)));
3061                }
3062                WorkItem::FinishBetween { negated } => {
3063                    let high_result = result_stack.pop().unwrap();
3064                    let low_result = result_stack.pop().unwrap();
3065                    let expr_result = result_stack.pop().unwrap();
3066                    result_stack.push(SqlExpr::Between {
3067                        expr: Box::new(expr_result),
3068                        negated,
3069                        low: Box::new(low_result),
3070                        high: Box::new(high_result),
3071                    });
3072                }
3073            }
3074        }
3075
3076        // Final result should be the only item on the result stack
3077        Ok(result_stack
3078            .pop()
3079            .expect("result stack should have exactly one item"))
3080    }
3081
3082    fn handle_query(&self, query: Query) -> SqlResult<RuntimeStatementResult<P>> {
3083        // Check for pragma_table_info() table function first
3084        if let Some(result) = self.try_handle_pragma_table_info(&query)? {
3085            return Ok(result);
3086        }
3087
3088        let select_plan = self.build_select_plan(query)?;
3089        self.execute_plan_statement(PlanStatement::Select(select_plan))
3090    }
3091
3092    fn build_select_plan(&self, query: Query) -> SqlResult<SelectPlan> {
3093        if self.engine.session().has_active_transaction() && self.engine.session().is_aborted() {
3094            return Err(Error::TransactionContextError(
3095                "TransactionContext Error: transaction is aborted".into(),
3096            ));
3097        }
3098
3099        validate_simple_query(&query)?;
3100        let catalog = self.engine.context().table_catalog();
3101        let resolver = catalog.identifier_resolver();
3102
3103        let (mut select_plan, select_context) = match query.body.as_ref() {
3104            SetExpr::Select(select) => self.translate_select(select.as_ref(), &resolver)?,
3105            other => {
3106                return Err(Error::InvalidArgumentError(format!(
3107                    "unsupported query expression: {other:?}"
3108                )));
3109            }
3110        };
3111        if let Some(order_by) = &query.order_by {
3112            if !select_plan.aggregates.is_empty() {
3113                return Err(Error::InvalidArgumentError(
3114                    "ORDER BY is not supported for aggregate queries".into(),
3115                ));
3116            }
3117            let order_plan = self.translate_order_by(&resolver, select_context, order_by)?;
3118            select_plan = select_plan.with_order_by(order_plan);
3119        }
3120        Ok(select_plan)
3121    }
3122
3123    fn translate_select(
3124        &self,
3125        select: &Select,
3126        resolver: &IdentifierResolver<'_>,
3127    ) -> SqlResult<(SelectPlan, IdentifierContext)> {
3128        let distinct = match &select.distinct {
3129            None => false,
3130            Some(Distinct::Distinct) => true,
3131            Some(Distinct::On(_)) => {
3132                return Err(Error::InvalidArgumentError(
3133                    "SELECT DISTINCT ON is not supported".into(),
3134                ));
3135            }
3136        };
3137        if select.top.is_some() {
3138            return Err(Error::InvalidArgumentError(
3139                "SELECT TOP is not supported".into(),
3140            ));
3141        }
3142        if select.exclude.is_some() {
3143            return Err(Error::InvalidArgumentError(
3144                "SELECT EXCLUDE is not supported".into(),
3145            ));
3146        }
3147        if select.into.is_some() {
3148            return Err(Error::InvalidArgumentError(
3149                "SELECT INTO is not supported".into(),
3150            ));
3151        }
3152        if !select.lateral_views.is_empty() {
3153            return Err(Error::InvalidArgumentError(
3154                "LATERAL VIEW is not supported".into(),
3155            ));
3156        }
3157        if select.prewhere.is_some() {
3158            return Err(Error::InvalidArgumentError(
3159                "PREWHERE is not supported".into(),
3160            ));
3161        }
3162        if !group_by_is_empty(&select.group_by) || select.value_table_mode.is_some() {
3163            return Err(Error::InvalidArgumentError(
3164                "GROUP BY and SELECT AS VALUE/STRUCT are not supported".into(),
3165            ));
3166        }
3167        if !select.cluster_by.is_empty()
3168            || !select.distribute_by.is_empty()
3169            || !select.sort_by.is_empty()
3170        {
3171            return Err(Error::InvalidArgumentError(
3172                "CLUSTER/DISTRIBUTE/SORT BY clauses are not supported".into(),
3173            ));
3174        }
3175        if select.having.is_some()
3176            || !select.named_window.is_empty()
3177            || select.qualify.is_some()
3178            || select.connect_by.is_some()
3179        {
3180            return Err(Error::InvalidArgumentError(
3181                "advanced SELECT clauses are not supported".into(),
3182            ));
3183        }
3184
3185        let table_alias = select
3186            .from
3187            .first()
3188            .and_then(|table_with_joins| match &table_with_joins.relation {
3189                TableFactor::Table { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
3190                _ => None,
3191            });
3192
3193        let has_joins = select
3194            .from
3195            .iter()
3196            .any(|table_with_joins| !table_with_joins.joins.is_empty());
3197        // Handle different FROM clause scenarios
3198        let catalog = self.engine.context().table_catalog();
3199        let (mut plan, id_context) = if select.from.is_empty() {
3200            // No FROM clause - use empty string for table context (e.g., SELECT 42, SELECT {'a': 1} AS x)
3201            let mut p = SelectPlan::new("");
3202            let projections = self.build_projection_list(
3203                resolver,
3204                IdentifierContext::new(None),
3205                &select.projection,
3206            )?;
3207            p = p.with_projections(projections);
3208            (p, IdentifierContext::new(None))
3209        } else if select.from.len() == 1 && !has_joins {
3210            // Single table query
3211            let (display_name, canonical_name) = extract_single_table(&select.from)?;
3212            let table_id = catalog.table_id(&canonical_name);
3213            let mut p = SelectPlan::new(display_name.clone());
3214            let single_table_context =
3215                IdentifierContext::new(table_id).with_table_alias(table_alias.clone());
3216            if let Some(alias) = table_alias.as_ref() {
3217                validate_projection_alias_qualifiers(&select.projection, alias)?;
3218            }
3219            if let Some(aggregates) = self.detect_simple_aggregates(&select.projection)? {
3220                p = p.with_aggregates(aggregates);
3221            } else {
3222                let projections = self.build_projection_list(
3223                    resolver,
3224                    single_table_context.clone(),
3225                    &select.projection,
3226                )?;
3227                p = p.with_projections(projections);
3228            }
3229            (p, single_table_context)
3230        } else {
3231            // Multiple tables or explicit joins - treat as cross product for now
3232            let tables = extract_tables(&select.from)?;
3233            let mut p = SelectPlan::with_tables(tables);
3234            // For multi-table queries, we'll build projections differently
3235            // For now, just handle simple column references
3236            let projections = self.build_projection_list(
3237                resolver,
3238                IdentifierContext::new(None),
3239                &select.projection,
3240            )?;
3241            p = p.with_projections(projections);
3242            (p, IdentifierContext::new(None))
3243        };
3244
3245        let filter_expr = match &select.selection {
3246            Some(expr) => {
3247                let materialized_expr = self.materialize_in_subquery(expr.clone())?;
3248                Some(translate_condition_with_context(
3249                    resolver,
3250                    id_context.clone(),
3251                    &materialized_expr,
3252                )?)
3253            }
3254            None => None,
3255        };
3256        plan = plan.with_filter(filter_expr);
3257        plan = plan.with_distinct(distinct);
3258        Ok((plan, id_context))
3259    }
3260
3261    fn translate_order_by(
3262        &self,
3263        resolver: &IdentifierResolver<'_>,
3264        id_context: IdentifierContext,
3265        order_by: &OrderBy,
3266    ) -> SqlResult<Vec<OrderByPlan>> {
3267        let exprs = match &order_by.kind {
3268            OrderByKind::Expressions(exprs) => exprs,
3269            _ => {
3270                return Err(Error::InvalidArgumentError(
3271                    "unsupported ORDER BY clause".into(),
3272                ));
3273            }
3274        };
3275
3276        let base_nulls_first = self.default_nulls_first.load(AtomicOrdering::Relaxed);
3277
3278        let mut plans = Vec::with_capacity(exprs.len());
3279        for order_expr in exprs {
3280            let ascending = order_expr.options.asc.unwrap_or(true);
3281            let default_nulls_first_for_direction = if ascending {
3282                base_nulls_first
3283            } else {
3284                !base_nulls_first
3285            };
3286            let nulls_first = order_expr
3287                .options
3288                .nulls_first
3289                .unwrap_or(default_nulls_first_for_direction);
3290
3291            if let SqlExpr::Identifier(ident) = &order_expr.expr
3292                && ident.value.eq_ignore_ascii_case("ALL")
3293                && ident.quote_style.is_none()
3294            {
3295                plans.push(OrderByPlan {
3296                    target: OrderTarget::All,
3297                    sort_type: OrderSortType::Native,
3298                    ascending,
3299                    nulls_first,
3300                });
3301                continue;
3302            }
3303
3304            let (target, sort_type) = match &order_expr.expr {
3305                SqlExpr::Identifier(_) | SqlExpr::CompoundIdentifier(_) => (
3306                    OrderTarget::Column(Self::resolve_simple_column_expr(
3307                        resolver,
3308                        id_context.clone(),
3309                        &order_expr.expr,
3310                    )?),
3311                    OrderSortType::Native,
3312                ),
3313                SqlExpr::Cast {
3314                    expr,
3315                    data_type:
3316                        SqlDataType::Int(_)
3317                        | SqlDataType::Integer(_)
3318                        | SqlDataType::BigInt(_)
3319                        | SqlDataType::SmallInt(_)
3320                        | SqlDataType::TinyInt(_),
3321                    ..
3322                } => (
3323                    OrderTarget::Column(Self::resolve_simple_column_expr(
3324                        resolver,
3325                        id_context.clone(),
3326                        expr,
3327                    )?),
3328                    OrderSortType::CastTextToInteger,
3329                ),
3330                SqlExpr::Cast { data_type, .. } => {
3331                    return Err(Error::InvalidArgumentError(format!(
3332                        "ORDER BY CAST target type {:?} is not supported",
3333                        data_type
3334                    )));
3335                }
3336                SqlExpr::Value(value_with_span) => match &value_with_span.value {
3337                    Value::Number(raw, _) => {
3338                        let position: usize = raw.parse().map_err(|_| {
3339                            Error::InvalidArgumentError(format!(
3340                                "ORDER BY position '{}' is not a valid positive integer",
3341                                raw
3342                            ))
3343                        })?;
3344                        if position == 0 {
3345                            return Err(Error::InvalidArgumentError(
3346                                "ORDER BY position must be at least 1".into(),
3347                            ));
3348                        }
3349                        (OrderTarget::Index(position - 1), OrderSortType::Native)
3350                    }
3351                    other => {
3352                        return Err(Error::InvalidArgumentError(format!(
3353                            "unsupported ORDER BY literal expression: {other:?}"
3354                        )));
3355                    }
3356                },
3357                other => {
3358                    return Err(Error::InvalidArgumentError(format!(
3359                        "unsupported ORDER BY expression: {other:?}"
3360                    )));
3361                }
3362            };
3363
3364            plans.push(OrderByPlan {
3365                target,
3366                sort_type,
3367                ascending,
3368                nulls_first,
3369            });
3370        }
3371
3372        Ok(plans)
3373    }
3374
3375    fn resolve_simple_column_expr(
3376        resolver: &IdentifierResolver<'_>,
3377        context: IdentifierContext,
3378        expr: &SqlExpr,
3379    ) -> SqlResult<String> {
3380        let scalar = translate_scalar_with_context(resolver, context, expr)?;
3381        match scalar {
3382            llkv_expr::expr::ScalarExpr::Column(column) => Ok(column),
3383            other => Err(Error::InvalidArgumentError(format!(
3384                "ORDER BY expression must reference a simple column, found {other:?}"
3385            ))),
3386        }
3387    }
3388
3389    fn detect_simple_aggregates(
3390        &self,
3391        projection_items: &[SelectItem],
3392    ) -> SqlResult<Option<Vec<AggregateExpr>>> {
3393        if projection_items.is_empty() {
3394            return Ok(None);
3395        }
3396
3397        let mut specs: Vec<AggregateExpr> = Vec::with_capacity(projection_items.len());
3398        for (idx, item) in projection_items.iter().enumerate() {
3399            let (expr, alias_opt) = match item {
3400                SelectItem::UnnamedExpr(expr) => (expr, None),
3401                SelectItem::ExprWithAlias { expr, alias } => (expr, Some(alias.value.clone())),
3402                _ => return Ok(None),
3403            };
3404
3405            let alias = alias_opt.unwrap_or_else(|| format!("col{}", idx + 1));
3406            let SqlExpr::Function(func) = expr else {
3407                return Ok(None);
3408            };
3409
3410            if func.uses_odbc_syntax {
3411                return Err(Error::InvalidArgumentError(
3412                    "ODBC function syntax is not supported in aggregate queries".into(),
3413                ));
3414            }
3415            if !matches!(func.parameters, FunctionArguments::None) {
3416                return Err(Error::InvalidArgumentError(
3417                    "parameterized aggregate functions are not supported".into(),
3418                ));
3419            }
3420            if func.filter.is_some()
3421                || func.null_treatment.is_some()
3422                || func.over.is_some()
3423                || !func.within_group.is_empty()
3424            {
3425                return Err(Error::InvalidArgumentError(
3426                    "advanced aggregate clauses are not supported".into(),
3427                ));
3428            }
3429
3430            let mut is_distinct = false;
3431            let args_slice: &[FunctionArg] = match &func.args {
3432                FunctionArguments::List(list) => {
3433                    if let Some(dup) = &list.duplicate_treatment {
3434                        use sqlparser::ast::DuplicateTreatment;
3435                        match dup {
3436                            DuplicateTreatment::All => {}
3437                            DuplicateTreatment::Distinct => is_distinct = true,
3438                        }
3439                    }
3440                    if !list.clauses.is_empty() {
3441                        return Err(Error::InvalidArgumentError(
3442                            "aggregate argument clauses are not supported".into(),
3443                        ));
3444                    }
3445                    &list.args
3446                }
3447                FunctionArguments::None => &[],
3448                FunctionArguments::Subquery(_) => {
3449                    return Err(Error::InvalidArgumentError(
3450                        "aggregate subquery arguments are not supported".into(),
3451                    ));
3452                }
3453            };
3454
3455            let func_name = if func.name.0.len() == 1 {
3456                match &func.name.0[0] {
3457                    ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
3458                    _ => {
3459                        return Err(Error::InvalidArgumentError(
3460                            "unsupported aggregate function name".into(),
3461                        ));
3462                    }
3463                }
3464            } else {
3465                return Err(Error::InvalidArgumentError(
3466                    "qualified aggregate function names are not supported".into(),
3467                ));
3468            };
3469
3470            let aggregate = match func_name.as_str() {
3471                "count" => {
3472                    if args_slice.len() != 1 {
3473                        return Err(Error::InvalidArgumentError(
3474                            "COUNT accepts exactly one argument".into(),
3475                        ));
3476                    }
3477                    match &args_slice[0] {
3478                        FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
3479                            if is_distinct {
3480                                return Err(Error::InvalidArgumentError(
3481                                    "COUNT(DISTINCT *) is not supported".into(),
3482                                ));
3483                            }
3484                            AggregateExpr::count_star(alias)
3485                        }
3486                        FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
3487                            let column = resolve_column_name(arg_expr)?;
3488                            if is_distinct {
3489                                AggregateExpr::count_distinct_column(column, alias)
3490                            } else {
3491                                AggregateExpr::count_column(column, alias)
3492                            }
3493                        }
3494                        FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
3495                            return Err(Error::InvalidArgumentError(
3496                                "named COUNT arguments are not supported".into(),
3497                            ));
3498                        }
3499                        FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
3500                            return Err(Error::InvalidArgumentError(
3501                                "COUNT does not support qualified wildcards".into(),
3502                            ));
3503                        }
3504                    }
3505                }
3506                "sum" | "min" | "max" => {
3507                    if is_distinct {
3508                        return Err(Error::InvalidArgumentError(
3509                            "DISTINCT is not supported for this aggregate".into(),
3510                        ));
3511                    }
3512                    if args_slice.len() != 1 {
3513                        return Err(Error::InvalidArgumentError(format!(
3514                            "{} accepts exactly one argument",
3515                            func_name.to_uppercase()
3516                        )));
3517                    }
3518                    let arg_expr = match &args_slice[0] {
3519                        FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => arg_expr,
3520                        FunctionArg::Unnamed(FunctionArgExpr::Wildcard)
3521                        | FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
3522                            return Err(Error::InvalidArgumentError(format!(
3523                                "{} does not support wildcard arguments",
3524                                func_name.to_uppercase()
3525                            )));
3526                        }
3527                        FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
3528                            return Err(Error::InvalidArgumentError(format!(
3529                                "{} arguments must be column references",
3530                                func_name.to_uppercase()
3531                            )));
3532                        }
3533                    };
3534
3535                    if func_name == "sum" {
3536                        if let Some(column) = parse_count_nulls_case(arg_expr)? {
3537                            AggregateExpr::count_nulls(column, alias)
3538                        } else {
3539                            let column = resolve_column_name(arg_expr)?;
3540                            AggregateExpr::sum_int64(column, alias)
3541                        }
3542                    } else {
3543                        let column = resolve_column_name(arg_expr)?;
3544                        if func_name == "min" {
3545                            AggregateExpr::min_int64(column, alias)
3546                        } else {
3547                            AggregateExpr::max_int64(column, alias)
3548                        }
3549                    }
3550                }
3551                _ => return Ok(None),
3552            };
3553
3554            specs.push(aggregate);
3555        }
3556
3557        if specs.is_empty() {
3558            return Ok(None);
3559        }
3560        Ok(Some(specs))
3561    }
3562
3563    fn build_projection_list(
3564        &self,
3565        resolver: &IdentifierResolver<'_>,
3566        id_context: IdentifierContext,
3567        projection_items: &[SelectItem],
3568    ) -> SqlResult<Vec<SelectProjection>> {
3569        if projection_items.is_empty() {
3570            return Err(Error::InvalidArgumentError(
3571                "SELECT projection must include at least one column".into(),
3572            ));
3573        }
3574
3575        let mut projections = Vec::with_capacity(projection_items.len());
3576        for (idx, item) in projection_items.iter().enumerate() {
3577            match item {
3578                SelectItem::Wildcard(options) => {
3579                    if let Some(exclude) = &options.opt_exclude {
3580                        use sqlparser::ast::ExcludeSelectItem;
3581                        let exclude_cols = match exclude {
3582                            ExcludeSelectItem::Single(ident) => vec![ident.value.clone()],
3583                            ExcludeSelectItem::Multiple(idents) => {
3584                                idents.iter().map(|id| id.value.clone()).collect()
3585                            }
3586                        };
3587                        projections.push(SelectProjection::AllColumnsExcept {
3588                            exclude: exclude_cols,
3589                        });
3590                    } else {
3591                        projections.push(SelectProjection::AllColumns);
3592                    }
3593                }
3594                SelectItem::QualifiedWildcard(kind, _) => match kind {
3595                    SelectItemQualifiedWildcardKind::ObjectName(name) => {
3596                        projections.push(SelectProjection::Column {
3597                            name: name.to_string(),
3598                            alias: None,
3599                        });
3600                    }
3601                    SelectItemQualifiedWildcardKind::Expr(_) => {
3602                        return Err(Error::InvalidArgumentError(
3603                            "expression-qualified wildcards are not supported".into(),
3604                        ));
3605                    }
3606                },
3607                SelectItem::UnnamedExpr(expr) => match expr {
3608                    SqlExpr::Identifier(ident) => {
3609                        let parts = vec![ident.value.clone()];
3610                        let resolution = resolver.resolve(&parts, id_context.clone())?;
3611                        if resolution.is_simple() {
3612                            projections.push(SelectProjection::Column {
3613                                name: resolution.column().to_string(),
3614                                alias: None,
3615                            });
3616                        } else {
3617                            let alias = format!("col{}", idx + 1);
3618                            projections.push(SelectProjection::Computed {
3619                                expr: resolution.into_scalar_expr(),
3620                                alias,
3621                            });
3622                        }
3623                    }
3624                    SqlExpr::CompoundIdentifier(parts) => {
3625                        let name_parts: Vec<String> =
3626                            parts.iter().map(|part| part.value.clone()).collect();
3627                        let resolution = resolver.resolve(&name_parts, id_context.clone())?;
3628                        if resolution.is_simple() {
3629                            projections.push(SelectProjection::Column {
3630                                name: resolution.column().to_string(),
3631                                alias: None,
3632                            });
3633                        } else {
3634                            let alias = format!("col{}", idx + 1);
3635                            projections.push(SelectProjection::Computed {
3636                                expr: resolution.into_scalar_expr(),
3637                                alias,
3638                            });
3639                        }
3640                    }
3641                    _ => {
3642                        let alias = format!("col{}", idx + 1);
3643                        let scalar =
3644                            translate_scalar_with_context(resolver, id_context.clone(), expr)?;
3645                        projections.push(SelectProjection::Computed {
3646                            expr: scalar,
3647                            alias,
3648                        });
3649                    }
3650                },
3651                SelectItem::ExprWithAlias { expr, alias } => match expr {
3652                    SqlExpr::Identifier(ident) => {
3653                        let parts = vec![ident.value.clone()];
3654                        let resolution = resolver.resolve(&parts, id_context.clone())?;
3655                        if resolution.is_simple() {
3656                            projections.push(SelectProjection::Column {
3657                                name: resolution.column().to_string(),
3658                                alias: Some(alias.value.clone()),
3659                            });
3660                        } else {
3661                            projections.push(SelectProjection::Computed {
3662                                expr: resolution.into_scalar_expr(),
3663                                alias: alias.value.clone(),
3664                            });
3665                        }
3666                    }
3667                    SqlExpr::CompoundIdentifier(parts) => {
3668                        let name_parts: Vec<String> =
3669                            parts.iter().map(|part| part.value.clone()).collect();
3670                        let resolution = resolver.resolve(&name_parts, id_context.clone())?;
3671                        if resolution.is_simple() {
3672                            projections.push(SelectProjection::Column {
3673                                name: resolution.column().to_string(),
3674                                alias: Some(alias.value.clone()),
3675                            });
3676                        } else {
3677                            projections.push(SelectProjection::Computed {
3678                                expr: resolution.into_scalar_expr(),
3679                                alias: alias.value.clone(),
3680                            });
3681                        }
3682                    }
3683                    _ => {
3684                        let scalar =
3685                            translate_scalar_with_context(resolver, id_context.clone(), expr)?;
3686                        projections.push(SelectProjection::Computed {
3687                            expr: scalar,
3688                            alias: alias.value.clone(),
3689                        });
3690                    }
3691                },
3692            }
3693        }
3694        Ok(projections)
3695    }
3696
3697    #[allow(clippy::too_many_arguments)] // NOTE: Keeps parity with SQL START TRANSACTION grammar; revisit when options expand.
3698    fn handle_start_transaction(
3699        &self,
3700        modes: Vec<TransactionMode>,
3701        begin: bool,
3702        transaction: Option<BeginTransactionKind>,
3703        modifier: Option<TransactionModifier>,
3704        statements: Vec<Statement>,
3705        exception: Option<Vec<ExceptionWhen>>,
3706        has_end_keyword: bool,
3707    ) -> SqlResult<RuntimeStatementResult<P>> {
3708        if !modes.is_empty() {
3709            return Err(Error::InvalidArgumentError(
3710                "transaction modes are not supported".into(),
3711            ));
3712        }
3713        if modifier.is_some() {
3714            return Err(Error::InvalidArgumentError(
3715                "transaction modifiers are not supported".into(),
3716            ));
3717        }
3718        if !statements.is_empty() || exception.is_some() || has_end_keyword {
3719            return Err(Error::InvalidArgumentError(
3720                "BEGIN blocks with inline statements or exceptions are not supported".into(),
3721            ));
3722        }
3723        if let Some(kind) = transaction {
3724            match kind {
3725                BeginTransactionKind::Transaction | BeginTransactionKind::Work => {}
3726            }
3727        }
3728        if !begin {
3729            // Currently treat START TRANSACTION same as BEGIN
3730            tracing::warn!("Currently treat `START TRANSACTION` same as `BEGIN`")
3731        }
3732
3733        self.execute_plan_statement(PlanStatement::BeginTransaction)
3734    }
3735
3736    fn handle_commit(
3737        &self,
3738        chain: bool,
3739        end: bool,
3740        modifier: Option<TransactionModifier>,
3741    ) -> SqlResult<RuntimeStatementResult<P>> {
3742        if chain {
3743            return Err(Error::InvalidArgumentError(
3744                "COMMIT AND [NO] CHAIN is not supported".into(),
3745            ));
3746        }
3747        if end {
3748            return Err(Error::InvalidArgumentError(
3749                "END blocks are not supported".into(),
3750            ));
3751        }
3752        if modifier.is_some() {
3753            return Err(Error::InvalidArgumentError(
3754                "transaction modifiers are not supported".into(),
3755            ));
3756        }
3757
3758        self.execute_plan_statement(PlanStatement::CommitTransaction)
3759    }
3760
3761    fn handle_rollback(
3762        &self,
3763        chain: bool,
3764        savepoint: Option<Ident>,
3765    ) -> SqlResult<RuntimeStatementResult<P>> {
3766        if chain {
3767            return Err(Error::InvalidArgumentError(
3768                "ROLLBACK AND [NO] CHAIN is not supported".into(),
3769            ));
3770        }
3771        if savepoint.is_some() {
3772            return Err(Error::InvalidArgumentError(
3773                "ROLLBACK TO SAVEPOINT is not supported".into(),
3774            ));
3775        }
3776
3777        self.execute_plan_statement(PlanStatement::RollbackTransaction)
3778    }
3779
3780    fn handle_set(&self, set_stmt: Set) -> SqlResult<RuntimeStatementResult<P>> {
3781        match set_stmt {
3782            Set::SingleAssignment {
3783                scope,
3784                hivevar,
3785                variable,
3786                values,
3787            } => {
3788                if scope.is_some() || hivevar {
3789                    return Err(Error::InvalidArgumentError(
3790                        "SET modifiers are not supported".into(),
3791                    ));
3792                }
3793
3794                let variable_name_raw = variable.to_string();
3795                let variable_name = variable_name_raw.to_ascii_lowercase();
3796
3797                match variable_name.as_str() {
3798                    "default_null_order" => {
3799                        if values.len() != 1 {
3800                            return Err(Error::InvalidArgumentError(
3801                                "SET default_null_order expects exactly one value".into(),
3802                            ));
3803                        }
3804
3805                        let value_expr = &values[0];
3806                        let normalized = match value_expr {
3807                            SqlExpr::Value(value_with_span) => value_with_span
3808                                .value
3809                                .clone()
3810                                .into_string()
3811                                .map(|s| s.to_ascii_lowercase()),
3812                            SqlExpr::Identifier(ident) => Some(ident.value.to_ascii_lowercase()),
3813                            _ => None,
3814                        };
3815
3816                        if !matches!(normalized.as_deref(), Some("nulls_first" | "nulls_last")) {
3817                            return Err(Error::InvalidArgumentError(format!(
3818                                "unsupported value for SET default_null_order: {value_expr:?}"
3819                            )));
3820                        }
3821
3822                        let use_nulls_first = matches!(normalized.as_deref(), Some("nulls_first"));
3823                        self.default_nulls_first
3824                            .store(use_nulls_first, AtomicOrdering::Relaxed);
3825
3826                        Ok(RuntimeStatementResult::NoOp)
3827                    }
3828                    "immediate_transaction_mode" => {
3829                        if values.len() != 1 {
3830                            return Err(Error::InvalidArgumentError(
3831                                "SET immediate_transaction_mode expects exactly one value".into(),
3832                            ));
3833                        }
3834                        let normalized = values[0].to_string().to_ascii_lowercase();
3835                        let enabled = match normalized.as_str() {
3836                            "true" | "on" | "1" => true,
3837                            "false" | "off" | "0" => false,
3838                            _ => {
3839                                return Err(Error::InvalidArgumentError(format!(
3840                                    "unsupported value for SET immediate_transaction_mode: {}",
3841                                    values[0]
3842                                )));
3843                            }
3844                        };
3845                        if !enabled {
3846                            tracing::warn!(
3847                                "SET immediate_transaction_mode=false has no effect; continuing with auto mode"
3848                            );
3849                        }
3850                        Ok(RuntimeStatementResult::NoOp)
3851                    }
3852                    _ => Err(Error::InvalidArgumentError(format!(
3853                        "unsupported SET variable: {variable_name_raw}"
3854                    ))),
3855                }
3856            }
3857            other => Err(Error::InvalidArgumentError(format!(
3858                "unsupported SQL SET statement: {other:?}",
3859            ))),
3860        }
3861    }
3862
3863    fn handle_pragma(
3864        &self,
3865        name: ObjectName,
3866        value: Option<Value>,
3867        is_eq: bool,
3868    ) -> SqlResult<RuntimeStatementResult<P>> {
3869        let (display, canonical) = canonical_object_name(&name)?;
3870        if value.is_some() || is_eq {
3871            return Err(Error::InvalidArgumentError(format!(
3872                "PRAGMA '{display}' does not accept a value"
3873            )));
3874        }
3875
3876        match canonical.as_str() {
3877            "enable_verification" | "disable_verification" => Ok(RuntimeStatementResult::NoOp),
3878            _ => Err(Error::InvalidArgumentError(format!(
3879                "unsupported PRAGMA '{}'",
3880                display
3881            ))),
3882        }
3883    }
3884}
3885
3886fn canonical_object_name(name: &ObjectName) -> SqlResult<(String, String)> {
3887    if name.0.is_empty() {
3888        return Err(Error::InvalidArgumentError(
3889            "object name must not be empty".into(),
3890        ));
3891    }
3892    let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
3893    for part in &name.0 {
3894        let ident = match part {
3895            ObjectNamePart::Identifier(ident) => ident,
3896            _ => {
3897                return Err(Error::InvalidArgumentError(
3898                    "object names using functions are not supported".into(),
3899                ));
3900            }
3901        };
3902        parts.push(ident.value.clone());
3903    }
3904    let display = parts.join(".");
3905    let canonical = display.to_ascii_lowercase();
3906    Ok((display, canonical))
3907}
3908
3909/// Parse an object name into optional schema and table name components.
3910///
3911/// Returns (schema_name, table_name) where schema_name is None if not qualified.
3912///
3913/// Examples:
3914/// - "users" -> (None, "users")
3915/// - "test.users" -> (Some("test"), "users")
3916/// - "catalog.test.users" -> Error (too many parts)
3917fn parse_schema_qualified_name(name: &ObjectName) -> SqlResult<(Option<String>, String)> {
3918    if name.0.is_empty() {
3919        return Err(Error::InvalidArgumentError(
3920            "object name must not be empty".into(),
3921        ));
3922    }
3923
3924    let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
3925    for part in &name.0 {
3926        let ident = match part {
3927            ObjectNamePart::Identifier(ident) => ident,
3928            _ => {
3929                return Err(Error::InvalidArgumentError(
3930                    "object names using functions are not supported".into(),
3931                ));
3932            }
3933        };
3934        parts.push(ident.value.clone());
3935    }
3936
3937    match parts.len() {
3938        1 => Ok((None, parts[0].clone())),
3939        2 => Ok((Some(parts[0].clone()), parts[1].clone())),
3940        _ => Err(Error::InvalidArgumentError(format!(
3941            "table name has too many parts: {}",
3942            name
3943        ))),
3944    }
3945}
3946
3947/// Extract column name from an index column specification (OrderBy expression).
3948///
3949/// This handles the common pattern of validating and extracting column names from
3950/// SQL index column definitions (PRIMARY KEY, UNIQUE, CREATE INDEX).
3951///
3952/// # Parameters
3953/// - `index_col`: The index column from sqlparser AST
3954/// - `context`: Description of where this column appears (e.g., "PRIMARY KEY", "UNIQUE constraint")
3955/// - `allow_sort_options`: If false, errors if any sort options are present; if true, validates them
3956/// - `allow_compound`: If true, allows compound identifiers and takes the last part; if false, only allows simple identifiers
3957///
3958/// # Returns
3959/// Column name as a String
3960fn extract_index_column_name(
3961    index_col: &sqlparser::ast::IndexColumn,
3962    context: &str,
3963    allow_sort_options: bool,
3964    allow_compound: bool,
3965) -> SqlResult<String> {
3966    use sqlparser::ast::Expr as SqlExpr;
3967
3968    // Check operator class
3969    if index_col.operator_class.is_some() {
3970        return Err(Error::InvalidArgumentError(format!(
3971            "{} operator classes are not supported",
3972            context
3973        )));
3974    }
3975
3976    let order_expr = &index_col.column;
3977
3978    // Validate sort options
3979    if allow_sort_options {
3980        // For CREATE INDEX: extract and validate sort options
3981        let _ascending = order_expr.options.asc.unwrap_or(true);
3982        let _nulls_first = order_expr.options.nulls_first.unwrap_or(false);
3983        // DESC and NULLS FIRST are now supported
3984    } else {
3985        // For constraints: no sort options allowed
3986        if order_expr.options.asc.is_some()
3987            || order_expr.options.nulls_first.is_some()
3988            || order_expr.with_fill.is_some()
3989        {
3990            return Err(Error::InvalidArgumentError(format!(
3991                "{} columns must be simple identifiers",
3992                context
3993            )));
3994        }
3995    }
3996
3997    // Extract column name from expression
3998    let column_name = match &order_expr.expr {
3999        SqlExpr::Identifier(ident) => ident.value.clone(),
4000        SqlExpr::CompoundIdentifier(parts) => {
4001            if allow_compound {
4002                // For CREATE INDEX: allow qualified names, take last part
4003                parts
4004                    .last()
4005                    .map(|ident| ident.value.clone())
4006                    .ok_or_else(|| {
4007                        Error::InvalidArgumentError(format!(
4008                            "invalid column reference in {}",
4009                            context
4010                        ))
4011                    })?
4012            } else if parts.len() == 1 {
4013                // For constraints: only allow single-part compound identifiers
4014                parts[0].value.clone()
4015            } else {
4016                return Err(Error::InvalidArgumentError(format!(
4017                    "{} columns must be column identifiers",
4018                    context
4019                )));
4020            }
4021        }
4022        other => {
4023            return Err(Error::InvalidArgumentError(format!(
4024                "{} only supports column references, found {:?}",
4025                context, other
4026            )));
4027        }
4028    };
4029
4030    Ok(column_name)
4031}
4032
4033fn validate_create_table_common(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
4034    if stmt.clone.is_some() || stmt.like.is_some() {
4035        return Err(Error::InvalidArgumentError(
4036            "CREATE TABLE LIKE/CLONE is not supported".into(),
4037        ));
4038    }
4039    if stmt.or_replace && stmt.if_not_exists {
4040        return Err(Error::InvalidArgumentError(
4041            "CREATE TABLE cannot combine OR REPLACE with IF NOT EXISTS".into(),
4042        ));
4043    }
4044    use sqlparser::ast::TableConstraint;
4045
4046    let mut seen_primary_key = false;
4047    for constraint in &stmt.constraints {
4048        match constraint {
4049            TableConstraint::PrimaryKey { .. } => {
4050                if seen_primary_key {
4051                    return Err(Error::InvalidArgumentError(
4052                        "multiple PRIMARY KEY constraints are not supported".into(),
4053                    ));
4054                }
4055                seen_primary_key = true;
4056            }
4057            TableConstraint::Unique { .. } => {
4058                // Detailed validation is performed later during plan construction.
4059            }
4060            TableConstraint::ForeignKey { .. } => {
4061                // Detailed validation is performed later during plan construction.
4062            }
4063            other => {
4064                return Err(Error::InvalidArgumentError(format!(
4065                    "table-level constraint {:?} is not supported",
4066                    other
4067                )));
4068            }
4069        }
4070    }
4071    Ok(())
4072}
4073
4074fn validate_check_constraint(
4075    check_expr: &sqlparser::ast::Expr,
4076    table_name: &str,
4077    column_names: &[&str],
4078) -> SqlResult<()> {
4079    use sqlparser::ast::Expr as SqlExpr;
4080
4081    let column_names_lower: HashSet<String> = column_names
4082        .iter()
4083        .map(|name| name.to_ascii_lowercase())
4084        .collect();
4085
4086    let mut stack: Vec<&SqlExpr> = vec![check_expr];
4087
4088    while let Some(expr) = stack.pop() {
4089        match expr {
4090            SqlExpr::Subquery(_) => {
4091                return Err(Error::InvalidArgumentError(
4092                    "Subqueries are not allowed in CHECK constraints".into(),
4093                ));
4094            }
4095            SqlExpr::Function(func) => {
4096                let func_name = func.name.to_string().to_uppercase();
4097                if matches!(func_name.as_str(), "SUM" | "AVG" | "COUNT" | "MIN" | "MAX") {
4098                    return Err(Error::InvalidArgumentError(
4099                        "Aggregate functions are not allowed in CHECK constraints".into(),
4100                    ));
4101                }
4102
4103                if let sqlparser::ast::FunctionArguments::List(list) = &func.args {
4104                    for arg in &list.args {
4105                        if let sqlparser::ast::FunctionArg::Unnamed(
4106                            sqlparser::ast::FunctionArgExpr::Expr(expr),
4107                        ) = arg
4108                        {
4109                            stack.push(expr);
4110                        }
4111                    }
4112                }
4113            }
4114            SqlExpr::Identifier(ident) => {
4115                if !column_names_lower.contains(&ident.value.to_ascii_lowercase()) {
4116                    return Err(Error::InvalidArgumentError(format!(
4117                        "Column '{}' referenced in CHECK constraint does not exist",
4118                        ident.value
4119                    )));
4120                }
4121            }
4122            SqlExpr::CompoundIdentifier(idents) => {
4123                if idents.len() == 2 {
4124                    let first = idents[0].value.as_str();
4125                    let second = &idents[1].value;
4126
4127                    if column_names_lower.contains(&first.to_ascii_lowercase()) {
4128                        continue;
4129                    }
4130
4131                    if !first.eq_ignore_ascii_case(table_name) {
4132                        return Err(Error::InvalidArgumentError(format!(
4133                            "CHECK constraint references column from different table '{}'",
4134                            first
4135                        )));
4136                    }
4137
4138                    if !column_names_lower.contains(&second.to_ascii_lowercase()) {
4139                        return Err(Error::InvalidArgumentError(format!(
4140                            "Column '{}' referenced in CHECK constraint does not exist",
4141                            second
4142                        )));
4143                    }
4144                } else if idents.len() == 3 {
4145                    let first = &idents[0].value;
4146                    let second = &idents[1].value;
4147                    let third = &idents[2].value;
4148
4149                    if first.eq_ignore_ascii_case(table_name) {
4150                        if !column_names_lower.contains(&second.to_ascii_lowercase()) {
4151                            return Err(Error::InvalidArgumentError(format!(
4152                                "Column '{}' referenced in CHECK constraint does not exist",
4153                                second
4154                            )));
4155                        }
4156                    } else if second.eq_ignore_ascii_case(table_name) {
4157                        if !column_names_lower.contains(&third.to_ascii_lowercase()) {
4158                            return Err(Error::InvalidArgumentError(format!(
4159                                "Column '{}' referenced in CHECK constraint does not exist",
4160                                third
4161                            )));
4162                        }
4163                    } else {
4164                        return Err(Error::InvalidArgumentError(format!(
4165                            "CHECK constraint references column from different table '{}'",
4166                            second
4167                        )));
4168                    }
4169                }
4170            }
4171            SqlExpr::BinaryOp { left, right, .. } => {
4172                stack.push(left);
4173                stack.push(right);
4174            }
4175            SqlExpr::UnaryOp { expr, .. } | SqlExpr::Nested(expr) => {
4176                stack.push(expr);
4177            }
4178            SqlExpr::Value(_) | SqlExpr::TypedString { .. } => {}
4179            _ => {}
4180        }
4181    }
4182
4183    Ok(())
4184}
4185
4186fn validate_create_table_definition(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
4187    for column in &stmt.columns {
4188        for ColumnOptionDef { option, .. } in &column.options {
4189            match option {
4190                ColumnOption::Null
4191                | ColumnOption::NotNull
4192                | ColumnOption::Unique { .. }
4193                | ColumnOption::Check(_)
4194                | ColumnOption::ForeignKey { .. } => {}
4195                ColumnOption::Default(_) => {
4196                    return Err(Error::InvalidArgumentError(format!(
4197                        "DEFAULT values are not supported for column '{}'",
4198                        column.name
4199                    )));
4200                }
4201                other => {
4202                    return Err(Error::InvalidArgumentError(format!(
4203                        "unsupported column option {:?} on '{}'",
4204                        other, column.name
4205                    )));
4206                }
4207            }
4208        }
4209    }
4210    Ok(())
4211}
4212
4213fn validate_create_table_as(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
4214    if !stmt.columns.is_empty() {
4215        return Err(Error::InvalidArgumentError(
4216            "CREATE TABLE AS SELECT does not support column definitions yet".into(),
4217        ));
4218    }
4219    Ok(())
4220}
4221
4222fn validate_simple_query(query: &Query) -> SqlResult<()> {
4223    if query.with.is_some() {
4224        return Err(Error::InvalidArgumentError(
4225            "WITH clauses are not supported".into(),
4226        ));
4227    }
4228    if let Some(limit_clause) = &query.limit_clause {
4229        match limit_clause {
4230            LimitClause::LimitOffset {
4231                offset: Some(_), ..
4232            }
4233            | LimitClause::OffsetCommaLimit { .. } => {
4234                return Err(Error::InvalidArgumentError(
4235                    "OFFSET clauses are not supported".into(),
4236                ));
4237            }
4238            LimitClause::LimitOffset { limit_by, .. } if !limit_by.is_empty() => {
4239                return Err(Error::InvalidArgumentError(
4240                    "LIMIT BY clauses are not supported".into(),
4241                ));
4242            }
4243            _ => {}
4244        }
4245    }
4246    if query.fetch.is_some() {
4247        return Err(Error::InvalidArgumentError(
4248            "FETCH clauses are not supported".into(),
4249        ));
4250    }
4251    Ok(())
4252}
4253
4254fn resolve_column_name(expr: &SqlExpr) -> SqlResult<String> {
4255    match expr {
4256        SqlExpr::Identifier(ident) => Ok(ident.value.clone()),
4257        SqlExpr::CompoundIdentifier(parts) => {
4258            if let Some(last) = parts.last() {
4259                Ok(last.value.clone())
4260            } else {
4261                Err(Error::InvalidArgumentError(
4262                    "empty column identifier".into(),
4263                ))
4264            }
4265        }
4266        _ => Err(Error::InvalidArgumentError(
4267            "aggregate arguments must be plain column identifiers".into(),
4268        )),
4269    }
4270}
4271
4272fn validate_projection_alias_qualifiers(
4273    projection_items: &[SelectItem],
4274    alias: &str,
4275) -> SqlResult<()> {
4276    let alias_lower = alias.to_ascii_lowercase();
4277    for item in projection_items {
4278        match item {
4279            SelectItem::UnnamedExpr(expr) | SelectItem::ExprWithAlias { expr, .. } => {
4280                if let SqlExpr::CompoundIdentifier(parts) = expr
4281                    && parts.len() >= 2
4282                    && let Some(first) = parts.first()
4283                    && !first.value.eq_ignore_ascii_case(&alias_lower)
4284                {
4285                    return Err(Error::InvalidArgumentError(format!(
4286                        "Binder Error: table '{}' not found",
4287                        first.value
4288                    )));
4289                }
4290            }
4291            _ => {}
4292        }
4293    }
4294    Ok(())
4295}
4296
4297/// Try to parse a function as an aggregate call for use in scalar expressions
4298/// Check if a scalar expression contains any aggregate functions
4299#[allow(dead_code)] // Utility function for future use
4300fn expr_contains_aggregate(expr: &llkv_expr::expr::ScalarExpr<String>) -> bool {
4301    match expr {
4302        llkv_expr::expr::ScalarExpr::Aggregate(_) => true,
4303        llkv_expr::expr::ScalarExpr::Binary { left, right, .. } => {
4304            expr_contains_aggregate(left) || expr_contains_aggregate(right)
4305        }
4306        llkv_expr::expr::ScalarExpr::GetField { base, .. } => expr_contains_aggregate(base),
4307        llkv_expr::expr::ScalarExpr::Column(_) | llkv_expr::expr::ScalarExpr::Literal(_) => false,
4308    }
4309}
4310
4311fn try_parse_aggregate_function(
4312    func: &sqlparser::ast::Function,
4313) -> SqlResult<Option<llkv_expr::expr::AggregateCall<String>>> {
4314    use sqlparser::ast::{FunctionArg, FunctionArgExpr, FunctionArguments, ObjectNamePart};
4315
4316    if func.uses_odbc_syntax {
4317        return Ok(None);
4318    }
4319    if !matches!(func.parameters, FunctionArguments::None) {
4320        return Ok(None);
4321    }
4322    if func.filter.is_some()
4323        || func.null_treatment.is_some()
4324        || func.over.is_some()
4325        || !func.within_group.is_empty()
4326    {
4327        return Ok(None);
4328    }
4329
4330    let func_name = if func.name.0.len() == 1 {
4331        match &func.name.0[0] {
4332            ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
4333            _ => return Ok(None),
4334        }
4335    } else {
4336        return Ok(None);
4337    };
4338
4339    let args_slice: &[FunctionArg] = match &func.args {
4340        FunctionArguments::List(list) => {
4341            if list.duplicate_treatment.is_some() || !list.clauses.is_empty() {
4342                return Ok(None);
4343            }
4344            &list.args
4345        }
4346        FunctionArguments::None => &[],
4347        FunctionArguments::Subquery(_) => return Ok(None),
4348    };
4349
4350    let agg_call = match func_name.as_str() {
4351        "count" => {
4352            if args_slice.len() != 1 {
4353                return Err(Error::InvalidArgumentError(
4354                    "COUNT accepts exactly one argument".into(),
4355                ));
4356            }
4357            match &args_slice[0] {
4358                FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
4359                    llkv_expr::expr::AggregateCall::CountStar
4360                }
4361                FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
4362                    let column = resolve_column_name(arg_expr)?;
4363                    llkv_expr::expr::AggregateCall::Count(column)
4364                }
4365                _ => {
4366                    return Err(Error::InvalidArgumentError(
4367                        "unsupported COUNT argument".into(),
4368                    ));
4369                }
4370            }
4371        }
4372        "sum" => {
4373            if args_slice.len() != 1 {
4374                return Err(Error::InvalidArgumentError(
4375                    "SUM accepts exactly one argument".into(),
4376                ));
4377            }
4378            let arg_expr = match &args_slice[0] {
4379                FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
4380                _ => {
4381                    return Err(Error::InvalidArgumentError(
4382                        "SUM requires a column argument".into(),
4383                    ));
4384                }
4385            };
4386
4387            // Check for COUNT(CASE ...) pattern
4388            if let Some(column) = parse_count_nulls_case(arg_expr)? {
4389                llkv_expr::expr::AggregateCall::CountNulls(column)
4390            } else {
4391                let column = resolve_column_name(arg_expr)?;
4392                llkv_expr::expr::AggregateCall::Sum(column)
4393            }
4394        }
4395        "min" => {
4396            if args_slice.len() != 1 {
4397                return Err(Error::InvalidArgumentError(
4398                    "MIN accepts exactly one argument".into(),
4399                ));
4400            }
4401            let arg_expr = match &args_slice[0] {
4402                FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
4403                _ => {
4404                    return Err(Error::InvalidArgumentError(
4405                        "MIN requires a column argument".into(),
4406                    ));
4407                }
4408            };
4409            let column = resolve_column_name(arg_expr)?;
4410            llkv_expr::expr::AggregateCall::Min(column)
4411        }
4412        "max" => {
4413            if args_slice.len() != 1 {
4414                return Err(Error::InvalidArgumentError(
4415                    "MAX accepts exactly one argument".into(),
4416                ));
4417            }
4418            let arg_expr = match &args_slice[0] {
4419                FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
4420                _ => {
4421                    return Err(Error::InvalidArgumentError(
4422                        "MAX requires a column argument".into(),
4423                    ));
4424                }
4425            };
4426            let column = resolve_column_name(arg_expr)?;
4427            llkv_expr::expr::AggregateCall::Max(column)
4428        }
4429        _ => return Ok(None),
4430    };
4431
4432    Ok(Some(agg_call))
4433}
4434
4435fn parse_count_nulls_case(expr: &SqlExpr) -> SqlResult<Option<String>> {
4436    let SqlExpr::Case {
4437        operand,
4438        conditions,
4439        else_result,
4440        ..
4441    } = expr
4442    else {
4443        return Ok(None);
4444    };
4445
4446    if operand.is_some() || conditions.len() != 1 {
4447        return Ok(None);
4448    }
4449
4450    let case_when = &conditions[0];
4451    if !is_integer_literal(&case_when.result, 1) {
4452        return Ok(None);
4453    }
4454
4455    let else_expr = match else_result {
4456        Some(expr) => expr.as_ref(),
4457        None => return Ok(None),
4458    };
4459    if !is_integer_literal(else_expr, 0) {
4460        return Ok(None);
4461    }
4462
4463    let inner = match &case_when.condition {
4464        SqlExpr::IsNull(inner) => inner.as_ref(),
4465        _ => return Ok(None),
4466    };
4467
4468    resolve_column_name(inner).map(Some)
4469}
4470
4471fn is_integer_literal(expr: &SqlExpr, expected: i64) -> bool {
4472    match expr {
4473        SqlExpr::Value(ValueWithSpan {
4474            value: Value::Number(text, _),
4475            ..
4476        }) => text.parse::<i64>() == Ok(expected),
4477        _ => false,
4478    }
4479}
4480
4481fn translate_condition_with_context(
4482    resolver: &IdentifierResolver<'_>,
4483    context: IdentifierContext,
4484    expr: &SqlExpr,
4485) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
4486    // Iterative postorder traversal using the TransformFrame pattern.
4487    // See llkv-plan::TransformFrame documentation for pattern details.
4488    //
4489    // This avoids stack overflow on deeply nested expressions (50k+ nodes) by using
4490    // explicit work_stack and result_stack instead of recursion.
4491
4492    enum ConditionExitContext {
4493        And,
4494        Or,
4495        Not,
4496        Nested,
4497    }
4498
4499    type ConditionFrame<'a> = llkv_plan::TransformFrame<
4500        'a,
4501        SqlExpr,
4502        llkv_expr::expr::Expr<'static, String>,
4503        ConditionExitContext,
4504    >;
4505
4506    let mut work_stack: Vec<ConditionFrame> = vec![ConditionFrame::Enter(expr)];
4507    let mut result_stack: Vec<llkv_expr::expr::Expr<'static, String>> = Vec::new();
4508
4509    while let Some(frame) = work_stack.pop() {
4510        match frame {
4511            ConditionFrame::Enter(node) => match node {
4512                SqlExpr::BinaryOp { left, op, right } => match op {
4513                    BinaryOperator::And => {
4514                        work_stack.push(ConditionFrame::Exit(ConditionExitContext::And));
4515                        work_stack.push(ConditionFrame::Enter(right));
4516                        work_stack.push(ConditionFrame::Enter(left));
4517                    }
4518                    BinaryOperator::Or => {
4519                        work_stack.push(ConditionFrame::Exit(ConditionExitContext::Or));
4520                        work_stack.push(ConditionFrame::Enter(right));
4521                        work_stack.push(ConditionFrame::Enter(left));
4522                    }
4523                    BinaryOperator::Eq
4524                    | BinaryOperator::NotEq
4525                    | BinaryOperator::Lt
4526                    | BinaryOperator::LtEq
4527                    | BinaryOperator::Gt
4528                    | BinaryOperator::GtEq => {
4529                        let result = translate_comparison_with_context(
4530                            resolver,
4531                            context.clone(),
4532                            left,
4533                            op.clone(),
4534                            right,
4535                        )?;
4536                        work_stack.push(ConditionFrame::Leaf(result));
4537                    }
4538                    other => {
4539                        return Err(Error::InvalidArgumentError(format!(
4540                            "unsupported binary operator in WHERE clause: {other:?}"
4541                        )));
4542                    }
4543                },
4544                SqlExpr::UnaryOp {
4545                    op: UnaryOperator::Not,
4546                    expr: inner,
4547                } => {
4548                    work_stack.push(ConditionFrame::Exit(ConditionExitContext::Not));
4549                    work_stack.push(ConditionFrame::Enter(inner));
4550                }
4551                SqlExpr::Nested(inner) => {
4552                    work_stack.push(ConditionFrame::Exit(ConditionExitContext::Nested));
4553                    work_stack.push(ConditionFrame::Enter(inner));
4554                }
4555                SqlExpr::IsNull(inner) => {
4556                    let scalar = translate_scalar_with_context(resolver, context.clone(), inner)?;
4557                    match scalar {
4558                        llkv_expr::expr::ScalarExpr::Column(column) => {
4559                            work_stack.push(ConditionFrame::Leaf(llkv_expr::expr::Expr::Pred(
4560                                llkv_expr::expr::Filter {
4561                                    field_id: column,
4562                                    op: llkv_expr::expr::Operator::IsNull,
4563                                },
4564                            )));
4565                        }
4566                        _ => {
4567                            return Err(Error::InvalidArgumentError(
4568                                "IS NULL predicates currently support column references only"
4569                                    .into(),
4570                            ));
4571                        }
4572                    }
4573                }
4574                SqlExpr::IsNotNull(inner) => {
4575                    let scalar = translate_scalar_with_context(resolver, context.clone(), inner)?;
4576                    match scalar {
4577                        llkv_expr::expr::ScalarExpr::Column(column) => {
4578                            work_stack.push(ConditionFrame::Leaf(llkv_expr::expr::Expr::Pred(
4579                                llkv_expr::expr::Filter {
4580                                    field_id: column,
4581                                    op: llkv_expr::expr::Operator::IsNotNull,
4582                                },
4583                            )));
4584                        }
4585                        _ => {
4586                            return Err(Error::InvalidArgumentError(
4587                                "IS NOT NULL predicates currently support column references only"
4588                                    .into(),
4589                            ));
4590                        }
4591                    }
4592                }
4593                SqlExpr::InList {
4594                    expr: in_expr,
4595                    list,
4596                    negated,
4597                } => {
4598                    if list.is_empty() {
4599                        let result = if *negated {
4600                            llkv_expr::expr::Expr::Literal(true)
4601                        } else {
4602                            llkv_expr::expr::Expr::Literal(false)
4603                        };
4604                        work_stack.push(ConditionFrame::Leaf(result));
4605                    } else {
4606                        let target =
4607                            translate_scalar_with_context(resolver, context.clone(), in_expr)?;
4608                        let mut values = Vec::with_capacity(list.len());
4609                        for value_expr in list {
4610                            let scalar = translate_scalar_with_context(
4611                                resolver,
4612                                context.clone(),
4613                                value_expr,
4614                            )?;
4615                            values.push(scalar);
4616                        }
4617
4618                        work_stack.push(ConditionFrame::Leaf(llkv_expr::expr::Expr::InList {
4619                            expr: target,
4620                            list: values,
4621                            negated: *negated,
4622                        }));
4623                    }
4624                }
4625                SqlExpr::InSubquery { .. } => {
4626                    return Err(Error::InvalidArgumentError(
4627                        "IN (SELECT ...) subqueries must be materialized before translation".into(),
4628                    ));
4629                }
4630                SqlExpr::Between {
4631                    expr: between_expr,
4632                    negated,
4633                    low,
4634                    high,
4635                } => {
4636                    let lower_bound = translate_comparison_with_context(
4637                        resolver,
4638                        context.clone(),
4639                        between_expr,
4640                        BinaryOperator::GtEq,
4641                        low,
4642                    )?;
4643                    let upper_bound = translate_comparison_with_context(
4644                        resolver,
4645                        context.clone(),
4646                        between_expr,
4647                        BinaryOperator::LtEq,
4648                        high,
4649                    )?;
4650
4651                    let between_expr_result =
4652                        llkv_expr::expr::Expr::And(vec![lower_bound, upper_bound]);
4653
4654                    let result = if *negated {
4655                        llkv_expr::expr::Expr::not(between_expr_result)
4656                    } else {
4657                        between_expr_result
4658                    };
4659                    work_stack.push(ConditionFrame::Leaf(result));
4660                }
4661                other => {
4662                    return Err(Error::InvalidArgumentError(format!(
4663                        "unsupported WHERE clause: {other:?}"
4664                    )));
4665                }
4666            },
4667            ConditionFrame::Leaf(translated) => {
4668                result_stack.push(translated);
4669            }
4670            ConditionFrame::Exit(exit_context) => match exit_context {
4671                ConditionExitContext::And => {
4672                    let right = result_stack.pop().ok_or_else(|| {
4673                        Error::Internal(
4674                            "translate_condition: result stack underflow for And right".into(),
4675                        )
4676                    })?;
4677                    let left = result_stack.pop().ok_or_else(|| {
4678                        Error::Internal(
4679                            "translate_condition: result stack underflow for And left".into(),
4680                        )
4681                    })?;
4682                    result_stack.push(flatten_and(left, right));
4683                }
4684                ConditionExitContext::Or => {
4685                    let right = result_stack.pop().ok_or_else(|| {
4686                        Error::Internal(
4687                            "translate_condition: result stack underflow for Or right".into(),
4688                        )
4689                    })?;
4690                    let left = result_stack.pop().ok_or_else(|| {
4691                        Error::Internal(
4692                            "translate_condition: result stack underflow for Or left".into(),
4693                        )
4694                    })?;
4695                    result_stack.push(flatten_or(left, right));
4696                }
4697                ConditionExitContext::Not => {
4698                    let inner = result_stack.pop().ok_or_else(|| {
4699                        Error::Internal(
4700                            "translate_condition: result stack underflow for Not".into(),
4701                        )
4702                    })?;
4703                    result_stack.push(llkv_expr::expr::Expr::not(inner));
4704                }
4705                ConditionExitContext::Nested => {
4706                    // Nested is a no-op - just pass through the inner expression
4707                }
4708            },
4709        }
4710    }
4711
4712    result_stack.pop().ok_or_else(|| {
4713        Error::Internal("translate_condition_with_context: empty result stack".into())
4714    })
4715}
4716
4717fn flatten_and(
4718    left: llkv_expr::expr::Expr<'static, String>,
4719    right: llkv_expr::expr::Expr<'static, String>,
4720) -> llkv_expr::expr::Expr<'static, String> {
4721    let mut children: Vec<llkv_expr::expr::Expr<'static, String>> = Vec::new();
4722    match left {
4723        llkv_expr::expr::Expr::And(mut left_children) => children.append(&mut left_children),
4724        other => children.push(other),
4725    }
4726    match right {
4727        llkv_expr::expr::Expr::And(mut right_children) => children.append(&mut right_children),
4728        other => children.push(other),
4729    }
4730    if children.len() == 1 {
4731        children.into_iter().next().unwrap()
4732    } else {
4733        llkv_expr::expr::Expr::And(children)
4734    }
4735}
4736
4737fn flatten_or(
4738    left: llkv_expr::expr::Expr<'static, String>,
4739    right: llkv_expr::expr::Expr<'static, String>,
4740) -> llkv_expr::expr::Expr<'static, String> {
4741    let mut children: Vec<llkv_expr::expr::Expr<'static, String>> = Vec::new();
4742    match left {
4743        llkv_expr::expr::Expr::Or(mut left_children) => children.append(&mut left_children),
4744        other => children.push(other),
4745    }
4746    match right {
4747        llkv_expr::expr::Expr::Or(mut right_children) => children.append(&mut right_children),
4748        other => children.push(other),
4749    }
4750    if children.len() == 1 {
4751        children.into_iter().next().unwrap()
4752    } else {
4753        llkv_expr::expr::Expr::Or(children)
4754    }
4755}
4756
4757fn translate_comparison_with_context(
4758    resolver: &IdentifierResolver<'_>,
4759    context: IdentifierContext,
4760    left: &SqlExpr,
4761    op: BinaryOperator,
4762    right: &SqlExpr,
4763) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
4764    let left_scalar = translate_scalar_with_context(resolver, context.clone(), left)?;
4765    let right_scalar = translate_scalar_with_context(resolver, context, right)?;
4766    let compare_op = match op {
4767        BinaryOperator::Eq => llkv_expr::expr::CompareOp::Eq,
4768        BinaryOperator::NotEq => llkv_expr::expr::CompareOp::NotEq,
4769        BinaryOperator::Lt => llkv_expr::expr::CompareOp::Lt,
4770        BinaryOperator::LtEq => llkv_expr::expr::CompareOp::LtEq,
4771        BinaryOperator::Gt => llkv_expr::expr::CompareOp::Gt,
4772        BinaryOperator::GtEq => llkv_expr::expr::CompareOp::GtEq,
4773        other => {
4774            return Err(Error::InvalidArgumentError(format!(
4775                "unsupported comparison operator: {other:?}"
4776            )));
4777        }
4778    };
4779
4780    if let (
4781        llkv_expr::expr::ScalarExpr::Column(column),
4782        llkv_expr::expr::ScalarExpr::Literal(literal),
4783    ) = (&left_scalar, &right_scalar)
4784        && let Some(op) = compare_op_to_filter_operator(compare_op, literal)
4785    {
4786        tracing::debug!(
4787            column = ?column,
4788            literal = ?literal,
4789            ?compare_op,
4790            "translate_comparison direct"
4791        );
4792        return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
4793            field_id: column.clone(),
4794            op,
4795        }));
4796    }
4797
4798    if let (
4799        llkv_expr::expr::ScalarExpr::Literal(literal),
4800        llkv_expr::expr::ScalarExpr::Column(column),
4801    ) = (&left_scalar, &right_scalar)
4802        && let Some(flipped) = flip_compare_op(compare_op)
4803        && let Some(op) = compare_op_to_filter_operator(flipped, literal)
4804    {
4805        tracing::debug!(
4806            column = ?column,
4807            literal = ?literal,
4808            original_op = ?compare_op,
4809            flipped_op = ?flipped,
4810            "translate_comparison flipped"
4811        );
4812        return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
4813            field_id: column.clone(),
4814            op,
4815        }));
4816    }
4817
4818    Ok(llkv_expr::expr::Expr::Compare {
4819        left: left_scalar,
4820        op: compare_op,
4821        right: right_scalar,
4822    })
4823}
4824
4825fn compare_op_to_filter_operator(
4826    op: llkv_expr::expr::CompareOp,
4827    literal: &Literal,
4828) -> Option<llkv_expr::expr::Operator<'static>> {
4829    if matches!(literal, Literal::Null) {
4830        return None;
4831    }
4832    let lit = literal.clone();
4833    tracing::debug!(?op, literal = ?literal, "compare_op_to_filter_operator input");
4834    match op {
4835        llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::Operator::Equals(lit)),
4836        llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::Operator::LessThan(lit)),
4837        llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::Operator::LessThanOrEquals(lit)),
4838        llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::Operator::GreaterThan(lit)),
4839        llkv_expr::expr::CompareOp::GtEq => {
4840            Some(llkv_expr::expr::Operator::GreaterThanOrEquals(lit))
4841        }
4842        llkv_expr::expr::CompareOp::NotEq => None,
4843    }
4844}
4845
4846fn flip_compare_op(op: llkv_expr::expr::CompareOp) -> Option<llkv_expr::expr::CompareOp> {
4847    match op {
4848        llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::CompareOp::Eq),
4849        llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::CompareOp::Gt),
4850        llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::CompareOp::GtEq),
4851        llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::CompareOp::Lt),
4852        llkv_expr::expr::CompareOp::GtEq => Some(llkv_expr::expr::CompareOp::LtEq),
4853        llkv_expr::expr::CompareOp::NotEq => None,
4854    }
4855}
4856/// Translate scalar expression with knowledge of the FROM table context.
4857/// This allows us to properly distinguish schema.table.column from column.field.field.
4858fn translate_scalar_with_context(
4859    resolver: &IdentifierResolver<'_>,
4860    context: IdentifierContext,
4861    expr: &SqlExpr,
4862) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
4863    translate_scalar_internal(expr, Some(resolver), Some(&context))
4864}
4865
4866#[allow(dead_code)]
4867fn translate_scalar(expr: &SqlExpr) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
4868    translate_scalar_internal(expr, None, None)
4869}
4870
4871fn translate_scalar_internal(
4872    expr: &SqlExpr,
4873    resolver: Option<&IdentifierResolver<'_>>,
4874    context: Option<&IdentifierContext>,
4875) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
4876    // Iterative postorder traversal using the TransformFrame pattern.
4877    // See llkv-plan::traversal module documentation for pattern details.
4878    //
4879    // This avoids stack overflow on deeply nested expressions (50k+ nodes) by using
4880    // explicit work_stack and result_stack instead of recursion.
4881
4882    /// Context passed through Exit frames during scalar expression translation
4883    enum ScalarExitContext {
4884        BinaryOp { op: BinaryOperator },
4885        UnaryMinus,
4886        UnaryPlus,
4887        Nested,
4888    }
4889
4890    type ScalarFrame<'a> =
4891        TransformFrame<'a, SqlExpr, llkv_expr::expr::ScalarExpr<String>, ScalarExitContext>;
4892
4893    let mut work_stack: Vec<ScalarFrame> = vec![ScalarFrame::Enter(expr)];
4894    let mut result_stack: Vec<llkv_expr::expr::ScalarExpr<String>> = Vec::new();
4895
4896    while let Some(frame) = work_stack.pop() {
4897        match frame {
4898            ScalarFrame::Enter(node) => match node {
4899                SqlExpr::Identifier(ident) => {
4900                    if let (Some(resolver), Some(ctx)) = (resolver, context) {
4901                        let parts = vec![ident.value.clone()];
4902                        let resolution = resolver.resolve(&parts, (*ctx).clone())?;
4903                        work_stack.push(ScalarFrame::Leaf(resolution.into_scalar_expr()));
4904                    } else {
4905                        work_stack.push(ScalarFrame::Leaf(llkv_expr::expr::ScalarExpr::column(
4906                            ident.value.clone(),
4907                        )));
4908                    }
4909                }
4910                SqlExpr::CompoundIdentifier(idents) => {
4911                    if idents.is_empty() {
4912                        return Err(Error::InvalidArgumentError(
4913                            "invalid compound identifier".into(),
4914                        ));
4915                    }
4916
4917                    if let (Some(resolver), Some(ctx)) = (resolver, context) {
4918                        let parts: Vec<String> =
4919                            idents.iter().map(|ident| ident.value.clone()).collect();
4920                        let resolution = resolver.resolve(&parts, (*ctx).clone())?;
4921                        work_stack.push(ScalarFrame::Leaf(resolution.into_scalar_expr()));
4922                    } else {
4923                        let column_name = idents[0].value.clone();
4924                        let mut result = llkv_expr::expr::ScalarExpr::column(column_name);
4925
4926                        for part in &idents[1..] {
4927                            let field_name = part.value.clone();
4928                            result = llkv_expr::expr::ScalarExpr::get_field(result, field_name);
4929                        }
4930
4931                        work_stack.push(ScalarFrame::Leaf(result));
4932                    }
4933                }
4934                SqlExpr::Value(value) => {
4935                    let result = literal_from_value(value)?;
4936                    work_stack.push(ScalarFrame::Leaf(result));
4937                }
4938                SqlExpr::BinaryOp { left, op, right } => {
4939                    work_stack.push(ScalarFrame::Exit(ScalarExitContext::BinaryOp {
4940                        op: op.clone(),
4941                    }));
4942                    work_stack.push(ScalarFrame::Enter(right));
4943                    work_stack.push(ScalarFrame::Enter(left));
4944                }
4945                SqlExpr::UnaryOp {
4946                    op: UnaryOperator::Minus,
4947                    expr: inner,
4948                } => {
4949                    work_stack.push(ScalarFrame::Exit(ScalarExitContext::UnaryMinus));
4950                    work_stack.push(ScalarFrame::Enter(inner));
4951                }
4952                SqlExpr::UnaryOp {
4953                    op: UnaryOperator::Plus,
4954                    expr: inner,
4955                } => {
4956                    work_stack.push(ScalarFrame::Exit(ScalarExitContext::UnaryPlus));
4957                    work_stack.push(ScalarFrame::Enter(inner));
4958                }
4959                SqlExpr::Nested(inner) => {
4960                    work_stack.push(ScalarFrame::Exit(ScalarExitContext::Nested));
4961                    work_stack.push(ScalarFrame::Enter(inner));
4962                }
4963                SqlExpr::Cast { expr: inner, .. } => {
4964                    // TODO: implement typed CAST semantics once executor supports runtime coercions.
4965                    // For now, treat CAST as a passthrough so the inner expression is evaluated normally.
4966                    work_stack.push(ScalarFrame::Enter(inner));
4967                }
4968                SqlExpr::Function(func) => {
4969                    if let Some(agg_call) = try_parse_aggregate_function(func)? {
4970                        work_stack.push(ScalarFrame::Leaf(llkv_expr::expr::ScalarExpr::aggregate(
4971                            agg_call,
4972                        )));
4973                    } else {
4974                        return Err(Error::InvalidArgumentError(format!(
4975                            "unsupported function in scalar expression: {:?}",
4976                            func.name
4977                        )));
4978                    }
4979                }
4980                SqlExpr::Dictionary(fields) => {
4981                    // Process dictionary fields iteratively to avoid recursion
4982                    let mut struct_fields = Vec::new();
4983                    for entry in fields {
4984                        let key = entry.key.value.clone();
4985                        // Reuse scalar translation for nested values while honoring identifier context.
4986                        // Dictionaries rarely nest deeply, so recursion here is acceptable.
4987                        let value_expr =
4988                            translate_scalar_internal(&entry.value, resolver, context)?;
4989                        match value_expr {
4990                            llkv_expr::expr::ScalarExpr::Literal(lit) => {
4991                                struct_fields.push((key, Box::new(lit)));
4992                            }
4993                            _ => {
4994                                return Err(Error::InvalidArgumentError(
4995                                    "Dictionary values must be literals".to_string(),
4996                                ));
4997                            }
4998                        }
4999                    }
5000                    work_stack.push(ScalarFrame::Leaf(llkv_expr::expr::ScalarExpr::literal(
5001                        Literal::Struct(struct_fields),
5002                    )));
5003                }
5004                other => {
5005                    return Err(Error::InvalidArgumentError(format!(
5006                        "unsupported scalar expression: {other:?}"
5007                    )));
5008                }
5009            },
5010            ScalarFrame::Leaf(translated) => {
5011                result_stack.push(translated);
5012            }
5013            ScalarFrame::Exit(exit_context) => match exit_context {
5014                ScalarExitContext::BinaryOp { op } => {
5015                    let right_expr = result_stack.pop().ok_or_else(|| {
5016                        Error::Internal(
5017                            "translate_scalar: result stack underflow for BinaryOp right".into(),
5018                        )
5019                    })?;
5020                    let left_expr = result_stack.pop().ok_or_else(|| {
5021                        Error::Internal(
5022                            "translate_scalar: result stack underflow for BinaryOp left".into(),
5023                        )
5024                    })?;
5025                    let binary_op = match op {
5026                        BinaryOperator::Plus => llkv_expr::expr::BinaryOp::Add,
5027                        BinaryOperator::Minus => llkv_expr::expr::BinaryOp::Subtract,
5028                        BinaryOperator::Multiply => llkv_expr::expr::BinaryOp::Multiply,
5029                        BinaryOperator::Divide => llkv_expr::expr::BinaryOp::Divide,
5030                        BinaryOperator::Modulo => llkv_expr::expr::BinaryOp::Modulo,
5031                        other => {
5032                            return Err(Error::InvalidArgumentError(format!(
5033                                "unsupported scalar binary operator: {other:?}"
5034                            )));
5035                        }
5036                    };
5037                    result_stack.push(llkv_expr::expr::ScalarExpr::binary(
5038                        left_expr, binary_op, right_expr,
5039                    ));
5040                }
5041                ScalarExitContext::UnaryMinus => {
5042                    let inner = result_stack.pop().ok_or_else(|| {
5043                        Error::Internal(
5044                            "translate_scalar: result stack underflow for UnaryMinus".into(),
5045                        )
5046                    })?;
5047                    match inner {
5048                        llkv_expr::expr::ScalarExpr::Literal(lit) => match lit {
5049                            Literal::Integer(v) => {
5050                                result_stack.push(llkv_expr::expr::ScalarExpr::literal(
5051                                    Literal::Integer(-v),
5052                                ));
5053                            }
5054                            Literal::Float(v) => {
5055                                result_stack
5056                                    .push(llkv_expr::expr::ScalarExpr::literal(Literal::Float(-v)));
5057                            }
5058                            Literal::Boolean(_) => {
5059                                return Err(Error::InvalidArgumentError(
5060                                    "cannot negate boolean literal".into(),
5061                                ));
5062                            }
5063                            Literal::String(_) => {
5064                                return Err(Error::InvalidArgumentError(
5065                                    "cannot negate string literal".into(),
5066                                ));
5067                            }
5068                            Literal::Struct(_) => {
5069                                return Err(Error::InvalidArgumentError(
5070                                    "cannot negate struct literal".into(),
5071                                ));
5072                            }
5073                            Literal::Null => {
5074                                result_stack
5075                                    .push(llkv_expr::expr::ScalarExpr::literal(Literal::Null));
5076                            }
5077                        },
5078                        other => {
5079                            let zero = llkv_expr::expr::ScalarExpr::literal(Literal::Integer(0));
5080                            result_stack.push(llkv_expr::expr::ScalarExpr::binary(
5081                                zero,
5082                                llkv_expr::expr::BinaryOp::Subtract,
5083                                other,
5084                            ));
5085                        }
5086                    }
5087                }
5088                ScalarExitContext::UnaryPlus => {
5089                    // Unary plus is a no-op - just pass through
5090                }
5091                ScalarExitContext::Nested => {
5092                    // Nested is a no-op - just pass through
5093                }
5094            },
5095        }
5096    }
5097
5098    result_stack
5099        .pop()
5100        .ok_or_else(|| Error::Internal("translate_scalar: empty result stack".into()))
5101}
5102
5103fn literal_from_value(value: &ValueWithSpan) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
5104    match &value.value {
5105        Value::Number(text, _) => {
5106            if text.contains(['.', 'e', 'E']) {
5107                let parsed = text.parse::<f64>().map_err(|err| {
5108                    Error::InvalidArgumentError(format!("invalid float literal: {err}"))
5109                })?;
5110                Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(parsed)))
5111            } else {
5112                let parsed = text.parse::<i128>().map_err(|err| {
5113                    Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
5114                })?;
5115                Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(
5116                    parsed,
5117                )))
5118            }
5119        }
5120        Value::Boolean(value) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Boolean(
5121            *value,
5122        ))),
5123        Value::Null => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Null)),
5124        other => {
5125            if let Some(text) = other.clone().into_string() {
5126                Ok(llkv_expr::expr::ScalarExpr::literal(Literal::String(text)))
5127            } else {
5128                Err(Error::InvalidArgumentError(format!(
5129                    "unsupported literal: {other:?}"
5130                )))
5131            }
5132        }
5133    }
5134}
5135
5136fn resolve_assignment_column_name(target: &AssignmentTarget) -> SqlResult<String> {
5137    match target {
5138        AssignmentTarget::ColumnName(name) => {
5139            if name.0.len() != 1 {
5140                return Err(Error::InvalidArgumentError(
5141                    "qualified column names in UPDATE assignments are not supported yet".into(),
5142                ));
5143            }
5144            match &name.0[0] {
5145                ObjectNamePart::Identifier(ident) => Ok(ident.value.clone()),
5146                other => Err(Error::InvalidArgumentError(format!(
5147                    "unsupported column reference in UPDATE assignment: {other:?}"
5148                ))),
5149            }
5150        }
5151        AssignmentTarget::Tuple(_) => Err(Error::InvalidArgumentError(
5152            "tuple assignments are not supported yet".into(),
5153        )),
5154    }
5155}
5156
5157fn arrow_type_from_sql(data_type: &SqlDataType) -> SqlResult<arrow::datatypes::DataType> {
5158    use arrow::datatypes::DataType;
5159    match data_type {
5160        SqlDataType::Int(_)
5161        | SqlDataType::Integer(_)
5162        | SqlDataType::BigInt(_)
5163        | SqlDataType::SmallInt(_)
5164        | SqlDataType::TinyInt(_) => Ok(DataType::Int64),
5165        SqlDataType::Float(_)
5166        | SqlDataType::Real
5167        | SqlDataType::Double(_)
5168        | SqlDataType::DoublePrecision => Ok(DataType::Float64),
5169        SqlDataType::Text
5170        | SqlDataType::String(_)
5171        | SqlDataType::Varchar(_)
5172        | SqlDataType::Char(_)
5173        | SqlDataType::Uuid => Ok(DataType::Utf8),
5174        SqlDataType::Date => Ok(DataType::Date32),
5175        SqlDataType::Decimal(_) | SqlDataType::Numeric(_) => Ok(DataType::Float64),
5176        SqlDataType::Boolean => Ok(DataType::Boolean),
5177        SqlDataType::Custom(name, args) => {
5178            if name.0.len() == 1
5179                && let ObjectNamePart::Identifier(ident) = &name.0[0]
5180                && ident.value.eq_ignore_ascii_case("row")
5181            {
5182                return row_type_to_arrow(data_type, args);
5183            }
5184            Err(Error::InvalidArgumentError(format!(
5185                "unsupported SQL data type: {data_type:?}"
5186            )))
5187        }
5188        other => Err(Error::InvalidArgumentError(format!(
5189            "unsupported SQL data type: {other:?}"
5190        ))),
5191    }
5192}
5193
5194fn row_type_to_arrow(
5195    data_type: &SqlDataType,
5196    tokens: &[String],
5197) -> SqlResult<arrow::datatypes::DataType> {
5198    use arrow::datatypes::{DataType, Field, FieldRef, Fields};
5199
5200    let row_str = data_type.to_string();
5201    if tokens.is_empty() {
5202        return Err(Error::InvalidArgumentError(
5203            "ROW type must define at least one field".into(),
5204        ));
5205    }
5206
5207    let dialect = GenericDialect {};
5208    let field_definitions = resolve_row_field_types(tokens, &dialect).map_err(|err| {
5209        Error::InvalidArgumentError(format!("unable to parse ROW type '{row_str}': {err}"))
5210    })?;
5211
5212    let mut fields: Vec<FieldRef> = Vec::with_capacity(field_definitions.len());
5213    for (field_name, field_type) in field_definitions {
5214        let arrow_field_type = arrow_type_from_sql(&field_type)?;
5215        fields.push(Arc::new(Field::new(field_name, arrow_field_type, true)));
5216    }
5217
5218    let struct_fields: Fields = fields.into();
5219    Ok(DataType::Struct(struct_fields))
5220}
5221
5222fn resolve_row_field_types(
5223    tokens: &[String],
5224    dialect: &GenericDialect,
5225) -> SqlResult<Vec<(String, SqlDataType)>> {
5226    if tokens.is_empty() {
5227        return Err(Error::InvalidArgumentError(
5228            "ROW type must define at least one field".into(),
5229        ));
5230    }
5231
5232    let mut start = 0;
5233    let mut end = tokens.len();
5234    if tokens[start] == "(" {
5235        if end == 0 || tokens[end - 1] != ")" {
5236            return Err(Error::InvalidArgumentError(
5237                "ROW type is missing closing ')'".into(),
5238            ));
5239        }
5240        start += 1;
5241        end -= 1;
5242    } else if tokens[end - 1] == ")" {
5243        return Err(Error::InvalidArgumentError(
5244            "ROW type contains unmatched ')'".into(),
5245        ));
5246    }
5247
5248    let slice = &tokens[start..end];
5249    if slice.is_empty() {
5250        return Err(Error::InvalidArgumentError(
5251            "ROW type did not provide any field definitions".into(),
5252        ));
5253    }
5254
5255    let mut fields = Vec::new();
5256    let mut index = 0;
5257
5258    while index < slice.len() {
5259        if slice[index] == "," {
5260            index += 1;
5261            continue;
5262        }
5263
5264        let field_name = normalize_row_field_name(&slice[index])?;
5265        index += 1;
5266
5267        if index >= slice.len() {
5268            return Err(Error::InvalidArgumentError(format!(
5269                "ROW field '{field_name}' is missing a type specification"
5270            )));
5271        }
5272
5273        let mut last_success: Option<(usize, SqlDataType)> = None;
5274        let mut type_end = index;
5275
5276        while type_end <= slice.len() {
5277            let candidate = slice[index..type_end].join(" ");
5278            if candidate.trim().is_empty() {
5279                type_end += 1;
5280                continue;
5281            }
5282
5283            if let Ok(parsed_type) = parse_sql_data_type(&candidate, dialect) {
5284                last_success = Some((type_end, parsed_type));
5285            }
5286
5287            if type_end == slice.len() {
5288                break;
5289            }
5290
5291            if slice[type_end] == "," && last_success.is_some() {
5292                break;
5293            }
5294
5295            type_end += 1;
5296        }
5297
5298        let Some((next_index, data_type)) = last_success else {
5299            return Err(Error::InvalidArgumentError(format!(
5300                "failed to parse ROW field type for '{field_name}'"
5301            )));
5302        };
5303
5304        fields.push((field_name, data_type));
5305        index = next_index;
5306
5307        if index < slice.len() && slice[index] == "," {
5308            index += 1;
5309        }
5310    }
5311
5312    if fields.is_empty() {
5313        return Err(Error::InvalidArgumentError(
5314            "ROW type did not provide any field definitions".into(),
5315        ));
5316    }
5317
5318    Ok(fields)
5319}
5320
5321/// Parse SQL string into statements with increased recursion limit.
5322///
5323/// This helper wraps sqlparser's `Parser` with a custom recursion limit to handle
5324/// deeply nested queries that exceed the default limit of 50.
5325///
5326/// # Arguments
5327///
5328/// * `dialect` - SQL dialect to use for parsing
5329/// * `sql` - SQL string to parse
5330///
5331/// # Returns
5332///
5333/// Parsed statements or parser error
5334fn parse_sql_with_recursion_limit(
5335    dialect: &GenericDialect,
5336    sql: &str,
5337) -> Result<Vec<Statement>, sqlparser::parser::ParserError> {
5338    Parser::new(dialect)
5339        .with_recursion_limit(PARSER_RECURSION_LIMIT)
5340        .try_with_sql(sql)?
5341        .parse_statements()
5342}
5343
5344fn normalize_row_field_name(raw: &str) -> SqlResult<String> {
5345    let trimmed = raw.trim();
5346    if trimmed.is_empty() {
5347        return Err(Error::InvalidArgumentError(
5348            "ROW field name must not be empty".into(),
5349        ));
5350    }
5351
5352    if let Some(stripped) = trimmed.strip_prefix('"') {
5353        let without_end = stripped.strip_suffix('"').ok_or_else(|| {
5354            Error::InvalidArgumentError(format!("unterminated quoted ROW field name: {trimmed}"))
5355        })?;
5356        let name = without_end.replace("\"\"", "\"");
5357        return Ok(name);
5358    }
5359
5360    Ok(trimmed.to_string())
5361}
5362
5363fn parse_sql_data_type(type_str: &str, dialect: &GenericDialect) -> SqlResult<SqlDataType> {
5364    let trimmed = type_str.trim();
5365    let sql = format!("CREATE TABLE __row(__field {trimmed});");
5366    let statements = parse_sql_with_recursion_limit(dialect, &sql).map_err(|err| {
5367        Error::InvalidArgumentError(format!("failed to parse ROW field type '{trimmed}': {err}"))
5368    })?;
5369
5370    let stmt = statements.into_iter().next().ok_or_else(|| {
5371        Error::InvalidArgumentError(format!(
5372            "ROW field type '{trimmed}' did not produce a statement"
5373        ))
5374    })?;
5375
5376    match stmt {
5377        Statement::CreateTable(table) => table
5378            .columns
5379            .first()
5380            .map(|col| col.data_type.clone())
5381            .ok_or_else(|| {
5382                Error::InvalidArgumentError(format!(
5383                    "ROW field type '{trimmed}' missing column definition"
5384                ))
5385            }),
5386        other => Err(Error::InvalidArgumentError(format!(
5387            "unexpected statement while parsing ROW field type: {other:?}"
5388        ))),
5389    }
5390}
5391
5392/// Extract VALUES data from a derived table in FROM clause.
5393/// Returns (rows, column_names) if the pattern matches: SELECT ... FROM (VALUES ...) alias(col1, col2, ...)
5394type ExtractValuesResult = Option<(Vec<Vec<PlanValue>>, Vec<String>)>;
5395
5396#[allow(clippy::type_complexity)]
5397fn extract_values_from_derived_table(from: &[TableWithJoins]) -> SqlResult<ExtractValuesResult> {
5398    if from.len() != 1 {
5399        return Ok(None);
5400    }
5401
5402    let table_with_joins = &from[0];
5403    if !table_with_joins.joins.is_empty() {
5404        return Ok(None);
5405    }
5406
5407    match &table_with_joins.relation {
5408        TableFactor::Derived {
5409            subquery, alias, ..
5410        } => {
5411            // Check if the subquery is a VALUES expression
5412            let values = match subquery.body.as_ref() {
5413                SetExpr::Values(v) => v,
5414                _ => return Ok(None),
5415            };
5416
5417            // Extract column names from alias
5418            let column_names = if let Some(alias) = alias {
5419                alias
5420                    .columns
5421                    .iter()
5422                    .map(|col_def| col_def.name.value.clone())
5423                    .collect::<Vec<_>>()
5424            } else {
5425                // Generate default column names if no alias provided
5426                if values.rows.is_empty() {
5427                    return Err(Error::InvalidArgumentError(
5428                        "VALUES expression must have at least one row".into(),
5429                    ));
5430                }
5431                let first_row = &values.rows[0];
5432                (0..first_row.len())
5433                    .map(|i| format!("column{}", i))
5434                    .collect()
5435            };
5436
5437            // Extract rows
5438            if values.rows.is_empty() {
5439                return Err(Error::InvalidArgumentError(
5440                    "VALUES expression must have at least one row".into(),
5441                ));
5442            }
5443
5444            let mut rows = Vec::with_capacity(values.rows.len());
5445            for row in &values.rows {
5446                if row.len() != column_names.len() {
5447                    return Err(Error::InvalidArgumentError(format!(
5448                        "VALUES row has {} columns but table alias specifies {} columns",
5449                        row.len(),
5450                        column_names.len()
5451                    )));
5452                }
5453
5454                let mut converted_row = Vec::with_capacity(row.len());
5455                for expr in row {
5456                    let value = SqlValue::try_from_expr(expr)?;
5457                    converted_row.push(PlanValue::from(value));
5458                }
5459                rows.push(converted_row);
5460            }
5461
5462            Ok(Some((rows, column_names)))
5463        }
5464        _ => Ok(None),
5465    }
5466}
5467
5468fn extract_constant_select_rows(select: &Select) -> SqlResult<Option<Vec<Vec<PlanValue>>>> {
5469    if !select.from.is_empty() {
5470        return Ok(None);
5471    }
5472
5473    if select.selection.is_some()
5474        || select.having.is_some()
5475        || !select.named_window.is_empty()
5476        || select.qualify.is_some()
5477        || select.distinct.is_some()
5478        || select.top.is_some()
5479        || select.into.is_some()
5480        || select.prewhere.is_some()
5481        || !select.lateral_views.is_empty()
5482        || select.value_table_mode.is_some()
5483        || !group_by_is_empty(&select.group_by)
5484    {
5485        return Err(Error::InvalidArgumentError(
5486            "constant SELECT statements do not support advanced clauses".into(),
5487        ));
5488    }
5489
5490    if select.projection.is_empty() {
5491        return Err(Error::InvalidArgumentError(
5492            "constant SELECT requires at least one projection".into(),
5493        ));
5494    }
5495
5496    let mut row: Vec<PlanValue> = Vec::with_capacity(select.projection.len());
5497    for item in &select.projection {
5498        let expr = match item {
5499            SelectItem::UnnamedExpr(expr) => expr,
5500            SelectItem::ExprWithAlias { expr, .. } => expr,
5501            other => {
5502                return Err(Error::InvalidArgumentError(format!(
5503                    "unsupported projection in constant SELECT: {other:?}"
5504                )));
5505            }
5506        };
5507
5508        let value = SqlValue::try_from_expr(expr)?;
5509        row.push(PlanValue::from(value));
5510    }
5511
5512    Ok(Some(vec![row]))
5513}
5514
5515fn extract_single_table(from: &[TableWithJoins]) -> SqlResult<(String, String)> {
5516    if from.len() != 1 {
5517        return Err(Error::InvalidArgumentError(
5518            "queries over multiple tables are not supported yet".into(),
5519        ));
5520    }
5521    let item = &from[0];
5522    if !item.joins.is_empty() {
5523        return Err(Error::InvalidArgumentError(
5524            "JOIN clauses are not supported yet".into(),
5525        ));
5526    }
5527    match &item.relation {
5528        TableFactor::Table { name, .. } => canonical_object_name(name),
5529        TableFactor::Derived { alias, .. } => {
5530            // Derived table (subquery) - use the alias as the table name if provided
5531            // For CTAS, this allows: CREATE TABLE t AS SELECT * FROM (VALUES ...) v(id)
5532            let table_name = alias
5533                .as_ref()
5534                .map(|a| a.name.value.clone())
5535                .unwrap_or_else(|| "derived".to_string());
5536            let canonical = table_name.to_ascii_lowercase();
5537            Ok((table_name, canonical))
5538        }
5539        _ => Err(Error::InvalidArgumentError(
5540            "queries require a plain table name or derived table".into(),
5541        )),
5542    }
5543}
5544
5545/// Extract table references from a FROM clause, flattening supported JOINs.
5546fn extract_tables(from: &[TableWithJoins]) -> SqlResult<Vec<llkv_plan::TableRef>> {
5547    let mut tables = Vec::new();
5548
5549    for item in from {
5550        push_table_factor(&item.relation, &mut tables)?;
5551
5552        for join in &item.joins {
5553            match &join.join_operator {
5554                JoinOperator::CrossJoin(JoinConstraint::None)
5555                | JoinOperator::Inner(JoinConstraint::None) => {
5556                    push_table_factor(&join.relation, &mut tables)?;
5557                }
5558                JoinOperator::CrossJoin(_) => {
5559                    return Err(Error::InvalidArgumentError(
5560                        "CROSS JOIN with constraints is not supported".into(),
5561                    ));
5562                }
5563                _ => {
5564                    return Err(Error::InvalidArgumentError(
5565                        "only CROSS JOIN without constraints is supported".into(),
5566                    ));
5567                }
5568            }
5569        }
5570    }
5571
5572    Ok(tables)
5573}
5574
5575fn push_table_factor(factor: &TableFactor, tables: &mut Vec<llkv_plan::TableRef>) -> SqlResult<()> {
5576    match factor {
5577        TableFactor::Table { name, alias, .. } => {
5578            let (schema_opt, table) = parse_schema_qualified_name(name)?;
5579            let schema = schema_opt.unwrap_or_default();
5580            let alias_name = alias.as_ref().map(|a| a.name.value.clone());
5581            tables.push(llkv_plan::TableRef::with_alias(schema, table, alias_name));
5582            Ok(())
5583        }
5584        TableFactor::Derived { .. } => Err(Error::InvalidArgumentError(
5585            "JOIN clauses require base tables; derived tables are not supported".into(),
5586        )),
5587        _ => Err(Error::InvalidArgumentError(
5588            "queries require a plain table name".into(),
5589        )),
5590    }
5591}
5592
5593fn group_by_is_empty(expr: &GroupByExpr) -> bool {
5594    matches!(
5595        expr,
5596        GroupByExpr::Expressions(exprs, modifiers)
5597            if exprs.is_empty() && modifiers.is_empty()
5598    )
5599}
5600
5601#[cfg(test)]
5602mod tests {
5603    use super::*;
5604    use arrow::array::{Array, Int64Array, StringArray};
5605    use arrow::record_batch::RecordBatch;
5606    use llkv_storage::pager::MemPager;
5607
5608    fn extract_string_options(batches: &[RecordBatch]) -> Vec<Option<String>> {
5609        let mut values: Vec<Option<String>> = Vec::new();
5610        for batch in batches {
5611            let column = batch
5612                .column(0)
5613                .as_any()
5614                .downcast_ref::<StringArray>()
5615                .expect("string column");
5616            for idx in 0..column.len() {
5617                if column.is_null(idx) {
5618                    values.push(None);
5619                } else {
5620                    values.push(Some(column.value(idx).to_string()));
5621                }
5622            }
5623        }
5624        values
5625    }
5626
5627    #[test]
5628    fn create_insert_select_roundtrip() {
5629        let pager = Arc::new(MemPager::default());
5630        let engine = SqlEngine::new(pager);
5631
5632        let result = engine
5633            .execute("CREATE TABLE people (id INT NOT NULL, name TEXT NOT NULL)")
5634            .expect("create table");
5635        assert!(matches!(
5636            result[0],
5637            RuntimeStatementResult::CreateTable { .. }
5638        ));
5639
5640        let result = engine
5641            .execute("INSERT INTO people (id, name) VALUES (1, 'alice'), (2, 'bob')")
5642            .expect("insert rows");
5643        assert!(matches!(
5644            result[0],
5645            RuntimeStatementResult::Insert {
5646                rows_inserted: 2,
5647                ..
5648            }
5649        ));
5650
5651        let mut result = engine
5652            .execute("SELECT name FROM people WHERE id = 2")
5653            .expect("select rows");
5654        let select_result = result.remove(0);
5655        let batches = match select_result {
5656            RuntimeStatementResult::Select { execution, .. } => {
5657                execution.collect().expect("collect batches")
5658            }
5659            _ => panic!("expected select result"),
5660        };
5661        assert_eq!(batches.len(), 1);
5662        let column = batches[0]
5663            .column(0)
5664            .as_any()
5665            .downcast_ref::<StringArray>()
5666            .expect("string column");
5667        assert_eq!(column.len(), 1);
5668        assert_eq!(column.value(0), "bob");
5669    }
5670
5671    #[test]
5672    fn insert_select_constant_including_null() {
5673        let pager = Arc::new(MemPager::default());
5674        let engine = SqlEngine::new(pager);
5675
5676        engine
5677            .execute("CREATE TABLE integers(i INTEGER)")
5678            .expect("create table");
5679
5680        let result = engine
5681            .execute("INSERT INTO integers SELECT 42")
5682            .expect("insert literal");
5683        assert!(matches!(
5684            result[0],
5685            RuntimeStatementResult::Insert {
5686                rows_inserted: 1,
5687                ..
5688            }
5689        ));
5690
5691        let result = engine
5692            .execute("INSERT INTO integers SELECT CAST(NULL AS VARCHAR)")
5693            .expect("insert null literal");
5694        assert!(matches!(
5695            result[0],
5696            RuntimeStatementResult::Insert {
5697                rows_inserted: 1,
5698                ..
5699            }
5700        ));
5701
5702        let mut result = engine
5703            .execute("SELECT * FROM integers")
5704            .expect("select rows");
5705        let select_result = result.remove(0);
5706        let batches = match select_result {
5707            RuntimeStatementResult::Select { execution, .. } => {
5708                execution.collect().expect("collect batches")
5709            }
5710            _ => panic!("expected select result"),
5711        };
5712
5713        let mut values: Vec<Option<i64>> = Vec::new();
5714        for batch in &batches {
5715            let column = batch
5716                .column(0)
5717                .as_any()
5718                .downcast_ref::<Int64Array>()
5719                .expect("int column");
5720            for idx in 0..column.len() {
5721                if column.is_null(idx) {
5722                    values.push(None);
5723                } else {
5724                    values.push(Some(column.value(idx)));
5725                }
5726            }
5727        }
5728
5729        assert_eq!(values, vec![Some(42), None]);
5730    }
5731
5732    #[test]
5733    fn not_null_comparison_filters_all_rows() {
5734        let pager = Arc::new(MemPager::default());
5735        let engine = SqlEngine::new(pager);
5736
5737        engine
5738            .execute("CREATE TABLE single(col INTEGER)")
5739            .expect("create table");
5740        engine
5741            .execute("INSERT INTO single VALUES (1)")
5742            .expect("insert row");
5743
5744        let batches = engine
5745            .sql("SELECT * FROM single WHERE NOT ( NULL ) >= NULL")
5746            .expect("run constant null comparison");
5747
5748        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
5749        assert_eq!(total_rows, 0, "expected filter to remove all rows");
5750    }
5751
5752    #[test]
5753    fn not_null_in_list_filters_all_rows() {
5754        let pager = Arc::new(MemPager::default());
5755        let engine = SqlEngine::new(pager);
5756
5757        engine
5758            .execute("CREATE TABLE tab0(col0 INTEGER, col1 INTEGER, col2 INTEGER)")
5759            .expect("create table");
5760        engine
5761            .execute("INSERT INTO tab0 VALUES (1, 2, 3)")
5762            .expect("insert row");
5763
5764        let batches = engine
5765            .sql("SELECT * FROM tab0 WHERE NOT ( NULL ) IN ( - col2 * + col2 )")
5766            .expect("run IN list null comparison");
5767
5768        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
5769        assert_eq!(total_rows, 0, "expected IN list filter to remove all rows");
5770    }
5771
5772    #[test]
5773    fn cross_join_not_null_comparison_filters_all_rows() {
5774        let pager = Arc::new(MemPager::default());
5775        let engine = SqlEngine::new(pager);
5776
5777        engine
5778            .execute("CREATE TABLE left_side(col INTEGER)")
5779            .expect("create left table");
5780        engine
5781            .execute("CREATE TABLE right_side(col INTEGER)")
5782            .expect("create right table");
5783        engine
5784            .execute("INSERT INTO left_side VALUES (1)")
5785            .expect("insert left row");
5786        engine
5787            .execute("INSERT INTO right_side VALUES (2)")
5788            .expect("insert right row");
5789
5790        let batches = engine
5791            .sql("SELECT * FROM left_side CROSS JOIN right_side WHERE NOT ( NULL ) >= NULL")
5792            .expect("run cross join null comparison");
5793
5794        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
5795        assert_eq!(
5796            total_rows, 0,
5797            "expected cross join filter to remove all rows"
5798        );
5799    }
5800
5801    #[test]
5802    fn cross_join_duplicate_table_name_resolves_columns() {
5803        let pager = Arc::new(MemPager::default());
5804        let engine = SqlEngine::new(pager);
5805
5806        use sqlparser::ast::{SetExpr, Statement};
5807        use sqlparser::dialect::SQLiteDialect;
5808        use sqlparser::parser::Parser;
5809
5810        engine
5811            .execute("CREATE TABLE tab1(col0 INTEGER, col1 INTEGER, col2 INTEGER)")
5812            .expect("create tab1");
5813        engine
5814            .execute("INSERT INTO tab1 VALUES (7, 8, 9)")
5815            .expect("insert tab1 row");
5816
5817        let dialect = SQLiteDialect {};
5818        let ast = Parser::parse_sql(
5819            &dialect,
5820            "SELECT tab1.col2 FROM tab1 AS cor0 CROSS JOIN tab1",
5821        )
5822        .expect("parse cross join query");
5823        let Statement::Query(query) = &ast[0] else {
5824            panic!("expected SELECT query");
5825        };
5826        let select = match query.body.as_ref() {
5827            SetExpr::Select(select) => select.as_ref(),
5828            other => panic!("unexpected query body: {other:?}"),
5829        };
5830        assert_eq!(select.from.len(), 1);
5831        assert!(!select.from[0].joins.is_empty());
5832
5833        let batches = engine
5834            .sql("SELECT tab1.col2 FROM tab1 AS cor0 CROSS JOIN tab1")
5835            .expect("run cross join with alias and base table");
5836
5837        let mut values = Vec::new();
5838        for batch in batches {
5839            let column = batch
5840                .column(0)
5841                .as_any()
5842                .downcast_ref::<Int64Array>()
5843                .expect("int column");
5844            for idx in 0..column.len() {
5845                if column.is_null(idx) {
5846                    values.push(None);
5847                } else {
5848                    values.push(Some(column.value(idx)));
5849                }
5850            }
5851        }
5852
5853        assert_eq!(values, vec![Some(9)]);
5854    }
5855
5856    #[test]
5857    fn update_with_where_clause_filters_rows() {
5858        let pager = Arc::new(MemPager::default());
5859        let engine = SqlEngine::new(pager);
5860
5861        engine
5862            .execute("SET default_null_order='nulls_first'")
5863            .expect("set default null order");
5864
5865        engine
5866            .execute("CREATE TABLE strings(a VARCHAR)")
5867            .expect("create table");
5868
5869        engine
5870            .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
5871            .expect("insert seed rows");
5872
5873        let result = engine
5874            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
5875            .expect("update rows");
5876        assert!(matches!(
5877            result[0],
5878            RuntimeStatementResult::Update {
5879                rows_updated: 1,
5880                ..
5881            }
5882        ));
5883
5884        let mut result = engine
5885            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
5886            .expect("select rows");
5887        let select_result = result.remove(0);
5888        let batches = match select_result {
5889            RuntimeStatementResult::Select { execution, .. } => {
5890                execution.collect().expect("collect batches")
5891            }
5892            _ => panic!("expected select result"),
5893        };
5894
5895        let mut values: Vec<Option<String>> = Vec::new();
5896        for batch in &batches {
5897            let column = batch
5898                .column(0)
5899                .as_any()
5900                .downcast_ref::<StringArray>()
5901                .expect("string column");
5902            for idx in 0..column.len() {
5903                if column.is_null(idx) {
5904                    values.push(None);
5905                } else {
5906                    values.push(Some(column.value(idx).to_string()));
5907                }
5908            }
5909        }
5910
5911        values.sort_by(|a, b| match (a, b) {
5912            (None, None) => std::cmp::Ordering::Equal,
5913            (None, Some(_)) => std::cmp::Ordering::Less,
5914            (Some(_), None) => std::cmp::Ordering::Greater,
5915            (Some(av), Some(bv)) => {
5916                let a_val = av.parse::<i64>().unwrap_or_default();
5917                let b_val = bv.parse::<i64>().unwrap_or_default();
5918                a_val.cmp(&b_val)
5919            }
5920        });
5921
5922        assert_eq!(
5923            values,
5924            vec![None, Some("4".to_string()), Some("13".to_string())]
5925        );
5926    }
5927
5928    #[test]
5929    fn order_by_honors_configured_default_null_order() {
5930        let pager = Arc::new(MemPager::default());
5931        let engine = SqlEngine::new(pager);
5932
5933        engine
5934            .execute("CREATE TABLE strings(a VARCHAR)")
5935            .expect("create table");
5936        engine
5937            .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
5938            .expect("insert values");
5939        engine
5940            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
5941            .expect("update value");
5942
5943        let mut result = engine
5944            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
5945            .expect("select rows");
5946        let select_result = result.remove(0);
5947        let batches = match select_result {
5948            RuntimeStatementResult::Select { execution, .. } => {
5949                execution.collect().expect("collect batches")
5950            }
5951            _ => panic!("expected select result"),
5952        };
5953
5954        let values = extract_string_options(&batches);
5955        assert_eq!(
5956            values,
5957            vec![Some("4".to_string()), Some("13".to_string()), None]
5958        );
5959
5960        assert!(!engine.default_nulls_first_for_tests());
5961
5962        engine
5963            .execute("SET default_null_order='nulls_first'")
5964            .expect("set default null order");
5965
5966        assert!(engine.default_nulls_first_for_tests());
5967
5968        let mut result = engine
5969            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
5970            .expect("select rows");
5971        let select_result = result.remove(0);
5972        let batches = match select_result {
5973            RuntimeStatementResult::Select { execution, .. } => {
5974                execution.collect().expect("collect batches")
5975            }
5976            _ => panic!("expected select result"),
5977        };
5978
5979        let values = extract_string_options(&batches);
5980        assert_eq!(
5981            values,
5982            vec![None, Some("4".to_string()), Some("13".to_string())]
5983        );
5984    }
5985
5986    #[test]
5987    fn arrow_type_from_row_returns_struct_fields() {
5988        let dialect = GenericDialect {};
5989        let statements = parse_sql_with_recursion_limit(
5990            &dialect,
5991            "CREATE TABLE row_types(payload ROW(a INTEGER, b VARCHAR));",
5992        )
5993        .expect("parse ROW type definition");
5994
5995        let data_type = match &statements[0] {
5996            Statement::CreateTable(stmt) => stmt.columns[0].data_type.clone(),
5997            other => panic!("unexpected statement: {other:?}"),
5998        };
5999
6000        let arrow_type = arrow_type_from_sql(&data_type).expect("convert ROW type");
6001        match arrow_type {
6002            arrow::datatypes::DataType::Struct(fields) => {
6003                assert_eq!(fields.len(), 2, "unexpected field count");
6004                assert_eq!(fields[0].name(), "a");
6005                assert_eq!(fields[1].name(), "b");
6006                assert_eq!(fields[0].data_type(), &arrow::datatypes::DataType::Int64);
6007                assert_eq!(fields[1].data_type(), &arrow::datatypes::DataType::Utf8);
6008            }
6009            other => panic!("expected struct type, got {other:?}"),
6010        }
6011    }
6012}