llkv_sql/
sql_engine.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::{
3    Arc, OnceLock,
4    atomic::{AtomicBool, Ordering as AtomicOrdering},
5};
6
7use crate::SqlResult;
8use crate::SqlValue;
9use arrow::record_batch::RecordBatch;
10
11use llkv_executor::SelectExecution;
12use llkv_expr::literal::Literal;
13use llkv_plan::validation::{
14    ensure_known_columns_case_insensitive, ensure_non_empty, ensure_unique_case_insensitive,
15};
16use llkv_result::Error;
17use llkv_runtime::TEMPORARY_NAMESPACE_ID;
18use llkv_runtime::{
19    AggregateExpr, AssignmentValue, ColumnAssignment, CreateIndexPlan, CreateTablePlan,
20    CreateTableSource, DeletePlan, ForeignKeyAction, ForeignKeySpec, IndexColumnPlan, InsertPlan,
21    InsertSource, MultiColumnUniqueSpec, OrderByPlan, OrderSortType, OrderTarget, PlanColumnSpec,
22    PlanStatement, PlanValue, RenameTablePlan, RuntimeContext, RuntimeEngine, RuntimeSession,
23    RuntimeStatementResult, SelectPlan, SelectProjection, UpdatePlan, extract_rows_from_range,
24};
25use llkv_storage::pager::Pager;
26use llkv_table::CatalogDdl;
27use llkv_table::catalog::{IdentifierContext, IdentifierResolver};
28use regex::Regex;
29use simd_r_drive_entry_handle::EntryHandle;
30use sqlparser::ast::{
31    AlterColumnOperation, AlterTableOperation, Assignment, AssignmentTarget, BeginTransactionKind,
32    BinaryOperator, ColumnOption, ColumnOptionDef, ConstraintCharacteristics,
33    DataType as SqlDataType, Delete, ExceptionWhen, Expr as SqlExpr, FromTable, FunctionArg,
34    FunctionArgExpr, FunctionArguments, GroupByExpr, Ident, LimitClause, NullsDistinctOption,
35    ObjectName, ObjectNamePart, ObjectType, OrderBy, OrderByKind, Query, ReferentialAction,
36    SchemaName, Select, SelectItem, SelectItemQualifiedWildcardKind, Set, SetExpr, SqlOption,
37    Statement, TableConstraint, TableFactor, TableObject, TableWithJoins, TransactionMode,
38    TransactionModifier, UnaryOperator, UpdateTableFromKind, Value, ValueWithSpan,
39};
40use sqlparser::dialect::GenericDialect;
41use sqlparser::parser::Parser;
42
43/// SQL execution engine built on top of the LLKV runtime.
44///
45/// # Examples
46///
47/// ```
48/// use std::sync::Arc;
49///
50/// use arrow::array::StringArray;
51/// use llkv_sql::{RuntimeStatementResult, SqlEngine};
52/// use llkv_storage::pager::MemPager;
53///
54/// let engine = SqlEngine::new(Arc::new(MemPager::default()));
55///
56/// let setup = r#"
57///     CREATE TABLE users (id INT PRIMARY KEY, name TEXT);
58///     INSERT INTO users (id, name) VALUES (1, 'Ada');
59/// "#;
60///
61/// let results = engine.execute(setup).unwrap();
62/// assert_eq!(results.len(), 2);
63///
64/// assert!(matches!(
65///     results[0],
66///     RuntimeStatementResult::CreateTable { ref table_name } if table_name == "users"
67/// ));
68/// assert!(matches!(
69///     results[1],
70///     RuntimeStatementResult::Insert { rows_inserted, .. } if rows_inserted == 1
71/// ));
72///
73/// let batches = engine.sql("SELECT id, name FROM users ORDER BY id;").unwrap();
74/// assert_eq!(batches.len(), 1);
75///
76/// let batch = &batches[0];
77/// assert_eq!(batch.num_rows(), 1);
78/// assert_eq!(batch.schema().field(1).name(), "name");
79///
80/// let names = batch
81///     .column(1)
82///     .as_any()
83///     .downcast_ref::<StringArray>()
84///     .unwrap();
85///
86/// assert_eq!(names.value(0), "Ada");
87/// ```
pub struct SqlEngine<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Runtime engine that planned statements are forwarded to for execution.
    engine: RuntimeEngine<P>,
    /// Engine-wide "nulls first" flag held in an atomic; [`SqlEngine::new`] initialises
    /// it to `false`.
    // NOTE(review): presumably the default NULL placement for ORDER BY — confirm
    // against the code that reads this flag (not visible in this part of the file).
    default_nulls_first: AtomicBool,
}
95
/// Message text describing a table that was dropped by another transaction.
// NOTE(review): no use of this constant is visible in this part of the file —
// confirm where it is emitted or matched before changing the wording.
const DROPPED_TABLE_TRANSACTION_ERR: &str = "another transaction has dropped this table";
97
98impl<P> Clone for SqlEngine<P>
99where
100    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
101{
102    fn clone(&self) -> Self {
103        tracing::warn!(
104            "[SQL_ENGINE] SqlEngine::clone() called - will create new Engine with new session!"
105        );
106        // Create a new session from the same context
107        Self {
108            engine: self.engine.clone(),
109            default_nulls_first: AtomicBool::new(
110                self.default_nulls_first.load(AtomicOrdering::Relaxed),
111            ),
112        }
113    }
114}
115
116#[allow(dead_code)]
117impl<P> SqlEngine<P>
118where
119    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
120{
121    fn map_table_error(table_name: &str, err: Error) -> Error {
122        match err {
123            Error::NotFound => Self::table_not_found_error(table_name),
124            Error::InvalidArgumentError(msg) if msg.contains("unknown table") => {
125                Self::table_not_found_error(table_name)
126            }
127            other => other,
128        }
129    }
130
131    fn table_not_found_error(table_name: &str) -> Error {
132        Error::CatalogError(format!(
133            "Catalog Error: Table '{table_name}' does not exist"
134        ))
135    }
136
137    fn is_table_missing_error(err: &Error) -> bool {
138        match err {
139            Error::NotFound => true,
140            Error::CatalogError(msg) => {
141                msg.contains("Catalog Error: Table") || msg.contains("unknown table")
142            }
143            Error::InvalidArgumentError(msg) => {
144                msg.contains("Catalog Error: Table") || msg.contains("unknown table")
145            }
146            _ => false,
147        }
148    }
149
150    fn execute_plan_statement(
151        &self,
152        statement: PlanStatement,
153    ) -> SqlResult<RuntimeStatementResult<P>> {
154        let table = llkv_runtime::statement_table_name(&statement).map(str::to_string);
155        self.engine.execute_statement(statement).map_err(|err| {
156            if let Some(table_name) = table {
157                Self::map_table_error(&table_name, err)
158            } else {
159                err
160            }
161        })
162    }
163
164    pub fn new(pager: Arc<P>) -> Self {
165        let engine = RuntimeEngine::new(pager);
166        Self {
167            engine,
168            default_nulls_first: AtomicBool::new(false),
169        }
170    }
171
172    /// Preprocess SQL to handle qualified names in EXCLUDE clauses
173    /// Converts EXCLUDE (schema.table.col) to EXCLUDE ("schema.table.col")
174    /// Preprocess CREATE TYPE to CREATE DOMAIN for sqlparser compatibility.
175    ///
176    /// DuckDB uses `CREATE TYPE name AS basetype` for type aliases, but sqlparser
177    /// only supports the SQL standard `CREATE DOMAIN name AS basetype` syntax.
178    /// This method converts the DuckDB syntax to the standard syntax.
179    fn preprocess_create_type_syntax(sql: &str) -> String {
180        static CREATE_TYPE_REGEX: OnceLock<Regex> = OnceLock::new();
181        static DROP_TYPE_REGEX: OnceLock<Regex> = OnceLock::new();
182
183        // Match: CREATE TYPE name AS datatype
184        let create_re = CREATE_TYPE_REGEX.get_or_init(|| {
185            Regex::new(r"(?i)\bCREATE\s+TYPE\s+").expect("valid CREATE TYPE regex")
186        });
187
188        // Match: DROP TYPE [IF EXISTS] name
189        let drop_re = DROP_TYPE_REGEX
190            .get_or_init(|| Regex::new(r"(?i)\bDROP\s+TYPE\s+").expect("valid DROP TYPE regex"));
191
192        // First replace CREATE TYPE with CREATE DOMAIN
193        let sql = create_re.replace_all(sql, "CREATE DOMAIN ").to_string();
194
195        // Then replace DROP TYPE with DROP DOMAIN
196        drop_re.replace_all(&sql, "DROP DOMAIN ").to_string()
197    }
198
199    fn preprocess_exclude_syntax(sql: &str) -> String {
200        static EXCLUDE_REGEX: OnceLock<Regex> = OnceLock::new();
201
202        // Pattern to match EXCLUDE followed by qualified identifiers
203        // Matches: EXCLUDE (identifier.identifier.identifier)
204        let re = EXCLUDE_REGEX.get_or_init(|| {
205            Regex::new(
206                r"(?i)EXCLUDE\s*\(\s*([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)+)\s*\)",
207            )
208            .expect("valid EXCLUDE qualifier regex")
209        });
210
211        re.replace_all(sql, |caps: &regex::Captures| {
212            let qualified_name = &caps[1];
213            format!("EXCLUDE (\"{}\")", qualified_name)
214        })
215        .to_string()
216    }
217
218    /// Preprocess SQL to remove trailing commas in VALUES clauses.
219    /// DuckDB allows trailing commas like VALUES ('v2',) but sqlparser does not.
220    fn preprocess_trailing_commas_in_values(sql: &str) -> String {
221        static TRAILING_COMMA_REGEX: OnceLock<Regex> = OnceLock::new();
222
223        // Pattern to match trailing comma before closing paren in VALUES
224        // Matches: , followed by optional whitespace and )
225        let re = TRAILING_COMMA_REGEX
226            .get_or_init(|| Regex::new(r",(\s*)\)").expect("valid trailing comma regex"));
227
228        re.replace_all(sql, "$1)").to_string()
229    }
230
    /// Returns the runtime context of the wrapped engine (crate-internal accessor).
    pub(crate) fn context_arc(&self) -> Arc<RuntimeContext<P>> {
        self.engine.context()
    }
234
235    pub fn with_context(context: Arc<RuntimeContext<P>>, default_nulls_first: bool) -> Self {
236        Self {
237            engine: RuntimeEngine::from_context(context),
238            default_nulls_first: AtomicBool::new(default_nulls_first),
239        }
240    }
241
    /// Test-only accessor returning the current value of the default-nulls-first flag.
    #[cfg(test)]
    fn default_nulls_first_for_tests(&self) -> bool {
        self.default_nulls_first.load(AtomicOrdering::Relaxed)
    }
246
    /// Reports whether the engine's session currently has an active transaction.
    fn has_active_transaction(&self) -> bool {
        self.engine.session().has_active_transaction()
    }
250
    /// Returns a reference to the underlying [`RuntimeSession`].
    ///
    /// Intended for advanced use, such as error handling in test harnesses.
    pub fn session(&self) -> &RuntimeSession<P> {
        self.engine.session()
    }
255
256    /// Execute one or more SQL statements and return their raw [`RuntimeStatementResult`]s.
257    ///
258    /// This method is the general-purpose entry point for running SQL against the engine when
259    /// you need to mix statement types (e.g. `CREATE TABLE`, `INSERT`, `UPDATE`, `SELECT`) or
260    /// when you care about per-statement status information. Statements are executed in the order
261    /// they appear in the input string, and the results vector mirrors that ordering.
262    ///
263    /// For ad-hoc read queries where you only care about the resulting Arrow [`RecordBatch`]es,
264    /// prefer [`SqlEngine::sql`], which enforces a single `SELECT` statement and collects its
265    /// output for you. `execute` remains the right tool for schema migrations, transactional
266    /// scripts, or workflows that need to inspect the specific runtime response for each
267    /// statement.
268    pub fn execute(&self, sql: &str) -> SqlResult<Vec<RuntimeStatementResult<P>>> {
269        tracing::trace!("DEBUG SQL execute: {}", sql);
270
271        // Preprocess SQL to handle CREATE TYPE / DROP TYPE (DuckDB syntax)
272        let processed_sql = Self::preprocess_create_type_syntax(sql);
273
274        // Preprocess SQL to handle qualified names in EXCLUDE
275        // Replace EXCLUDE (schema.table.col) with EXCLUDE ("schema.table.col")
276        let processed_sql = Self::preprocess_exclude_syntax(&processed_sql);
277
278        // Preprocess SQL to remove trailing commas in VALUES clauses
279        // DuckDB allows VALUES ('v2',) but sqlparser does not
280        let processed_sql = Self::preprocess_trailing_commas_in_values(&processed_sql);
281
282        let dialect = GenericDialect {};
283        let statements = Parser::parse_sql(&dialect, &processed_sql)
284            .map_err(|err| Error::InvalidArgumentError(format!("failed to parse SQL: {err}")))?;
285        tracing::trace!("DEBUG SQL execute: parsed {} statements", statements.len());
286
287        let mut results = Vec::with_capacity(statements.len());
288        for (i, statement) in statements.iter().enumerate() {
289            tracing::trace!("DEBUG SQL execute: processing statement {}", i);
290            results.push(self.execute_statement(statement.clone())?);
291            tracing::trace!("DEBUG SQL execute: statement {} completed", i);
292        }
293        tracing::trace!("DEBUG SQL execute completed successfully");
294        Ok(results)
295    }
296
297    /// Execute a single SELECT statement and return its results as Arrow [`RecordBatch`]es.
298    ///
299    /// The SQL passed to this method must contain exactly one statement, and that statement must
300    /// be a `SELECT`. Statements that modify data (e.g. `INSERT`) should be executed up front
301    /// using [`SqlEngine::execute`] before calling this helper.
302    ///
303    /// # Examples
304    ///
305    /// ```
306    /// use std::sync::Arc;
307    ///
308    /// use arrow::array::StringArray;
309    /// use llkv_sql::SqlEngine;
310    /// use llkv_storage::pager::MemPager;
311    ///
312    /// let engine = SqlEngine::new(Arc::new(MemPager::default()));
313    /// let _ = engine
314    ///     .execute(
315    ///         "CREATE TABLE users (id INT PRIMARY KEY, name TEXT);\n         \
316    ///          INSERT INTO users (id, name) VALUES (1, 'Ada');",
317    ///     )
318    ///     .unwrap();
319    ///
320    /// let batches = engine.sql("SELECT id, name FROM users ORDER BY id;").unwrap();
321    /// assert_eq!(batches.len(), 1);
322    ///
323    /// let batch = &batches[0];
324    /// assert_eq!(batch.num_rows(), 1);
325    ///
326    /// let names = batch
327    ///     .column(1)
328    ///     .as_any()
329    ///     .downcast_ref::<StringArray>()
330    ///     .unwrap();
331    /// assert_eq!(names.value(0), "Ada");
332    /// ```
333    pub fn sql(&self, sql: &str) -> SqlResult<Vec<RecordBatch>> {
334        let mut results = self.execute(sql)?;
335        if results.len() != 1 {
336            return Err(Error::InvalidArgumentError(
337                "SqlEngine::sql expects exactly one SQL statement".into(),
338            ));
339        }
340
341        match results.pop().expect("checked length above") {
342            RuntimeStatementResult::Select { execution, .. } => execution.collect(),
343            other => Err(Error::InvalidArgumentError(format!(
344                "SqlEngine::sql requires a SELECT statement, got {other:?}",
345            ))),
346        }
347    }
348
    /// Dispatches one parsed statement.
    ///
    /// Transaction-control statements (`StartTransaction`, `Commit`, `Rollback`) are
    /// routed to their dedicated handlers; every other statement is forwarded to
    /// [`Self::execute_statement_non_transactional`].
    fn execute_statement(&self, statement: Statement) -> SqlResult<RuntimeStatementResult<P>> {
        // Summarise the statement for the trace log: the common variants are logged by
        // name only, anything else falls back to its full `Debug` output.
        tracing::trace!(
            "DEBUG SQL execute_statement: {:?}",
            match &statement {
                Statement::Insert(insert) =>
                    format!("Insert(table={:?})", Self::table_name_from_insert(insert)),
                Statement::Query(_) => "Query".to_string(),
                Statement::StartTransaction { .. } => "StartTransaction".to_string(),
                Statement::Commit { .. } => "Commit".to_string(),
                Statement::Rollback { .. } => "Rollback".to_string(),
                Statement::CreateTable(_) => "CreateTable".to_string(),
                Statement::Update { .. } => "Update".to_string(),
                Statement::Delete(_) => "Delete".to_string(),
                other => format!("Other({:?})", other),
            }
        );
        match statement {
            // Every field of the variant is passed on to the handler unchanged.
            Statement::StartTransaction {
                modes,
                begin,
                transaction,
                modifier,
                statements,
                exception,
                has_end_keyword,
            } => self.handle_start_transaction(
                modes,
                begin,
                transaction,
                modifier,
                statements,
                exception,
                has_end_keyword,
            ),
            Statement::Commit {
                chain,
                end,
                modifier,
            } => self.handle_commit(chain, end, modifier),
            Statement::Rollback { chain, savepoint } => self.handle_rollback(chain, savepoint),
            other => self.execute_statement_non_transactional(other),
        }
    }
392
    /// Dispatches a statement that is not a transaction-control statement.
    ///
    /// Each supported `Statement` variant is destructured and forwarded to its
    /// `handle_*` method, with a trace event logged per branch. Variants without a
    /// branch here are rejected with `Error::InvalidArgumentError`
    /// ("unsupported SQL statement").
    fn execute_statement_non_transactional(
        &self,
        statement: Statement,
    ) -> SqlResult<RuntimeStatementResult<P>> {
        tracing::trace!("DEBUG SQL execute_statement_non_transactional called");
        match statement {
            Statement::CreateTable(stmt) => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateTable");
                self.handle_create_table(stmt)
            }
            Statement::CreateIndex(stmt) => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateIndex");
                self.handle_create_index(stmt)
            }
            Statement::CreateSchema {
                schema_name,
                if_not_exists,
                with,
                options,
                default_collate_spec,
                clone,
            } => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateSchema");
                self.handle_create_schema(
                    schema_name,
                    if_not_exists,
                    with,
                    options,
                    default_collate_spec,
                    clone,
                )
            }
            Statement::CreateView {
                name,
                columns,
                query,
                materialized,
                or_replace,
                or_alter,
                options,
                cluster_by,
                comment,
                with_no_schema_binding,
                if_not_exists,
                temporary,
                to,
                params,
                secure,
                name_before_not_exists,
            } => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateView");
                self.handle_create_view(
                    name,
                    columns,
                    query,
                    materialized,
                    or_replace,
                    or_alter,
                    options,
                    cluster_by,
                    comment,
                    with_no_schema_binding,
                    if_not_exists,
                    temporary,
                    to,
                    params,
                    secure,
                    name_before_not_exists,
                )
            }
            Statement::CreateDomain(create_domain) => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateDomain");
                self.handle_create_domain(create_domain)
            }
            Statement::DropDomain(drop_domain) => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: DropDomain");
                self.handle_drop_domain(drop_domain)
            }
            Statement::Insert(stmt) => {
                // The table name is only needed for the trace message, so a failure to
                // extract it is replaced by a placeholder instead of aborting the insert.
                let table_name =
                    Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
                tracing::trace!(
                    "DEBUG SQL execute_statement_non_transactional: Insert(table={})",
                    table_name
                );
                self.handle_insert(stmt)
            }
            Statement::Query(query) => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Query");
                self.handle_query(*query)
            }
            // Fields of `Update` other than the five named here are ignored.
            Statement::Update {
                table,
                assignments,
                from,
                selection,
                returning,
                ..
            } => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Update");
                self.handle_update(table, assignments, from, selection, returning)
            }
            Statement::Delete(delete) => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Delete");
                self.handle_delete(delete)
            }
            // Any `Drop` fields beyond those listed are ignored.
            Statement::Drop {
                object_type,
                if_exists,
                names,
                cascade,
                restrict,
                purge,
                temporary,
                ..
            } => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Drop");
                self.handle_drop(
                    object_type,
                    if_exists,
                    names,
                    cascade,
                    restrict,
                    purge,
                    temporary,
                )
            }
            // Any `AlterTable` fields beyond those listed are ignored.
            Statement::AlterTable {
                name,
                if_exists,
                only,
                operations,
                ..
            } => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: AlterTable");
                self.handle_alter_table(name, if_exists, only, operations)
            }
            Statement::Set(set_stmt) => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Set");
                self.handle_set(set_stmt)
            }
            Statement::Pragma { name, value, is_eq } => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Pragma");
                self.handle_pragma(name, value, is_eq)
            }
            // Everything else is unsupported; the statement's `Debug` form goes into the error.
            other => {
                tracing::trace!(
                    "DEBUG SQL execute_statement_non_transactional: Other({:?})",
                    other
                );
                Err(Error::InvalidArgumentError(format!(
                    "unsupported SQL statement: {other:?}"
                )))
            }
        }
    }
549
550    fn table_name_from_insert(insert: &sqlparser::ast::Insert) -> SqlResult<String> {
551        match &insert.table {
552            TableObject::TableName(name) => Self::object_name_to_string(name),
553            _ => Err(Error::InvalidArgumentError(
554                "INSERT requires a plain table name".into(),
555            )),
556        }
557    }
558
559    fn table_name_from_update(table: &TableWithJoins) -> SqlResult<Option<String>> {
560        if !table.joins.is_empty() {
561            return Err(Error::InvalidArgumentError(
562                "UPDATE with JOIN targets is not supported yet".into(),
563            ));
564        }
565        Self::table_with_joins_name(table)
566    }
567
568    fn table_name_from_delete(delete: &Delete) -> SqlResult<Option<String>> {
569        if !delete.tables.is_empty() {
570            return Err(Error::InvalidArgumentError(
571                "multi-table DELETE is not supported yet".into(),
572            ));
573        }
574        let from_tables = match &delete.from {
575            FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
576        };
577        if from_tables.is_empty() {
578            return Ok(None);
579        }
580        if from_tables.len() != 1 {
581            return Err(Error::InvalidArgumentError(
582                "DELETE over multiple tables is not supported yet".into(),
583            ));
584        }
585        Self::table_with_joins_name(&from_tables[0])
586    }
587
588    fn object_name_to_string(name: &ObjectName) -> SqlResult<String> {
589        let (display, _) = canonical_object_name(name)?;
590        Ok(display)
591    }
592
593    #[allow(dead_code)]
594    fn table_object_to_name(table: &TableObject) -> SqlResult<Option<String>> {
595        match table {
596            TableObject::TableName(name) => Ok(Some(Self::object_name_to_string(name)?)),
597            TableObject::TableFunction(_) => Ok(None),
598        }
599    }
600
601    fn table_with_joins_name(table: &TableWithJoins) -> SqlResult<Option<String>> {
602        match &table.relation {
603            TableFactor::Table { name, .. } => Ok(Some(Self::object_name_to_string(name)?)),
604            _ => Ok(None),
605        }
606    }
607
608    fn tables_in_query(query: &Query) -> SqlResult<Vec<String>> {
609        let mut tables = Vec::new();
610        if let sqlparser::ast::SetExpr::Select(select) = query.body.as_ref() {
611            for table in &select.from {
612                if let TableFactor::Table { name, .. } = &table.relation {
613                    tables.push(Self::object_name_to_string(name)?);
614                }
615            }
616        }
617        Ok(tables)
618    }
619
620    fn collect_known_columns(
621        &self,
622        display_name: &str,
623        canonical_name: &str,
624    ) -> SqlResult<HashSet<String>> {
625        let context = self.engine.context();
626
627        if context.is_table_marked_dropped(canonical_name) {
628            return Err(Self::table_not_found_error(display_name));
629        }
630
631        // First, check if the table was created in the current transaction
632        if let Some(specs) = self
633            .engine
634            .session()
635            .table_column_specs_from_transaction(canonical_name)
636        {
637            return Ok(specs
638                .into_iter()
639                .map(|spec| spec.name.to_ascii_lowercase())
640                .collect());
641        }
642
643        // Otherwise, look it up in the committed catalog
644        let (_, canonical_name) = llkv_table::canonical_table_name(display_name)
645            .map_err(|e| arrow::error::ArrowError::ExternalError(Box::new(e)))?;
646        match context.catalog().table_column_specs(&canonical_name) {
647            Ok(specs) => Ok(specs
648                .into_iter()
649                .map(|spec| spec.name.to_ascii_lowercase())
650                .collect()),
651            Err(err) => {
652                if !Self::is_table_missing_error(&err) {
653                    return Err(Self::map_table_error(display_name, err));
654                }
655
656                Ok(HashSet::new())
657            }
658        }
659    }
660
661    fn is_table_marked_dropped(&self, table_name: &str) -> SqlResult<bool> {
662        let canonical = table_name.to_ascii_lowercase();
663        Ok(self.engine.context().is_table_marked_dropped(&canonical))
664    }
665
666    fn handle_create_table(
667        &self,
668        mut stmt: sqlparser::ast::CreateTable,
669    ) -> SqlResult<RuntimeStatementResult<P>> {
670        validate_create_table_common(&stmt)?;
671
672        let (mut schema_name, table_name) = parse_schema_qualified_name(&stmt.name)?;
673
674        let namespace = if stmt.temporary {
675            if schema_name.is_some() {
676                return Err(Error::InvalidArgumentError(
677                    "temporary tables cannot specify an explicit schema".into(),
678                ));
679            }
680            schema_name = None;
681            Some(TEMPORARY_NAMESPACE_ID.to_string())
682        } else {
683            None
684        };
685
686        // Validate schema exists if specified
687        if let Some(ref schema) = schema_name {
688            let catalog = self.engine.context().table_catalog();
689            if !catalog.schema_exists(schema) {
690                return Err(Error::CatalogError(format!(
691                    "Schema '{}' does not exist",
692                    schema
693                )));
694            }
695        }
696
697        // Use full qualified name (schema.table or just table)
698        let display_name = match &schema_name {
699            Some(schema) => format!("{}.{}", schema, table_name),
700            None => table_name.clone(),
701        };
702        let canonical_name = display_name.to_ascii_lowercase();
703        tracing::trace!(
704            "\n=== HANDLE_CREATE_TABLE: table='{}' columns={} ===",
705            display_name,
706            stmt.columns.len()
707        );
708        if display_name.is_empty() {
709            return Err(Error::InvalidArgumentError(
710                "table name must not be empty".into(),
711            ));
712        }
713
714        if let Some(query) = stmt.query.take() {
715            validate_create_table_as(&stmt)?;
716            if let Some(result) = self.try_handle_range_ctas(
717                &display_name,
718                &canonical_name,
719                &query,
720                stmt.if_not_exists,
721                stmt.or_replace,
722                namespace.clone(),
723            )? {
724                return Ok(result);
725            }
726            return self.handle_create_table_as(
727                display_name,
728                canonical_name,
729                *query,
730                stmt.if_not_exists,
731                stmt.or_replace,
732                namespace.clone(),
733            );
734        }
735
736        if stmt.columns.is_empty() {
737            return Err(Error::InvalidArgumentError(
738                "CREATE TABLE requires at least one column".into(),
739            ));
740        }
741
742        validate_create_table_definition(&stmt)?;
743
744        let column_defs_ast = std::mem::take(&mut stmt.columns);
745        let constraints = std::mem::take(&mut stmt.constraints);
746
747        let column_names: Vec<String> = column_defs_ast
748            .iter()
749            .map(|column_def| column_def.name.value.clone())
750            .collect();
751        ensure_unique_case_insensitive(column_names.iter().map(|name| name.as_str()), |dup| {
752            format!(
753                "duplicate column name '{}' in table '{}'",
754                dup, display_name
755            )
756        })?;
757        let column_names_lower: HashSet<String> = column_names
758            .iter()
759            .map(|name| name.to_ascii_lowercase())
760            .collect();
761
762        let mut columns: Vec<PlanColumnSpec> = Vec::with_capacity(column_defs_ast.len());
763        let mut primary_key_columns: HashSet<String> = HashSet::new();
764        let mut foreign_keys: Vec<ForeignKeySpec> = Vec::new();
765        let mut multi_column_uniques: Vec<MultiColumnUniqueSpec> = Vec::new();
766
767        // Second pass: process columns including CHECK validation and column-level FKs
768        for column_def in column_defs_ast {
769            let is_nullable = column_def
770                .options
771                .iter()
772                .all(|opt| !matches!(opt.option, ColumnOption::NotNull));
773
774            let is_primary_key = column_def.options.iter().any(|opt| {
775                matches!(
776                    opt.option,
777                    ColumnOption::Unique {
778                        is_primary: true,
779                        characteristics: _
780                    }
781                )
782            });
783
784            let has_unique_constraint = column_def
785                .options
786                .iter()
787                .any(|opt| matches!(opt.option, ColumnOption::Unique { .. }));
788
789            // Extract CHECK constraint if present and validate it
790            let check_expr = column_def.options.iter().find_map(|opt| {
791                if let ColumnOption::Check(expr) = &opt.option {
792                    Some(expr)
793                } else {
794                    None
795                }
796            });
797
798            // Validate CHECK constraint if present (now we have all column names)
799            if let Some(check_expr) = check_expr {
800                let all_col_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
801                validate_check_constraint(check_expr, &display_name, &all_col_refs)?;
802            }
803
804            let check_expr_str = check_expr.map(|e| e.to_string());
805
806            // Extract column-level FOREIGN KEY (REFERENCES clause)
807            for opt in &column_def.options {
808                if let ColumnOption::ForeignKey {
809                    foreign_table,
810                    referred_columns,
811                    on_delete,
812                    on_update,
813                    characteristics,
814                } = &opt.option
815                {
816                    let spec = self.build_foreign_key_spec(
817                        &display_name,
818                        &canonical_name,
819                        vec![column_def.name.value.clone()],
820                        foreign_table,
821                        referred_columns,
822                        *on_delete,
823                        *on_update,
824                        characteristics,
825                        &column_names_lower,
826                        None,
827                    )?;
828                    foreign_keys.push(spec);
829                }
830            }
831
832            tracing::trace!(
833                "DEBUG CREATE TABLE column '{}' is_primary_key={} has_unique={} check_expr={:?}",
834                column_def.name.value,
835                is_primary_key,
836                has_unique_constraint,
837                check_expr_str
838            );
839
840            // Resolve custom type aliases to their base types
841            let resolved_data_type = self.engine.context().resolve_type(&column_def.data_type);
842
843            let mut column = PlanColumnSpec::new(
844                column_def.name.value.clone(),
845                arrow_type_from_sql(&resolved_data_type)?,
846                is_nullable,
847            );
848            tracing::trace!(
849                "DEBUG PlanColumnSpec after new(): primary_key={} unique={}",
850                column.primary_key,
851                column.unique
852            );
853
854            column = column
855                .with_primary_key(is_primary_key)
856                .with_unique(has_unique_constraint)
857                .with_check(check_expr_str);
858
859            if is_primary_key {
860                column.nullable = false;
861                primary_key_columns.insert(column.name.to_ascii_lowercase());
862            }
863            tracing::trace!(
864                "DEBUG PlanColumnSpec after with_primary_key({})/with_unique({}): primary_key={} unique={} check_expr={:?}",
865                is_primary_key,
866                has_unique_constraint,
867                column.primary_key,
868                column.unique,
869                column.check_expr
870            );
871
872            columns.push(column);
873        }
874
875        // Apply supported table-level constraints (e.g., PRIMARY KEY)
876        if !constraints.is_empty() {
877            let mut column_lookup: HashMap<String, usize> = HashMap::with_capacity(columns.len());
878            for (idx, column) in columns.iter().enumerate() {
879                column_lookup.insert(column.name.to_ascii_lowercase(), idx);
880            }
881
882            for constraint in constraints {
883                match constraint {
884                    TableConstraint::PrimaryKey {
885                        columns: constraint_columns,
886                        ..
887                    } => {
888                        if !primary_key_columns.is_empty() {
889                            return Err(Error::InvalidArgumentError(
890                                "multiple PRIMARY KEY constraints are not supported".into(),
891                            ));
892                        }
893
894                        ensure_non_empty(&constraint_columns, || {
895                            "PRIMARY KEY requires at least one column".into()
896                        })?;
897
898                        let mut pk_column_names: Vec<String> =
899                            Vec::with_capacity(constraint_columns.len());
900
901                        for index_col in &constraint_columns {
902                            let column_ident = extract_index_column_name(
903                                index_col,
904                                "PRIMARY KEY",
905                                false, // no sort options allowed
906                                false, // only simple identifiers
907                            )?;
908                            pk_column_names.push(column_ident);
909                        }
910
911                        ensure_unique_case_insensitive(
912                            pk_column_names.iter().map(|name| name.as_str()),
913                            |dup| format!("duplicate column '{}' in PRIMARY KEY constraint", dup),
914                        )?;
915
916                        ensure_known_columns_case_insensitive(
917                            pk_column_names.iter().map(|name| name.as_str()),
918                            &column_names_lower,
919                            |unknown| {
920                                format!("unknown column '{}' in PRIMARY KEY constraint", unknown)
921                            },
922                        )?;
923
924                        for column_ident in pk_column_names {
925                            let normalized = column_ident.to_ascii_lowercase();
926                            let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
927                                Error::InvalidArgumentError(format!(
928                                    "unknown column '{}' in PRIMARY KEY constraint",
929                                    column_ident
930                                ))
931                            })?;
932
933                            let column = columns.get_mut(idx).expect("column index valid");
934                            column.primary_key = true;
935                            column.unique = true;
936                            column.nullable = false;
937
938                            primary_key_columns.insert(normalized);
939                        }
940                    }
941                    TableConstraint::Unique {
942                        columns: constraint_columns,
943                        index_type,
944                        index_options,
945                        characteristics,
946                        nulls_distinct,
947                        name,
948                        ..
949                    } => {
950                        if !matches!(nulls_distinct, NullsDistinctOption::None) {
951                            return Err(Error::InvalidArgumentError(
952                                "UNIQUE constraints with NULLS DISTINCT/NOT DISTINCT are not supported yet".into(),
953                            ));
954                        }
955
956                        if index_type.is_some() {
957                            return Err(Error::InvalidArgumentError(
958                                "UNIQUE constraints with index types are not supported yet".into(),
959                            ));
960                        }
961
962                        if !index_options.is_empty() {
963                            return Err(Error::InvalidArgumentError(
964                                "UNIQUE constraints with index options are not supported yet"
965                                    .into(),
966                            ));
967                        }
968
969                        if characteristics.is_some() {
970                            return Err(Error::InvalidArgumentError(
971                                "UNIQUE constraint characteristics are not supported yet".into(),
972                            ));
973                        }
974
975                        ensure_non_empty(&constraint_columns, || {
976                            "UNIQUE constraint requires at least one column".into()
977                        })?;
978
979                        let mut unique_column_names: Vec<String> =
980                            Vec::with_capacity(constraint_columns.len());
981
982                        for index_column in &constraint_columns {
983                            let column_ident = extract_index_column_name(
984                                index_column,
985                                "UNIQUE constraint",
986                                false, // no sort options allowed
987                                false, // only simple identifiers
988                            )?;
989                            unique_column_names.push(column_ident);
990                        }
991
992                        ensure_unique_case_insensitive(
993                            unique_column_names.iter().map(|name| name.as_str()),
994                            |dup| format!("duplicate column '{}' in UNIQUE constraint", dup),
995                        )?;
996
997                        ensure_known_columns_case_insensitive(
998                            unique_column_names.iter().map(|name| name.as_str()),
999                            &column_names_lower,
1000                            |unknown| format!("unknown column '{}' in UNIQUE constraint", unknown),
1001                        )?;
1002
1003                        if unique_column_names.len() > 1 {
1004                            // Multi-column UNIQUE constraint
1005                            multi_column_uniques.push(MultiColumnUniqueSpec {
1006                                name: name.map(|n| n.value),
1007                                columns: unique_column_names,
1008                            });
1009                        } else {
1010                            // Single-column UNIQUE constraint
1011                            let column_ident = unique_column_names
1012                                .into_iter()
1013                                .next()
1014                                .expect("unique constraint checked for emptiness");
1015                            let normalized = column_ident.to_ascii_lowercase();
1016                            let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
1017                                Error::InvalidArgumentError(format!(
1018                                    "unknown column '{}' in UNIQUE constraint",
1019                                    column_ident
1020                                ))
1021                            })?;
1022
1023                            let column = columns
1024                                .get_mut(idx)
1025                                .expect("column index from lookup must be valid");
1026                            column.unique = true;
1027                        }
1028                    }
1029                    TableConstraint::ForeignKey {
1030                        name,
1031                        index_name,
1032                        columns: fk_columns,
1033                        foreign_table,
1034                        referred_columns,
1035                        on_delete,
1036                        on_update,
1037                        characteristics,
1038                        ..
1039                    } => {
1040                        if index_name.is_some() {
1041                            return Err(Error::InvalidArgumentError(
1042                                "FOREIGN KEY index clauses are not supported yet".into(),
1043                            ));
1044                        }
1045
1046                        let referencing_columns: Vec<String> =
1047                            fk_columns.into_iter().map(|ident| ident.value).collect();
1048                        let spec = self.build_foreign_key_spec(
1049                            &display_name,
1050                            &canonical_name,
1051                            referencing_columns,
1052                            &foreign_table,
1053                            &referred_columns,
1054                            on_delete,
1055                            on_update,
1056                            &characteristics,
1057                            &column_names_lower,
1058                            name.map(|ident| ident.value),
1059                        )?;
1060
1061                        foreign_keys.push(spec);
1062                    }
1063                    unsupported => {
1064                        return Err(Error::InvalidArgumentError(format!(
1065                            "table-level constraint {:?} is not supported",
1066                            unsupported
1067                        )));
1068                    }
1069                }
1070            }
1071        }
1072
1073        let plan = CreateTablePlan {
1074            name: display_name,
1075            if_not_exists: stmt.if_not_exists,
1076            or_replace: stmt.or_replace,
1077            columns,
1078            source: None,
1079            namespace,
1080            foreign_keys,
1081            multi_column_uniques,
1082        };
1083        self.execute_plan_statement(PlanStatement::CreateTable(plan))
1084    }
1085
1086    fn handle_create_index(
1087        &self,
1088        stmt: sqlparser::ast::CreateIndex,
1089    ) -> SqlResult<RuntimeStatementResult<P>> {
1090        let sqlparser::ast::CreateIndex {
1091            name,
1092            table_name,
1093            using,
1094            columns,
1095            unique,
1096            concurrently,
1097            if_not_exists,
1098            include,
1099            nulls_distinct,
1100            with,
1101            predicate,
1102            index_options,
1103            alter_options,
1104            ..
1105        } = stmt;
1106
1107        if concurrently {
1108            return Err(Error::InvalidArgumentError(
1109                "CREATE INDEX CONCURRENTLY is not supported".into(),
1110            ));
1111        }
1112        if using.is_some() {
1113            return Err(Error::InvalidArgumentError(
1114                "CREATE INDEX USING clauses are not supported".into(),
1115            ));
1116        }
1117        if !include.is_empty() {
1118            return Err(Error::InvalidArgumentError(
1119                "CREATE INDEX INCLUDE columns are not supported".into(),
1120            ));
1121        }
1122        if nulls_distinct.is_some() {
1123            return Err(Error::InvalidArgumentError(
1124                "CREATE INDEX NULLS DISTINCT is not supported".into(),
1125            ));
1126        }
1127        if !with.is_empty() {
1128            return Err(Error::InvalidArgumentError(
1129                "CREATE INDEX WITH options are not supported".into(),
1130            ));
1131        }
1132        if predicate.is_some() {
1133            return Err(Error::InvalidArgumentError(
1134                "partial CREATE INDEX is not supported".into(),
1135            ));
1136        }
1137        if !index_options.is_empty() {
1138            return Err(Error::InvalidArgumentError(
1139                "CREATE INDEX options are not supported".into(),
1140            ));
1141        }
1142        if !alter_options.is_empty() {
1143            return Err(Error::InvalidArgumentError(
1144                "CREATE INDEX ALTER options are not supported".into(),
1145            ));
1146        }
1147        if columns.is_empty() {
1148            return Err(Error::InvalidArgumentError(
1149                "CREATE INDEX requires at least one column".into(),
1150            ));
1151        }
1152
1153        let (schema_name, base_table_name) = parse_schema_qualified_name(&table_name)?;
1154        if let Some(ref schema) = schema_name {
1155            let catalog = self.engine.context().table_catalog();
1156            if !catalog.schema_exists(schema) {
1157                return Err(Error::CatalogError(format!(
1158                    "Schema '{}' does not exist",
1159                    schema
1160                )));
1161            }
1162        }
1163
1164        let display_table_name = schema_name
1165            .as_ref()
1166            .map(|schema| format!("{}.{}", schema, base_table_name))
1167            .unwrap_or_else(|| base_table_name.clone());
1168        let canonical_table_name = display_table_name.to_ascii_lowercase();
1169
1170        let known_columns =
1171            self.collect_known_columns(&display_table_name, &canonical_table_name)?;
1172        let enforce_known_columns = !known_columns.is_empty();
1173
1174        let index_name = match name {
1175            Some(name_obj) => Some(Self::object_name_to_string(&name_obj)?),
1176            None => None,
1177        };
1178
1179        let mut index_columns: Vec<IndexColumnPlan> = Vec::with_capacity(columns.len());
1180        let mut seen_column_names: HashSet<String> = HashSet::new();
1181        for item in columns {
1182            // Check WITH FILL before calling helper (not part of standard validation)
1183            if item.column.with_fill.is_some() {
1184                return Err(Error::InvalidArgumentError(
1185                    "CREATE INDEX column WITH FILL is not supported".into(),
1186                ));
1187            }
1188
1189            let column_name = extract_index_column_name(
1190                &item,
1191                "CREATE INDEX",
1192                true, // allow and validate sort options
1193                true, // allow compound identifiers
1194            )?;
1195
1196            // Get sort options (already validated by helper)
1197            let order_expr = &item.column;
1198            let ascending = order_expr.options.asc.unwrap_or(true);
1199            let nulls_first = order_expr.options.nulls_first.unwrap_or(false);
1200
1201            let normalized = column_name.to_ascii_lowercase();
1202            if !seen_column_names.insert(normalized.clone()) {
1203                return Err(Error::InvalidArgumentError(format!(
1204                    "duplicate column '{}' in CREATE INDEX",
1205                    column_name
1206                )));
1207            }
1208
1209            if enforce_known_columns && !known_columns.contains(&normalized) {
1210                return Err(Error::InvalidArgumentError(format!(
1211                    "column '{}' does not exist in table '{}'",
1212                    column_name, display_table_name
1213                )));
1214            }
1215
1216            let column_plan = IndexColumnPlan::new(column_name).with_sort(ascending, nulls_first);
1217            index_columns.push(column_plan);
1218        }
1219
1220        if index_columns.len() > 1 && !unique {
1221            return Err(Error::InvalidArgumentError(
1222                "multi-column CREATE INDEX currently supports UNIQUE indexes only".into(),
1223            ));
1224        }
1225
1226        let plan = CreateIndexPlan::new(display_table_name)
1227            .with_name(index_name)
1228            .with_unique(unique)
1229            .with_if_not_exists(if_not_exists)
1230            .with_columns(index_columns);
1231
1232        self.execute_plan_statement(PlanStatement::CreateIndex(plan))
1233    }
1234
1235    fn map_referential_action(
1236        action: Option<ReferentialAction>,
1237        kind: &str,
1238    ) -> SqlResult<ForeignKeyAction> {
1239        match action {
1240            None | Some(ReferentialAction::NoAction) => Ok(ForeignKeyAction::NoAction),
1241            Some(ReferentialAction::Restrict) => Ok(ForeignKeyAction::Restrict),
1242            Some(other) => Err(Error::InvalidArgumentError(format!(
1243                "FOREIGN KEY ON {kind} {:?} is not supported yet",
1244                other
1245            ))),
1246        }
1247    }
1248
1249    #[allow(clippy::too_many_arguments)]
1250    fn build_foreign_key_spec(
1251        &self,
1252        _referencing_display: &str,
1253        referencing_canonical: &str,
1254        referencing_columns: Vec<String>,
1255        foreign_table: &ObjectName,
1256        referenced_columns: &[Ident],
1257        on_delete: Option<ReferentialAction>,
1258        on_update: Option<ReferentialAction>,
1259        characteristics: &Option<ConstraintCharacteristics>,
1260        known_columns_lower: &HashSet<String>,
1261        name: Option<String>,
1262    ) -> SqlResult<ForeignKeySpec> {
1263        if characteristics.is_some() {
1264            return Err(Error::InvalidArgumentError(
1265                "FOREIGN KEY constraint characteristics are not supported yet".into(),
1266            ));
1267        }
1268
1269        ensure_non_empty(&referencing_columns, || {
1270            "FOREIGN KEY constraint requires at least one referencing column".into()
1271        })?;
1272        ensure_unique_case_insensitive(
1273            referencing_columns.iter().map(|name| name.as_str()),
1274            |dup| format!("duplicate column '{}' in FOREIGN KEY constraint", dup),
1275        )?;
1276        ensure_known_columns_case_insensitive(
1277            referencing_columns.iter().map(|name| name.as_str()),
1278            known_columns_lower,
1279            |unknown| format!("unknown column '{}' in FOREIGN KEY constraint", unknown),
1280        )?;
1281
1282        let referenced_columns_vec: Vec<String> = referenced_columns
1283            .iter()
1284            .map(|ident| ident.value.clone())
1285            .collect();
1286        ensure_unique_case_insensitive(
1287            referenced_columns_vec.iter().map(|name| name.as_str()),
1288            |dup| {
1289                format!(
1290                    "duplicate referenced column '{}' in FOREIGN KEY constraint",
1291                    dup
1292                )
1293            },
1294        )?;
1295
1296        if !referenced_columns_vec.is_empty()
1297            && referenced_columns_vec.len() != referencing_columns.len()
1298        {
1299            return Err(Error::InvalidArgumentError(
1300                "FOREIGN KEY referencing and referenced column counts must match".into(),
1301            ));
1302        }
1303
1304        let (referenced_display, referenced_canonical) = canonical_object_name(foreign_table)?;
1305
1306        // Check if the referenced table is a VIEW - VIEWs cannot be referenced by foreign keys
1307        let catalog = self.engine.context().table_catalog();
1308        if let Some(table_id) = catalog.table_id(&referenced_canonical) {
1309            let context = self.engine.context();
1310            if context.is_view(table_id)? {
1311                return Err(Error::CatalogError(format!(
1312                    "Binder Error: cannot reference a VIEW with a FOREIGN KEY: {}",
1313                    referenced_display
1314                )));
1315            }
1316        }
1317
1318        if referenced_canonical == referencing_canonical {
1319            ensure_known_columns_case_insensitive(
1320                referenced_columns_vec.iter().map(|name| name.as_str()),
1321                known_columns_lower,
1322                |unknown| {
1323                    format!(
1324                        "Binder Error: table '{}' does not have a column named '{}'",
1325                        referenced_display, unknown
1326                    )
1327                },
1328            )?;
1329        } else {
1330            let known_columns =
1331                self.collect_known_columns(&referenced_display, &referenced_canonical)?;
1332            if !known_columns.is_empty() {
1333                ensure_known_columns_case_insensitive(
1334                    referenced_columns_vec.iter().map(|name| name.as_str()),
1335                    &known_columns,
1336                    |unknown| {
1337                        format!(
1338                            "Binder Error: table '{}' does not have a column named '{}'",
1339                            referenced_display, unknown
1340                        )
1341                    },
1342                )?;
1343            }
1344        }
1345
1346        let on_delete_action = Self::map_referential_action(on_delete, "DELETE")?;
1347        let on_update_action = Self::map_referential_action(on_update, "UPDATE")?;
1348
1349        Ok(ForeignKeySpec {
1350            name,
1351            columns: referencing_columns,
1352            referenced_table: referenced_display,
1353            referenced_columns: referenced_columns_vec,
1354            on_delete: on_delete_action,
1355            on_update: on_update_action,
1356        })
1357    }
1358
1359    fn handle_create_schema(
1360        &self,
1361        schema_name: SchemaName,
1362        _if_not_exists: bool,
1363        with: Option<Vec<SqlOption>>,
1364        options: Option<Vec<SqlOption>>,
1365        default_collate_spec: Option<SqlExpr>,
1366        clone: Option<ObjectName>,
1367    ) -> SqlResult<RuntimeStatementResult<P>> {
1368        if clone.is_some() {
1369            return Err(Error::InvalidArgumentError(
1370                "CREATE SCHEMA ... CLONE is not supported".into(),
1371            ));
1372        }
1373        if with.as_ref().is_some_and(|opts| !opts.is_empty()) {
1374            return Err(Error::InvalidArgumentError(
1375                "CREATE SCHEMA ... WITH options are not supported".into(),
1376            ));
1377        }
1378        if options.as_ref().is_some_and(|opts| !opts.is_empty()) {
1379            return Err(Error::InvalidArgumentError(
1380                "CREATE SCHEMA options are not supported".into(),
1381            ));
1382        }
1383        if default_collate_spec.is_some() {
1384            return Err(Error::InvalidArgumentError(
1385                "CREATE SCHEMA DEFAULT COLLATE is not supported".into(),
1386            ));
1387        }
1388
1389        let schema_name = match schema_name {
1390            SchemaName::Simple(name) => name,
1391            _ => {
1392                return Err(Error::InvalidArgumentError(
1393                    "CREATE SCHEMA authorization is not supported".into(),
1394                ));
1395            }
1396        };
1397
1398        let (display_name, canonical) = canonical_object_name(&schema_name)?;
1399        if display_name.is_empty() {
1400            return Err(Error::InvalidArgumentError(
1401                "schema name must not be empty".into(),
1402            ));
1403        }
1404
1405        // Register schema in the catalog
1406        let catalog = self.engine.context().table_catalog();
1407
1408        if _if_not_exists && catalog.schema_exists(&canonical) {
1409            return Ok(RuntimeStatementResult::NoOp);
1410        }
1411
1412        catalog.register_schema(&canonical).map_err(|err| {
1413            Error::CatalogError(format!(
1414                "Failed to create schema '{}': {}",
1415                display_name, err
1416            ))
1417        })?;
1418
1419        Ok(RuntimeStatementResult::NoOp)
1420    }
1421
1422    #[allow(clippy::too_many_arguments)]
1423    fn handle_create_view(
1424        &self,
1425        name: ObjectName,
1426        _columns: Vec<sqlparser::ast::ViewColumnDef>,
1427        query: Box<sqlparser::ast::Query>,
1428        materialized: bool,
1429        or_replace: bool,
1430        or_alter: bool,
1431        _options: sqlparser::ast::CreateTableOptions,
1432        _cluster_by: Vec<sqlparser::ast::Ident>,
1433        _comment: Option<String>,
1434        _with_no_schema_binding: bool,
1435        if_not_exists: bool,
1436        temporary: bool,
1437        _to: Option<ObjectName>,
1438        _params: Option<sqlparser::ast::CreateViewParams>,
1439        _secure: bool,
1440        _name_before_not_exists: bool,
1441    ) -> SqlResult<RuntimeStatementResult<P>> {
1442        // Validate unsupported features
1443        if materialized {
1444            return Err(Error::InvalidArgumentError(
1445                "MATERIALIZED VIEWS are not supported".into(),
1446            ));
1447        }
1448        if or_replace {
1449            return Err(Error::InvalidArgumentError(
1450                "CREATE OR REPLACE VIEW is not supported".into(),
1451            ));
1452        }
1453        if or_alter {
1454            return Err(Error::InvalidArgumentError(
1455                "CREATE OR ALTER VIEW is not supported".into(),
1456            ));
1457        }
1458        if temporary {
1459            return Err(Error::InvalidArgumentError(
1460                "TEMPORARY VIEWS are not supported".into(),
1461            ));
1462        }
1463
1464        // Parse view name (same as table parsing)
1465        let (schema_name, view_name) = parse_schema_qualified_name(&name)?;
1466
1467        // Validate schema exists if specified
1468        if let Some(ref schema) = schema_name {
1469            let catalog = self.engine.context().table_catalog();
1470            if !catalog.schema_exists(schema) {
1471                return Err(Error::CatalogError(format!(
1472                    "Schema '{}' does not exist",
1473                    schema
1474                )));
1475            }
1476        }
1477
1478        // Use full qualified name (schema.view or just view)
1479        let display_name = match &schema_name {
1480            Some(schema) => format!("{}.{}", schema, view_name),
1481            None => view_name.clone(),
1482        };
1483        let canonical_name = display_name.to_ascii_lowercase();
1484
1485        // Check if view already exists
1486        let catalog = self.engine.context().table_catalog();
1487        if catalog.table_exists(&canonical_name) {
1488            if if_not_exists {
1489                return Ok(RuntimeStatementResult::NoOp);
1490            }
1491            return Err(Error::CatalogError(format!(
1492                "Table or view '{}' already exists",
1493                display_name
1494            )));
1495        }
1496
1497        // Convert query to SQL string for storage
1498        let view_definition = query.to_string();
1499
1500        // Create the view using the runtime context helper
1501        let context = self.engine.context();
1502        context.create_view(&display_name, view_definition)?;
1503
1504        tracing::debug!("Created view: {}", display_name);
1505        Ok(RuntimeStatementResult::NoOp)
1506    }
1507
1508    fn handle_create_domain(
1509        &self,
1510        create_domain: sqlparser::ast::CreateDomain,
1511    ) -> SqlResult<RuntimeStatementResult<P>> {
1512        use llkv_table::CustomTypeMeta;
1513        use std::time::{SystemTime, UNIX_EPOCH};
1514
1515        // Extract the type alias name
1516        let type_name = create_domain.name.to_string();
1517
1518        // Convert the data type to SQL string for persistence
1519        let base_type_sql = create_domain.data_type.to_string();
1520
1521        // Register the type alias in the runtime context (in-memory)
1522        self.engine
1523            .context()
1524            .register_type(type_name.clone(), create_domain.data_type.clone());
1525
1526        // Persist to catalog
1527        let context = self.engine.context();
1528        let catalog = llkv_table::SysCatalog::new(context.store());
1529
1530        let created_at_micros = SystemTime::now()
1531            .duration_since(UNIX_EPOCH)
1532            .unwrap_or_default()
1533            .as_micros() as u64;
1534
1535        let meta = CustomTypeMeta {
1536            name: type_name.clone(),
1537            base_type_sql,
1538            created_at_micros,
1539        };
1540
1541        catalog.put_custom_type_meta(&meta)?;
1542
1543        tracing::debug!("Created and persisted type alias: {}", type_name);
1544        Ok(RuntimeStatementResult::NoOp)
1545    }
1546
1547    fn handle_drop_domain(
1548        &self,
1549        drop_domain: sqlparser::ast::DropDomain,
1550    ) -> SqlResult<RuntimeStatementResult<P>> {
1551        let if_exists = drop_domain.if_exists;
1552        let type_name = drop_domain.name.to_string();
1553
1554        // Drop the type from the registry (in-memory)
1555        let result = self.engine.context().drop_type(&type_name);
1556
1557        if let Err(err) = result {
1558            if !if_exists {
1559                return Err(err);
1560            }
1561            // if_exists = true, so ignore the error
1562        } else {
1563            // Persist deletion to catalog
1564            let context = self.engine.context();
1565            let catalog = llkv_table::SysCatalog::new(context.store());
1566            catalog.delete_custom_type_meta(&type_name)?;
1567
1568            tracing::debug!("Dropped and removed from catalog type alias: {}", type_name);
1569        }
1570
1571        Ok(RuntimeStatementResult::NoOp)
1572    }
1573
1574    fn try_handle_range_ctas(
1575        &self,
1576        display_name: &str,
1577        _canonical_name: &str,
1578        query: &Query,
1579        if_not_exists: bool,
1580        or_replace: bool,
1581        namespace: Option<String>,
1582    ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
1583        let select = match query.body.as_ref() {
1584            SetExpr::Select(select) => select,
1585            _ => return Ok(None),
1586        };
1587        if select.from.len() != 1 {
1588            return Ok(None);
1589        }
1590        let table_with_joins = &select.from[0];
1591        if !table_with_joins.joins.is_empty() {
1592            return Ok(None);
1593        }
1594        let (range_size, range_alias) = match &table_with_joins.relation {
1595            TableFactor::Table {
1596                name,
1597                args: Some(args),
1598                alias,
1599                ..
1600            } => {
1601                let func_name = name.to_string().to_ascii_lowercase();
1602                if func_name != "range" {
1603                    return Ok(None);
1604                }
1605                if args.args.len() != 1 {
1606                    return Err(Error::InvalidArgumentError(
1607                        "range table function expects a single argument".into(),
1608                    ));
1609                }
1610                let size_expr = &args.args[0];
1611                let range_size = match size_expr {
1612                    FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
1613                        match &value.value {
1614                            Value::Number(raw, _) => raw.parse::<i64>().map_err(|e| {
1615                                Error::InvalidArgumentError(format!(
1616                                    "invalid range size literal {}: {}",
1617                                    raw, e
1618                                ))
1619                            })?,
1620                            other => {
1621                                return Err(Error::InvalidArgumentError(format!(
1622                                    "unsupported range size value: {:?}",
1623                                    other
1624                                )));
1625                            }
1626                        }
1627                    }
1628                    _ => {
1629                        return Err(Error::InvalidArgumentError(
1630                            "unsupported range argument".into(),
1631                        ));
1632                    }
1633                };
1634                (range_size, alias.as_ref().map(|a| a.name.value.clone()))
1635            }
1636            _ => return Ok(None),
1637        };
1638
1639        if range_size < 0 {
1640            return Err(Error::InvalidArgumentError(
1641                "range size must be non-negative".into(),
1642            ));
1643        }
1644
1645        if select.projection.is_empty() {
1646            return Err(Error::InvalidArgumentError(
1647                "CREATE TABLE AS SELECT requires at least one projected column".into(),
1648            ));
1649        }
1650
1651        let mut column_specs = Vec::with_capacity(select.projection.len());
1652        let mut column_names = Vec::with_capacity(select.projection.len());
1653        let mut row_template = Vec::with_capacity(select.projection.len());
1654        for item in &select.projection {
1655            match item {
1656                SelectItem::ExprWithAlias { expr, alias } => {
1657                    let (value, data_type) = match expr {
1658                        SqlExpr::Value(value_with_span) => match &value_with_span.value {
1659                            Value::Number(raw, _) => {
1660                                let parsed = raw.parse::<i64>().map_err(|e| {
1661                                    Error::InvalidArgumentError(format!(
1662                                        "invalid numeric literal {}: {}",
1663                                        raw, e
1664                                    ))
1665                                })?;
1666                                (
1667                                    PlanValue::Integer(parsed),
1668                                    arrow::datatypes::DataType::Int64,
1669                                )
1670                            }
1671                            Value::SingleQuotedString(s) => (
1672                                PlanValue::String(s.clone()),
1673                                arrow::datatypes::DataType::Utf8,
1674                            ),
1675                            other => {
1676                                return Err(Error::InvalidArgumentError(format!(
1677                                    "unsupported SELECT expression in range CTAS: {:?}",
1678                                    other
1679                                )));
1680                            }
1681                        },
1682                        SqlExpr::Identifier(ident) => {
1683                            let ident_lower = ident.value.to_ascii_lowercase();
1684                            if range_alias
1685                                .as_ref()
1686                                .map(|a| a.eq_ignore_ascii_case(&ident_lower))
1687                                .unwrap_or(false)
1688                                || ident_lower == "range"
1689                            {
1690                                return Err(Error::InvalidArgumentError(
1691                                    "range() table function columns are not supported yet".into(),
1692                                ));
1693                            }
1694                            return Err(Error::InvalidArgumentError(format!(
1695                                "unsupported identifier '{}' in range CTAS projection",
1696                                ident.value
1697                            )));
1698                        }
1699                        other => {
1700                            return Err(Error::InvalidArgumentError(format!(
1701                                "unsupported SELECT expression in range CTAS: {:?}",
1702                                other
1703                            )));
1704                        }
1705                    };
1706                    let column_name = alias.value.clone();
1707                    column_specs.push(PlanColumnSpec::new(column_name.clone(), data_type, true));
1708                    column_names.push(column_name);
1709                    row_template.push(value);
1710                }
1711                other => {
1712                    return Err(Error::InvalidArgumentError(format!(
1713                        "unsupported projection {:?} in range CTAS",
1714                        other
1715                    )));
1716                }
1717            }
1718        }
1719
1720        let plan = CreateTablePlan {
1721            name: display_name.to_string(),
1722            if_not_exists,
1723            or_replace,
1724            columns: column_specs,
1725            source: None,
1726            namespace,
1727            foreign_keys: Vec::new(),
1728            multi_column_uniques: Vec::new(),
1729        };
1730        let create_result = self.execute_plan_statement(PlanStatement::CreateTable(plan))?;
1731
1732        let row_count = range_size
1733            .try_into()
1734            .map_err(|_| Error::InvalidArgumentError("range size exceeds usize".into()))?;
1735        if row_count > 0 {
1736            let rows = vec![row_template; row_count];
1737            let insert_plan = InsertPlan {
1738                table: display_name.to_string(),
1739                columns: column_names,
1740                source: InsertSource::Rows(rows),
1741            };
1742            self.execute_plan_statement(PlanStatement::Insert(insert_plan))?;
1743        }
1744
1745        Ok(Some(create_result))
1746    }
1747
1748    // TODO: Refactor into runtime or executor layer?
1749    // NOTE: PRAGMA handling lives in the SQL layer until a shared runtime hook exists.
1750    /// Try to handle pragma_table_info('table_name') table function
1751    fn try_handle_pragma_table_info(
1752        &self,
1753        query: &Query,
1754    ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
1755        let select = match query.body.as_ref() {
1756            SetExpr::Select(select) => select,
1757            _ => return Ok(None),
1758        };
1759
1760        if select.from.len() != 1 {
1761            return Ok(None);
1762        }
1763
1764        let table_with_joins = &select.from[0];
1765        if !table_with_joins.joins.is_empty() {
1766            return Ok(None);
1767        }
1768
1769        // Check if this is pragma_table_info function call
1770        let table_name = match &table_with_joins.relation {
1771            TableFactor::Table {
1772                name,
1773                args: Some(args),
1774                ..
1775            } => {
1776                let func_name = name.to_string().to_ascii_lowercase();
1777                if func_name != "pragma_table_info" {
1778                    return Ok(None);
1779                }
1780
1781                // Extract table name from argument
1782                if args.args.len() != 1 {
1783                    return Err(Error::InvalidArgumentError(
1784                        "pragma_table_info expects exactly one argument".into(),
1785                    ));
1786                }
1787
1788                match &args.args[0] {
1789                    FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
1790                        match &value.value {
1791                            Value::SingleQuotedString(s) => s.clone(),
1792                            Value::DoubleQuotedString(s) => s.clone(),
1793                            _ => {
1794                                return Err(Error::InvalidArgumentError(
1795                                    "pragma_table_info argument must be a string".into(),
1796                                ));
1797                            }
1798                        }
1799                    }
1800                    _ => {
1801                        return Err(Error::InvalidArgumentError(
1802                            "pragma_table_info argument must be a string literal".into(),
1803                        ));
1804                    }
1805                }
1806            }
1807            _ => return Ok(None),
1808        };
1809
1810        // Get table column specs from runtime context
1811        let context = self.engine.context();
1812        let (_, canonical_name) = llkv_table::canonical_table_name(&table_name)?;
1813        let columns = context.catalog().table_column_specs(&canonical_name)?;
1814
1815        // Build RecordBatch with table column information
1816        use arrow::array::{BooleanArray, Int32Array, StringArray};
1817        use arrow::datatypes::{DataType, Field, Schema};
1818
1819        let mut cid_values = Vec::new();
1820        let mut name_values = Vec::new();
1821        let mut type_values = Vec::new();
1822        let mut notnull_values = Vec::new();
1823        let mut dflt_value_values: Vec<Option<String>> = Vec::new();
1824        let mut pk_values = Vec::new();
1825
1826        for (idx, col) in columns.iter().enumerate() {
1827            cid_values.push(idx as i32);
1828            name_values.push(col.name.clone());
1829            type_values.push(format!("{:?}", col.data_type)); // Simple type representation
1830            notnull_values.push(!col.nullable);
1831            dflt_value_values.push(None); // We don't track default values yet
1832            pk_values.push(col.primary_key);
1833        }
1834
1835        let schema = Arc::new(Schema::new(vec![
1836            Field::new("cid", DataType::Int32, false),
1837            Field::new("name", DataType::Utf8, false),
1838            Field::new("type", DataType::Utf8, false),
1839            Field::new("notnull", DataType::Boolean, false),
1840            Field::new("dflt_value", DataType::Utf8, true),
1841            Field::new("pk", DataType::Boolean, false),
1842        ]));
1843
1844        use arrow::array::ArrayRef;
1845        let mut batch = RecordBatch::try_new(
1846            Arc::clone(&schema),
1847            vec![
1848                Arc::new(Int32Array::from(cid_values)) as ArrayRef,
1849                Arc::new(StringArray::from(name_values)) as ArrayRef,
1850                Arc::new(StringArray::from(type_values)) as ArrayRef,
1851                Arc::new(BooleanArray::from(notnull_values)) as ArrayRef,
1852                Arc::new(StringArray::from(dflt_value_values)) as ArrayRef,
1853                Arc::new(BooleanArray::from(pk_values)) as ArrayRef,
1854            ],
1855        )
1856        .map_err(|e| Error::Internal(format!("failed to create pragma_table_info batch: {}", e)))?;
1857
1858        // Apply SELECT projections: extract only requested columns
1859        let projection_indices: Vec<usize> = select
1860            .projection
1861            .iter()
1862            .filter_map(|item| {
1863                match item {
1864                    SelectItem::UnnamedExpr(SqlExpr::Identifier(ident)) => {
1865                        schema.index_of(&ident.value).ok()
1866                    }
1867                    SelectItem::ExprWithAlias { expr, .. } => {
1868                        if let SqlExpr::Identifier(ident) = expr {
1869                            schema.index_of(&ident.value).ok()
1870                        } else {
1871                            None
1872                        }
1873                    }
1874                    SelectItem::Wildcard(_) => None, // Handle * separately
1875                    _ => None,
1876                }
1877            })
1878            .collect();
1879
1880        // Apply projections if not SELECT *
1881        let projected_schema;
1882        if !projection_indices.is_empty() {
1883            let projected_fields: Vec<Field> = projection_indices
1884                .iter()
1885                .map(|&idx| schema.field(idx).clone())
1886                .collect();
1887            projected_schema = Arc::new(Schema::new(projected_fields));
1888
1889            let projected_columns: Vec<ArrayRef> = projection_indices
1890                .iter()
1891                .map(|&idx| Arc::clone(batch.column(idx)))
1892                .collect();
1893
1894            batch = RecordBatch::try_new(Arc::clone(&projected_schema), projected_columns)
1895                .map_err(|e| Error::Internal(format!("failed to project columns: {}", e)))?;
1896        } else {
1897            // SELECT * or complex projections - use original schema
1898            projected_schema = schema;
1899        }
1900
1901        // Apply ORDER BY using Arrow compute kernels
1902        if let Some(order_by) = &query.order_by {
1903            use arrow::compute::SortColumn;
1904            use arrow::compute::lexsort_to_indices;
1905            use sqlparser::ast::OrderByKind;
1906
1907            let exprs = match &order_by.kind {
1908                OrderByKind::Expressions(exprs) => exprs,
1909                _ => {
1910                    return Err(Error::InvalidArgumentError(
1911                        "unsupported ORDER BY clause".into(),
1912                    ));
1913                }
1914            };
1915
1916            let mut sort_columns = Vec::new();
1917            for order_expr in exprs {
1918                if let SqlExpr::Identifier(ident) = &order_expr.expr
1919                    && let Ok(col_idx) = projected_schema.index_of(&ident.value)
1920                {
1921                    let options = arrow::compute::SortOptions {
1922                        descending: !order_expr.options.asc.unwrap_or(true),
1923                        nulls_first: order_expr.options.nulls_first.unwrap_or(false),
1924                    };
1925                    sort_columns.push(SortColumn {
1926                        values: Arc::clone(batch.column(col_idx)),
1927                        options: Some(options),
1928                    });
1929                }
1930            }
1931
1932            if !sort_columns.is_empty() {
1933                let indices = lexsort_to_indices(&sort_columns, None)
1934                    .map_err(|e| Error::Internal(format!("failed to sort: {}", e)))?;
1935
1936                use arrow::compute::take;
1937                let sorted_columns: Result<Vec<ArrayRef>, _> = batch
1938                    .columns()
1939                    .iter()
1940                    .map(|col| take(col.as_ref(), &indices, None))
1941                    .collect();
1942
1943                batch = RecordBatch::try_new(
1944                    Arc::clone(&projected_schema),
1945                    sorted_columns
1946                        .map_err(|e| Error::Internal(format!("failed to apply sort: {}", e)))?,
1947                )
1948                .map_err(|e| Error::Internal(format!("failed to create sorted batch: {}", e)))?;
1949            }
1950        }
1951
1952        let execution = SelectExecution::new_single_batch(
1953            table_name.clone(),
1954            Arc::clone(&projected_schema),
1955            batch,
1956        );
1957
1958        Ok(Some(RuntimeStatementResult::Select {
1959            table_name,
1960            schema: projected_schema,
1961            execution: Box::new(execution),
1962        }))
1963    }
1964
1965    fn handle_create_table_as(
1966        &self,
1967        display_name: String,
1968        _canonical_name: String,
1969        query: Query,
1970        if_not_exists: bool,
1971        or_replace: bool,
1972        namespace: Option<String>,
1973    ) -> SqlResult<RuntimeStatementResult<P>> {
1974        // Check if this is a SELECT from VALUES in a derived table
1975        // Pattern: SELECT * FROM (VALUES ...) alias(col1, col2, ...)
1976        if let SetExpr::Select(select) = query.body.as_ref()
1977            && let Some((rows, column_names)) = extract_values_from_derived_table(&select.from)?
1978        {
1979            // Convert VALUES rows to Arrow RecordBatches
1980            return self.handle_create_table_from_values(
1981                display_name,
1982                rows,
1983                column_names,
1984                if_not_exists,
1985                or_replace,
1986                namespace,
1987            );
1988        }
1989
1990        // Regular CTAS with SELECT from existing tables
1991        let select_plan = self.build_select_plan(query)?;
1992
1993        if select_plan.projections.is_empty() && select_plan.aggregates.is_empty() {
1994            return Err(Error::InvalidArgumentError(
1995                "CREATE TABLE AS SELECT requires at least one projected column".into(),
1996            ));
1997        }
1998
1999        let plan = CreateTablePlan {
2000            name: display_name,
2001            if_not_exists,
2002            or_replace,
2003            columns: Vec::new(),
2004            source: Some(CreateTableSource::Select {
2005                plan: Box::new(select_plan),
2006            }),
2007            namespace,
2008            foreign_keys: Vec::new(),
2009            multi_column_uniques: Vec::new(),
2010        };
2011        self.execute_plan_statement(PlanStatement::CreateTable(plan))
2012    }
2013
2014    fn handle_create_table_from_values(
2015        &self,
2016        display_name: String,
2017        rows: Vec<Vec<PlanValue>>,
2018        column_names: Vec<String>,
2019        if_not_exists: bool,
2020        or_replace: bool,
2021        namespace: Option<String>,
2022    ) -> SqlResult<RuntimeStatementResult<P>> {
2023        use arrow::array::{ArrayRef, Float64Builder, Int64Builder, StringBuilder};
2024        use arrow::datatypes::{DataType, Field, Schema};
2025        use arrow::record_batch::RecordBatch;
2026        use std::sync::Arc;
2027
2028        if rows.is_empty() {
2029            return Err(Error::InvalidArgumentError(
2030                "VALUES must have at least one row".into(),
2031            ));
2032        }
2033
2034        let num_cols = column_names.len();
2035
2036        // Infer schema from first row
2037        let first_row = &rows[0];
2038        if first_row.len() != num_cols {
2039            return Err(Error::InvalidArgumentError(
2040                "VALUES row column count mismatch".into(),
2041            ));
2042        }
2043
2044        let mut fields = Vec::with_capacity(num_cols);
2045        let mut column_types = Vec::with_capacity(num_cols);
2046
2047        for (idx, value) in first_row.iter().enumerate() {
2048            let (data_type, nullable) = match value {
2049                PlanValue::Integer(_) => (DataType::Int64, false),
2050                PlanValue::Float(_) => (DataType::Float64, false),
2051                PlanValue::String(_) => (DataType::Utf8, false),
2052                PlanValue::Null => (DataType::Utf8, true), // Default NULL to string type
2053                _ => {
2054                    return Err(Error::InvalidArgumentError(format!(
2055                        "unsupported value type in VALUES for column '{}'",
2056                        column_names.get(idx).unwrap_or(&format!("column{}", idx))
2057                    )));
2058                }
2059            };
2060
2061            column_types.push(data_type.clone());
2062            fields.push(Field::new(&column_names[idx], data_type, nullable));
2063        }
2064
2065        let schema = Arc::new(Schema::new(fields));
2066
2067        // Build Arrow arrays for each column
2068        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(num_cols);
2069
2070        for col_idx in 0..num_cols {
2071            let col_type = &column_types[col_idx];
2072
2073            match col_type {
2074                DataType::Int64 => {
2075                    let mut builder = Int64Builder::with_capacity(rows.len());
2076                    for row in &rows {
2077                        match &row[col_idx] {
2078                            PlanValue::Integer(v) => builder.append_value(*v),
2079                            PlanValue::Null => builder.append_null(),
2080                            other => {
2081                                return Err(Error::InvalidArgumentError(format!(
2082                                    "type mismatch in VALUES: expected Integer, got {:?}",
2083                                    other
2084                                )));
2085                            }
2086                        }
2087                    }
2088                    arrays.push(Arc::new(builder.finish()) as ArrayRef);
2089                }
2090                DataType::Float64 => {
2091                    let mut builder = Float64Builder::with_capacity(rows.len());
2092                    for row in &rows {
2093                        match &row[col_idx] {
2094                            PlanValue::Float(v) => builder.append_value(*v),
2095                            PlanValue::Null => builder.append_null(),
2096                            other => {
2097                                return Err(Error::InvalidArgumentError(format!(
2098                                    "type mismatch in VALUES: expected Float, got {:?}",
2099                                    other
2100                                )));
2101                            }
2102                        }
2103                    }
2104                    arrays.push(Arc::new(builder.finish()) as ArrayRef);
2105                }
2106                DataType::Utf8 => {
2107                    let mut builder = StringBuilder::with_capacity(rows.len(), 1024);
2108                    for row in &rows {
2109                        match &row[col_idx] {
2110                            PlanValue::String(v) => builder.append_value(v),
2111                            PlanValue::Null => builder.append_null(),
2112                            other => {
2113                                return Err(Error::InvalidArgumentError(format!(
2114                                    "type mismatch in VALUES: expected String, got {:?}",
2115                                    other
2116                                )));
2117                            }
2118                        }
2119                    }
2120                    arrays.push(Arc::new(builder.finish()) as ArrayRef);
2121                }
2122                other => {
2123                    return Err(Error::InvalidArgumentError(format!(
2124                        "unsupported column type in VALUES: {:?}",
2125                        other
2126                    )));
2127                }
2128            }
2129        }
2130
2131        let batch = RecordBatch::try_new(Arc::clone(&schema), arrays).map_err(|e| {
2132            Error::Internal(format!("failed to create RecordBatch from VALUES: {}", e))
2133        })?;
2134
2135        let plan = CreateTablePlan {
2136            name: display_name.clone(),
2137            if_not_exists,
2138            or_replace,
2139            columns: Vec::new(),
2140            source: Some(CreateTableSource::Batches {
2141                schema: Arc::clone(&schema),
2142                batches: vec![batch],
2143            }),
2144            namespace,
2145            foreign_keys: Vec::new(),
2146            multi_column_uniques: Vec::new(),
2147        };
2148
2149        self.execute_plan_statement(PlanStatement::CreateTable(plan))
2150    }
2151
2152    fn handle_insert(&self, stmt: sqlparser::ast::Insert) -> SqlResult<RuntimeStatementResult<P>> {
2153        let table_name_debug =
2154            Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
2155        tracing::trace!(
2156            "DEBUG SQL handle_insert called for table={}",
2157            table_name_debug
2158        );
2159        if !self.engine.session().has_active_transaction()
2160            && self.is_table_marked_dropped(&table_name_debug)?
2161        {
2162            return Err(Error::TransactionContextError(
2163                DROPPED_TABLE_TRANSACTION_ERR.into(),
2164            ));
2165        }
2166        if stmt.replace_into || stmt.ignore || stmt.or.is_some() {
2167            return Err(Error::InvalidArgumentError(
2168                "non-standard INSERT forms are not supported".into(),
2169            ));
2170        }
2171        if stmt.overwrite {
2172            return Err(Error::InvalidArgumentError(
2173                "INSERT OVERWRITE is not supported".into(),
2174            ));
2175        }
2176        if !stmt.assignments.is_empty() {
2177            return Err(Error::InvalidArgumentError(
2178                "INSERT ... SET is not supported".into(),
2179            ));
2180        }
2181        if stmt.partitioned.is_some() || !stmt.after_columns.is_empty() {
2182            return Err(Error::InvalidArgumentError(
2183                "partitioned INSERT is not supported".into(),
2184            ));
2185        }
2186        if stmt.returning.is_some() {
2187            return Err(Error::InvalidArgumentError(
2188                "INSERT ... RETURNING is not supported".into(),
2189            ));
2190        }
2191        if stmt.format_clause.is_some() || stmt.settings.is_some() {
2192            return Err(Error::InvalidArgumentError(
2193                "INSERT with FORMAT or SETTINGS is not supported".into(),
2194            ));
2195        }
2196
2197        let (display_name, _canonical_name) = match &stmt.table {
2198            TableObject::TableName(name) => canonical_object_name(name)?,
2199            _ => {
2200                return Err(Error::InvalidArgumentError(
2201                    "INSERT requires a plain table name".into(),
2202                ));
2203            }
2204        };
2205
2206        let columns: Vec<String> = stmt
2207            .columns
2208            .iter()
2209            .map(|ident| ident.value.clone())
2210            .collect();
2211        let source_expr = stmt
2212            .source
2213            .as_ref()
2214            .ok_or_else(|| Error::InvalidArgumentError("INSERT requires a VALUES clause".into()))?;
2215        validate_simple_query(source_expr)?;
2216
2217        let insert_source = match source_expr.body.as_ref() {
2218            SetExpr::Values(values) => {
2219                if values.rows.is_empty() {
2220                    return Err(Error::InvalidArgumentError(
2221                        "INSERT VALUES list must contain at least one row".into(),
2222                    ));
2223                }
2224                let mut rows: Vec<Vec<SqlValue>> = Vec::with_capacity(values.rows.len());
2225                for row in &values.rows {
2226                    let mut converted = Vec::with_capacity(row.len());
2227                    for expr in row {
2228                        converted.push(SqlValue::try_from_expr(expr)?);
2229                    }
2230                    rows.push(converted);
2231                }
2232                InsertSource::Rows(
2233                    rows.into_iter()
2234                        .map(|row| row.into_iter().map(PlanValue::from).collect())
2235                        .collect(),
2236                )
2237            }
2238            SetExpr::Select(select) => {
2239                if let Some(rows) = extract_constant_select_rows(select.as_ref())? {
2240                    InsertSource::Rows(rows)
2241                } else if let Some(range_rows) = extract_rows_from_range(select.as_ref())? {
2242                    InsertSource::Rows(range_rows.into_rows())
2243                } else {
2244                    let select_plan = self.build_select_plan((**source_expr).clone())?;
2245                    InsertSource::Select {
2246                        plan: Box::new(select_plan),
2247                    }
2248                }
2249            }
2250            _ => {
2251                return Err(Error::InvalidArgumentError(
2252                    "unsupported INSERT source".into(),
2253                ));
2254            }
2255        };
2256
2257        let plan = InsertPlan {
2258            table: display_name.clone(),
2259            columns,
2260            source: insert_source,
2261        };
2262        tracing::trace!(
2263            "DEBUG SQL handle_insert: about to execute insert for table={}",
2264            display_name
2265        );
2266        self.execute_plan_statement(PlanStatement::Insert(plan))
2267    }
2268
2269    fn handle_update(
2270        &self,
2271        table: TableWithJoins,
2272        assignments: Vec<Assignment>,
2273        from: Option<UpdateTableFromKind>,
2274        selection: Option<SqlExpr>,
2275        returning: Option<Vec<SelectItem>>,
2276    ) -> SqlResult<RuntimeStatementResult<P>> {
2277        if from.is_some() {
2278            return Err(Error::InvalidArgumentError(
2279                "UPDATE ... FROM is not supported yet".into(),
2280            ));
2281        }
2282        if returning.is_some() {
2283            return Err(Error::InvalidArgumentError(
2284                "UPDATE ... RETURNING is not supported".into(),
2285            ));
2286        }
2287        if assignments.is_empty() {
2288            return Err(Error::InvalidArgumentError(
2289                "UPDATE requires at least one assignment".into(),
2290            ));
2291        }
2292
2293        let (display_name, canonical_name) = extract_single_table(std::slice::from_ref(&table))?;
2294
2295        if !self.engine.session().has_active_transaction()
2296            && self
2297                .engine
2298                .context()
2299                .is_table_marked_dropped(&canonical_name)
2300        {
2301            return Err(Error::TransactionContextError(
2302                DROPPED_TABLE_TRANSACTION_ERR.into(),
2303            ));
2304        }
2305
2306        let catalog = self.engine.context().table_catalog();
2307        let resolver = catalog.identifier_resolver();
2308        let table_id = catalog.table_id(&canonical_name);
2309
2310        let mut column_assignments = Vec::with_capacity(assignments.len());
2311        let mut seen: HashMap<String, ()> = HashMap::new();
2312        for assignment in assignments {
2313            let column_name = resolve_assignment_column_name(&assignment.target)?;
2314            let normalized = column_name.to_ascii_lowercase();
2315            if seen.insert(normalized, ()).is_some() {
2316                return Err(Error::InvalidArgumentError(format!(
2317                    "duplicate column '{}' in UPDATE assignments",
2318                    column_name
2319                )));
2320            }
2321            let value = match SqlValue::try_from_expr(&assignment.value) {
2322                Ok(literal) => AssignmentValue::Literal(PlanValue::from(literal)),
2323                Err(Error::InvalidArgumentError(msg))
2324                    if msg.contains("unsupported literal expression") =>
2325                {
2326                    let translated = translate_scalar_with_context(
2327                        &resolver,
2328                        IdentifierContext::new(table_id),
2329                        &assignment.value,
2330                    )?;
2331                    AssignmentValue::Expression(translated)
2332                }
2333                Err(err) => return Err(err),
2334            };
2335            column_assignments.push(ColumnAssignment {
2336                column: column_name,
2337                value,
2338            });
2339        }
2340
2341        let filter = match selection {
2342            Some(expr) => Some(translate_condition_with_context(
2343                &resolver,
2344                IdentifierContext::new(table_id),
2345                &expr,
2346            )?),
2347            None => None,
2348        };
2349
2350        let plan = UpdatePlan {
2351            table: display_name.clone(),
2352            assignments: column_assignments,
2353            filter,
2354        };
2355        self.execute_plan_statement(PlanStatement::Update(plan))
2356    }
2357
2358    #[allow(clippy::collapsible_if)]
2359    fn handle_delete(&self, delete: Delete) -> SqlResult<RuntimeStatementResult<P>> {
2360        let Delete {
2361            tables,
2362            from,
2363            using,
2364            selection,
2365            returning,
2366            order_by,
2367            limit,
2368        } = delete;
2369
2370        if !tables.is_empty() {
2371            return Err(Error::InvalidArgumentError(
2372                "multi-table DELETE is not supported yet".into(),
2373            ));
2374        }
2375        if let Some(using_tables) = using {
2376            if !using_tables.is_empty() {
2377                return Err(Error::InvalidArgumentError(
2378                    "DELETE ... USING is not supported yet".into(),
2379                ));
2380            }
2381        }
2382        if returning.is_some() {
2383            return Err(Error::InvalidArgumentError(
2384                "DELETE ... RETURNING is not supported".into(),
2385            ));
2386        }
2387        if !order_by.is_empty() {
2388            return Err(Error::InvalidArgumentError(
2389                "DELETE ... ORDER BY is not supported yet".into(),
2390            ));
2391        }
2392        if limit.is_some() {
2393            return Err(Error::InvalidArgumentError(
2394                "DELETE ... LIMIT is not supported yet".into(),
2395            ));
2396        }
2397
2398        let from_tables = match from {
2399            FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
2400        };
2401        let (display_name, canonical_name) = extract_single_table(&from_tables)?;
2402
2403        if !self.engine.session().has_active_transaction()
2404            && self
2405                .engine
2406                .context()
2407                .is_table_marked_dropped(&canonical_name)
2408        {
2409            return Err(Error::TransactionContextError(
2410                DROPPED_TABLE_TRANSACTION_ERR.into(),
2411            ));
2412        }
2413
2414        let catalog = self.engine.context().table_catalog();
2415        let resolver = catalog.identifier_resolver();
2416        let table_id = catalog.table_id(&canonical_name);
2417
2418        let filter = selection
2419            .map(|expr| {
2420                translate_condition_with_context(&resolver, IdentifierContext::new(table_id), &expr)
2421            })
2422            .transpose()?;
2423
2424        let plan = DeletePlan {
2425            table: display_name.clone(),
2426            filter,
2427        };
2428        self.execute_plan_statement(PlanStatement::Delete(plan))
2429    }
2430
2431    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors SQL grammar; keep grouped until command builder is introduced.
2432    fn handle_drop(
2433        &self,
2434        object_type: ObjectType,
2435        if_exists: bool,
2436        names: Vec<ObjectName>,
2437        cascade: bool,
2438        restrict: bool,
2439        purge: bool,
2440        temporary: bool,
2441    ) -> SqlResult<RuntimeStatementResult<P>> {
2442        if purge || temporary {
2443            return Err(Error::InvalidArgumentError(
2444                "DROP purge/temporary options are not supported".into(),
2445            ));
2446        }
2447
2448        match object_type {
2449            ObjectType::Table => {
2450                if cascade || restrict {
2451                    return Err(Error::InvalidArgumentError(
2452                        "DROP TABLE CASCADE/RESTRICT is not supported".into(),
2453                    ));
2454                }
2455
2456                for name in names {
2457                    let table_name = Self::object_name_to_string(&name)?;
2458                    let mut plan = llkv_plan::DropTablePlan::new(table_name.clone());
2459                    plan.if_exists = if_exists;
2460
2461                    self.execute_plan_statement(llkv_plan::PlanStatement::DropTable(plan))
2462                        .map_err(|err| Self::map_table_error(&table_name, err))?;
2463                }
2464
2465                Ok(RuntimeStatementResult::NoOp)
2466            }
2467            ObjectType::Index => {
2468                if cascade || restrict {
2469                    return Err(Error::InvalidArgumentError(
2470                        "DROP INDEX CASCADE/RESTRICT is not supported".into(),
2471                    ));
2472                }
2473
2474                for name in names {
2475                    let index_name = Self::object_name_to_string(&name)?;
2476                    let plan = llkv_plan::DropIndexPlan::new(index_name).if_exists(if_exists);
2477                    self.execute_plan_statement(llkv_plan::PlanStatement::DropIndex(plan))?;
2478                }
2479
2480                Ok(RuntimeStatementResult::NoOp)
2481            }
2482            ObjectType::Schema => {
2483                if restrict {
2484                    return Err(Error::InvalidArgumentError(
2485                        "DROP SCHEMA RESTRICT is not supported".into(),
2486                    ));
2487                }
2488
2489                let catalog = self.engine.context().table_catalog();
2490
2491                for name in names {
2492                    let (display_name, canonical_name) = canonical_object_name(&name)?;
2493
2494                    if !catalog.schema_exists(&canonical_name) {
2495                        if if_exists {
2496                            continue;
2497                        }
2498                        return Err(Error::CatalogError(format!(
2499                            "Schema '{}' does not exist",
2500                            display_name
2501                        )));
2502                    }
2503
2504                    if cascade {
2505                        // Drop all tables in this schema
2506                        let all_tables = catalog.table_names();
2507                        let schema_prefix = format!("{}.", canonical_name);
2508
2509                        for table in all_tables {
2510                            if table.to_ascii_lowercase().starts_with(&schema_prefix) {
2511                                let mut plan = llkv_plan::DropTablePlan::new(table.clone());
2512                                plan.if_exists = false;
2513                                self.execute_plan_statement(llkv_plan::PlanStatement::DropTable(
2514                                    plan,
2515                                ))?;
2516                            }
2517                        }
2518                    } else {
2519                        // Check if schema has any tables
2520                        let all_tables = catalog.table_names();
2521                        let schema_prefix = format!("{}.", canonical_name);
2522                        let has_tables = all_tables
2523                            .iter()
2524                            .any(|t| t.to_ascii_lowercase().starts_with(&schema_prefix));
2525
2526                        if has_tables {
2527                            return Err(Error::CatalogError(format!(
2528                                "Schema '{}' is not empty. Use CASCADE to drop schema and all its tables",
2529                                display_name
2530                            )));
2531                        }
2532                    }
2533
2534                    // Drop the schema
2535                    if !catalog.unregister_schema(&canonical_name) && !if_exists {
2536                        return Err(Error::CatalogError(format!(
2537                            "Schema '{}' does not exist",
2538                            display_name
2539                        )));
2540                    }
2541                }
2542
2543                Ok(RuntimeStatementResult::NoOp)
2544            }
2545            _ => Err(Error::InvalidArgumentError(format!(
2546                "DROP {} is not supported",
2547                object_type
2548            ))),
2549        }
2550    }
2551
2552    fn handle_alter_table(
2553        &self,
2554        name: ObjectName,
2555        if_exists: bool,
2556        only: bool,
2557        operations: Vec<AlterTableOperation>,
2558    ) -> SqlResult<RuntimeStatementResult<P>> {
2559        if only {
2560            return Err(Error::InvalidArgumentError(
2561                "ALTER TABLE ONLY is not supported yet".into(),
2562            ));
2563        }
2564
2565        if operations.is_empty() {
2566            return Ok(RuntimeStatementResult::NoOp);
2567        }
2568
2569        if operations.len() != 1 {
2570            return Err(Error::InvalidArgumentError(
2571                "ALTER TABLE currently supports exactly one operation".into(),
2572            ));
2573        }
2574
2575        let operation = operations.into_iter().next().expect("checked length");
2576        match operation {
2577            AlterTableOperation::RenameTable { table_name } => {
2578                let new_name = table_name.to_string();
2579                self.handle_alter_table_rename(name, new_name, if_exists)
2580            }
2581            AlterTableOperation::RenameColumn {
2582                old_column_name,
2583                new_column_name,
2584            } => {
2585                let plan = llkv_plan::AlterTablePlan {
2586                    table_name: name.to_string(),
2587                    if_exists,
2588                    operation: llkv_plan::AlterTableOperation::RenameColumn {
2589                        old_column_name: old_column_name.to_string(),
2590                        new_column_name: new_column_name.to_string(),
2591                    },
2592                };
2593                self.execute_plan_statement(PlanStatement::AlterTable(plan))
2594            }
2595            AlterTableOperation::AlterColumn { column_name, op } => {
2596                // Only support SET DATA TYPE for now
2597                if let AlterColumnOperation::SetDataType {
2598                    data_type,
2599                    using,
2600                    had_set: _,
2601                } = op
2602                {
2603                    if using.is_some() {
2604                        return Err(Error::InvalidArgumentError(
2605                            "ALTER COLUMN SET DATA TYPE USING clause is not yet supported".into(),
2606                        ));
2607                    }
2608
2609                    let plan = llkv_plan::AlterTablePlan {
2610                        table_name: name.to_string(),
2611                        if_exists,
2612                        operation: llkv_plan::AlterTableOperation::SetColumnDataType {
2613                            column_name: column_name.to_string(),
2614                            new_data_type: data_type.to_string(),
2615                        },
2616                    };
2617                    self.execute_plan_statement(PlanStatement::AlterTable(plan))
2618                } else {
2619                    Err(Error::InvalidArgumentError(format!(
2620                        "unsupported ALTER COLUMN operation: {:?}",
2621                        op
2622                    )))
2623                }
2624            }
2625            AlterTableOperation::DropColumn {
2626                has_column_keyword: _,
2627                column_names,
2628                if_exists: column_if_exists,
2629                drop_behavior,
2630            } => {
2631                if column_names.len() != 1 {
2632                    return Err(Error::InvalidArgumentError(
2633                        "DROP COLUMN currently supports dropping one column at a time".into(),
2634                    ));
2635                }
2636
2637                let column_name = column_names.into_iter().next().unwrap().to_string();
2638                let cascade = matches!(drop_behavior, Some(sqlparser::ast::DropBehavior::Cascade));
2639
2640                let plan = llkv_plan::AlterTablePlan {
2641                    table_name: name.to_string(),
2642                    if_exists,
2643                    operation: llkv_plan::AlterTableOperation::DropColumn {
2644                        column_name,
2645                        if_exists: column_if_exists,
2646                        cascade,
2647                    },
2648                };
2649                self.execute_plan_statement(PlanStatement::AlterTable(plan))
2650            }
2651            other => Err(Error::InvalidArgumentError(format!(
2652                "unsupported ALTER TABLE operation: {:?}",
2653                other
2654            ))),
2655        }
2656    }
2657
2658    fn handle_alter_table_rename(
2659        &self,
2660        original_name: ObjectName,
2661        new_table_name: String,
2662        if_exists: bool,
2663    ) -> SqlResult<RuntimeStatementResult<P>> {
2664        let (schema_opt, table_name) = parse_schema_qualified_name(&original_name)?;
2665
2666        let new_table_name_clean = new_table_name.trim();
2667
2668        if new_table_name_clean.is_empty() {
2669            return Err(Error::InvalidArgumentError(
2670                "ALTER TABLE RENAME requires a non-empty table name".into(),
2671            ));
2672        }
2673
2674        let (raw_new_schema_opt, raw_new_table) =
2675            if let Some((schema_part, table_part)) = new_table_name_clean.split_once('.') {
2676                (
2677                    Some(schema_part.trim().to_string()),
2678                    table_part.trim().to_string(),
2679                )
2680            } else {
2681                (None, new_table_name_clean.to_string())
2682            };
2683
2684        if schema_opt.is_none() && raw_new_schema_opt.is_some() {
2685            return Err(Error::InvalidArgumentError(
2686                "ALTER TABLE RENAME cannot add a schema qualifier".into(),
2687            ));
2688        }
2689
2690        let new_table_trimmed = raw_new_table.trim_matches('"');
2691        if new_table_trimmed.is_empty() {
2692            return Err(Error::InvalidArgumentError(
2693                "ALTER TABLE RENAME requires a non-empty table name".into(),
2694            ));
2695        }
2696
2697        if let (Some(existing_schema), Some(new_schema_raw)) =
2698            (schema_opt.as_ref(), raw_new_schema_opt.as_ref())
2699        {
2700            let new_schema_trimmed = new_schema_raw.trim_matches('"');
2701            if !existing_schema.eq_ignore_ascii_case(new_schema_trimmed) {
2702                return Err(Error::InvalidArgumentError(
2703                    "ALTER TABLE RENAME cannot change table schema".into(),
2704                ));
2705            }
2706        }
2707
2708        let new_table_display = raw_new_table;
2709        let new_schema_opt = raw_new_schema_opt;
2710
2711        fn join_schema_table(schema: &str, table: &str) -> String {
2712            let mut qualified = String::with_capacity(schema.len() + table.len() + 1);
2713            qualified.push_str(schema);
2714            qualified.push('.');
2715            qualified.push_str(table);
2716            qualified
2717        }
2718
2719        let current_display = schema_opt
2720            .as_ref()
2721            .map(|schema| join_schema_table(schema, &table_name))
2722            .unwrap_or_else(|| table_name.clone());
2723
2724        let new_display = if let Some(new_schema_raw) = new_schema_opt.clone() {
2725            join_schema_table(&new_schema_raw, &new_table_display)
2726        } else if let Some(schema) = schema_opt.as_ref() {
2727            join_schema_table(schema, &new_table_display)
2728        } else {
2729            new_table_display.clone()
2730        };
2731
2732        let plan = RenameTablePlan::new(&current_display, &new_display).if_exists(if_exists);
2733
2734        match CatalogDdl::rename_table(self.engine.session(), plan) {
2735            Ok(()) => Ok(RuntimeStatementResult::NoOp),
2736            Err(err) => Err(Self::map_table_error(&current_display, err)),
2737        }
2738    }
2739
2740    fn handle_query(&self, query: Query) -> SqlResult<RuntimeStatementResult<P>> {
2741        // Check for pragma_table_info() table function first
2742        if let Some(result) = self.try_handle_pragma_table_info(&query)? {
2743            return Ok(result);
2744        }
2745
2746        let select_plan = self.build_select_plan(query)?;
2747        self.execute_plan_statement(PlanStatement::Select(select_plan))
2748    }
2749
2750    fn build_select_plan(&self, query: Query) -> SqlResult<SelectPlan> {
2751        if self.engine.session().has_active_transaction() && self.engine.session().is_aborted() {
2752            return Err(Error::TransactionContextError(
2753                "TransactionContext Error: transaction is aborted".into(),
2754            ));
2755        }
2756
2757        validate_simple_query(&query)?;
2758        let catalog = self.engine.context().table_catalog();
2759        let resolver = catalog.identifier_resolver();
2760
2761        let (mut select_plan, select_context) = match query.body.as_ref() {
2762            SetExpr::Select(select) => self.translate_select(select.as_ref(), &resolver)?,
2763            other => {
2764                return Err(Error::InvalidArgumentError(format!(
2765                    "unsupported query expression: {other:?}"
2766                )));
2767            }
2768        };
2769        if let Some(order_by) = &query.order_by {
2770            if !select_plan.aggregates.is_empty() {
2771                return Err(Error::InvalidArgumentError(
2772                    "ORDER BY is not supported for aggregate queries".into(),
2773                ));
2774            }
2775            let order_plan = self.translate_order_by(&resolver, select_context, order_by)?;
2776            select_plan = select_plan.with_order_by(order_plan);
2777        }
2778        Ok(select_plan)
2779    }
2780
2781    fn translate_select(
2782        &self,
2783        select: &Select,
2784        resolver: &IdentifierResolver<'_>,
2785    ) -> SqlResult<(SelectPlan, IdentifierContext)> {
2786        if select.distinct.is_some() {
2787            return Err(Error::InvalidArgumentError(
2788                "SELECT DISTINCT is not supported".into(),
2789            ));
2790        }
2791        if select.top.is_some() {
2792            return Err(Error::InvalidArgumentError(
2793                "SELECT TOP is not supported".into(),
2794            ));
2795        }
2796        if select.exclude.is_some() {
2797            return Err(Error::InvalidArgumentError(
2798                "SELECT EXCLUDE is not supported".into(),
2799            ));
2800        }
2801        if select.into.is_some() {
2802            return Err(Error::InvalidArgumentError(
2803                "SELECT INTO is not supported".into(),
2804            ));
2805        }
2806        if !select.lateral_views.is_empty() {
2807            return Err(Error::InvalidArgumentError(
2808                "LATERAL VIEW is not supported".into(),
2809            ));
2810        }
2811        if select.prewhere.is_some() {
2812            return Err(Error::InvalidArgumentError(
2813                "PREWHERE is not supported".into(),
2814            ));
2815        }
2816        if !group_by_is_empty(&select.group_by) || select.value_table_mode.is_some() {
2817            return Err(Error::InvalidArgumentError(
2818                "GROUP BY and SELECT AS VALUE/STRUCT are not supported".into(),
2819            ));
2820        }
2821        if !select.cluster_by.is_empty()
2822            || !select.distribute_by.is_empty()
2823            || !select.sort_by.is_empty()
2824        {
2825            return Err(Error::InvalidArgumentError(
2826                "CLUSTER/DISTRIBUTE/SORT BY clauses are not supported".into(),
2827            ));
2828        }
2829        if select.having.is_some()
2830            || !select.named_window.is_empty()
2831            || select.qualify.is_some()
2832            || select.connect_by.is_some()
2833        {
2834            return Err(Error::InvalidArgumentError(
2835                "advanced SELECT clauses are not supported".into(),
2836            ));
2837        }
2838
2839        let table_alias = select
2840            .from
2841            .first()
2842            .and_then(|table_with_joins| match &table_with_joins.relation {
2843                TableFactor::Table { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
2844                _ => None,
2845            });
2846
2847        if let Some(alias) = table_alias.as_ref() {
2848            validate_projection_alias_qualifiers(&select.projection, alias)?;
2849        }
2850        // Handle different FROM clause scenarios
2851        let catalog = self.engine.context().table_catalog();
2852        let (mut plan, id_context) = if select.from.is_empty() {
2853            // No FROM clause - use empty string for table context (e.g., SELECT 42, SELECT {'a': 1} AS x)
2854            let mut p = SelectPlan::new("");
2855            let projections = self.build_projection_list(
2856                resolver,
2857                IdentifierContext::new(None),
2858                &select.projection,
2859            )?;
2860            p = p.with_projections(projections);
2861            (p, IdentifierContext::new(None))
2862        } else if select.from.len() == 1 {
2863            // Single table query
2864            let (display_name, canonical_name) = extract_single_table(&select.from)?;
2865            let table_id = catalog.table_id(&canonical_name);
2866            let mut p = SelectPlan::new(display_name.clone());
2867            if let Some(aggregates) = self.detect_simple_aggregates(&select.projection)? {
2868                p = p.with_aggregates(aggregates);
2869            } else {
2870                let projections = self.build_projection_list(
2871                    resolver,
2872                    IdentifierContext::new(table_id),
2873                    &select.projection,
2874                )?;
2875                p = p.with_projections(projections);
2876            }
2877            (p, IdentifierContext::new(table_id))
2878        } else {
2879            // Multiple tables - cross product
2880            let tables = extract_tables(&select.from)?;
2881            let mut p = SelectPlan::with_tables(tables);
2882            // For multi-table queries, we'll build projections differently
2883            // For now, just handle simple column references
2884            let projections = self.build_projection_list(
2885                resolver,
2886                IdentifierContext::new(None),
2887                &select.projection,
2888            )?;
2889            p = p.with_projections(projections);
2890            (p, IdentifierContext::new(None))
2891        };
2892
2893        let filter_expr = match &select.selection {
2894            Some(expr) => Some(translate_condition_with_context(
2895                resolver, id_context, expr,
2896            )?),
2897            None => None,
2898        };
2899        plan = plan.with_filter(filter_expr);
2900        Ok((plan, id_context))
2901    }
2902
2903    fn translate_order_by(
2904        &self,
2905        resolver: &IdentifierResolver<'_>,
2906        id_context: IdentifierContext,
2907        order_by: &OrderBy,
2908    ) -> SqlResult<Vec<OrderByPlan>> {
2909        let exprs = match &order_by.kind {
2910            OrderByKind::Expressions(exprs) => exprs,
2911            _ => {
2912                return Err(Error::InvalidArgumentError(
2913                    "unsupported ORDER BY clause".into(),
2914                ));
2915            }
2916        };
2917
2918        let base_nulls_first = self.default_nulls_first.load(AtomicOrdering::Relaxed);
2919
2920        let resolve_simple_column = |expr: &SqlExpr| -> SqlResult<String> {
2921            let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
2922            match scalar {
2923                llkv_expr::expr::ScalarExpr::Column(column) => Ok(column),
2924                other => Err(Error::InvalidArgumentError(format!(
2925                    "ORDER BY expression must reference a simple column, found {other:?}"
2926                ))),
2927            }
2928        };
2929
2930        let mut plans = Vec::with_capacity(exprs.len());
2931        for order_expr in exprs {
2932            let ascending = order_expr.options.asc.unwrap_or(true);
2933            let default_nulls_first_for_direction = if ascending {
2934                base_nulls_first
2935            } else {
2936                !base_nulls_first
2937            };
2938            let nulls_first = order_expr
2939                .options
2940                .nulls_first
2941                .unwrap_or(default_nulls_first_for_direction);
2942
2943            if let SqlExpr::Identifier(ident) = &order_expr.expr
2944                && ident.value.eq_ignore_ascii_case("ALL")
2945                && ident.quote_style.is_none()
2946            {
2947                plans.push(OrderByPlan {
2948                    target: OrderTarget::All,
2949                    sort_type: OrderSortType::Native,
2950                    ascending,
2951                    nulls_first,
2952                });
2953                continue;
2954            }
2955
2956            let (target, sort_type) = match &order_expr.expr {
2957                SqlExpr::Identifier(_) | SqlExpr::CompoundIdentifier(_) => (
2958                    OrderTarget::Column(resolve_simple_column(&order_expr.expr)?),
2959                    OrderSortType::Native,
2960                ),
2961                SqlExpr::Cast {
2962                    expr,
2963                    data_type:
2964                        SqlDataType::Int(_)
2965                        | SqlDataType::Integer(_)
2966                        | SqlDataType::BigInt(_)
2967                        | SqlDataType::SmallInt(_)
2968                        | SqlDataType::TinyInt(_),
2969                    ..
2970                } => (
2971                    OrderTarget::Column(resolve_simple_column(expr)?),
2972                    OrderSortType::CastTextToInteger,
2973                ),
2974                SqlExpr::Cast { data_type, .. } => {
2975                    return Err(Error::InvalidArgumentError(format!(
2976                        "ORDER BY CAST target type {:?} is not supported",
2977                        data_type
2978                    )));
2979                }
2980                SqlExpr::Value(value_with_span) => match &value_with_span.value {
2981                    Value::Number(raw, _) => {
2982                        let position: usize = raw.parse().map_err(|_| {
2983                            Error::InvalidArgumentError(format!(
2984                                "ORDER BY position '{}' is not a valid positive integer",
2985                                raw
2986                            ))
2987                        })?;
2988                        if position == 0 {
2989                            return Err(Error::InvalidArgumentError(
2990                                "ORDER BY position must be at least 1".into(),
2991                            ));
2992                        }
2993                        (OrderTarget::Index(position - 1), OrderSortType::Native)
2994                    }
2995                    other => {
2996                        return Err(Error::InvalidArgumentError(format!(
2997                            "unsupported ORDER BY literal expression: {other:?}"
2998                        )));
2999                    }
3000                },
3001                other => {
3002                    return Err(Error::InvalidArgumentError(format!(
3003                        "unsupported ORDER BY expression: {other:?}"
3004                    )));
3005                }
3006            };
3007
3008            plans.push(OrderByPlan {
3009                target,
3010                sort_type,
3011                ascending,
3012                nulls_first,
3013            });
3014        }
3015
3016        Ok(plans)
3017    }
3018
3019    fn detect_simple_aggregates(
3020        &self,
3021        projection_items: &[SelectItem],
3022    ) -> SqlResult<Option<Vec<AggregateExpr>>> {
3023        if projection_items.is_empty() {
3024            return Ok(None);
3025        }
3026
3027        let mut specs: Vec<AggregateExpr> = Vec::with_capacity(projection_items.len());
3028        for (idx, item) in projection_items.iter().enumerate() {
3029            let (expr, alias_opt) = match item {
3030                SelectItem::UnnamedExpr(expr) => (expr, None),
3031                SelectItem::ExprWithAlias { expr, alias } => (expr, Some(alias.value.clone())),
3032                _ => return Ok(None),
3033            };
3034
3035            let alias = alias_opt.unwrap_or_else(|| format!("col{}", idx + 1));
3036            let SqlExpr::Function(func) = expr else {
3037                return Ok(None);
3038            };
3039
3040            if func.uses_odbc_syntax {
3041                return Err(Error::InvalidArgumentError(
3042                    "ODBC function syntax is not supported in aggregate queries".into(),
3043                ));
3044            }
3045            if !matches!(func.parameters, FunctionArguments::None) {
3046                return Err(Error::InvalidArgumentError(
3047                    "parameterized aggregate functions are not supported".into(),
3048                ));
3049            }
3050            if func.filter.is_some()
3051                || func.null_treatment.is_some()
3052                || func.over.is_some()
3053                || !func.within_group.is_empty()
3054            {
3055                return Err(Error::InvalidArgumentError(
3056                    "advanced aggregate clauses are not supported".into(),
3057                ));
3058            }
3059
3060            let mut is_distinct = false;
3061            let args_slice: &[FunctionArg] = match &func.args {
3062                FunctionArguments::List(list) => {
3063                    if let Some(dup) = &list.duplicate_treatment {
3064                        use sqlparser::ast::DuplicateTreatment;
3065                        match dup {
3066                            DuplicateTreatment::All => {}
3067                            DuplicateTreatment::Distinct => is_distinct = true,
3068                        }
3069                    }
3070                    if !list.clauses.is_empty() {
3071                        return Err(Error::InvalidArgumentError(
3072                            "aggregate argument clauses are not supported".into(),
3073                        ));
3074                    }
3075                    &list.args
3076                }
3077                FunctionArguments::None => &[],
3078                FunctionArguments::Subquery(_) => {
3079                    return Err(Error::InvalidArgumentError(
3080                        "aggregate subquery arguments are not supported".into(),
3081                    ));
3082                }
3083            };
3084
3085            let func_name = if func.name.0.len() == 1 {
3086                match &func.name.0[0] {
3087                    ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
3088                    _ => {
3089                        return Err(Error::InvalidArgumentError(
3090                            "unsupported aggregate function name".into(),
3091                        ));
3092                    }
3093                }
3094            } else {
3095                return Err(Error::InvalidArgumentError(
3096                    "qualified aggregate function names are not supported".into(),
3097                ));
3098            };
3099
3100            let aggregate = match func_name.as_str() {
3101                "count" => {
3102                    if args_slice.len() != 1 {
3103                        return Err(Error::InvalidArgumentError(
3104                            "COUNT accepts exactly one argument".into(),
3105                        ));
3106                    }
3107                    match &args_slice[0] {
3108                        FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
3109                            if is_distinct {
3110                                return Err(Error::InvalidArgumentError(
3111                                    "COUNT(DISTINCT *) is not supported".into(),
3112                                ));
3113                            }
3114                            AggregateExpr::count_star(alias)
3115                        }
3116                        FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
3117                            let column = resolve_column_name(arg_expr)?;
3118                            if is_distinct {
3119                                AggregateExpr::count_distinct_column(column, alias)
3120                            } else {
3121                                AggregateExpr::count_column(column, alias)
3122                            }
3123                        }
3124                        FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
3125                            return Err(Error::InvalidArgumentError(
3126                                "named COUNT arguments are not supported".into(),
3127                            ));
3128                        }
3129                        FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
3130                            return Err(Error::InvalidArgumentError(
3131                                "COUNT does not support qualified wildcards".into(),
3132                            ));
3133                        }
3134                    }
3135                }
3136                "sum" | "min" | "max" => {
3137                    if is_distinct {
3138                        return Err(Error::InvalidArgumentError(
3139                            "DISTINCT is not supported for this aggregate".into(),
3140                        ));
3141                    }
3142                    if args_slice.len() != 1 {
3143                        return Err(Error::InvalidArgumentError(format!(
3144                            "{} accepts exactly one argument",
3145                            func_name.to_uppercase()
3146                        )));
3147                    }
3148                    let arg_expr = match &args_slice[0] {
3149                        FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => arg_expr,
3150                        FunctionArg::Unnamed(FunctionArgExpr::Wildcard)
3151                        | FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
3152                            return Err(Error::InvalidArgumentError(format!(
3153                                "{} does not support wildcard arguments",
3154                                func_name.to_uppercase()
3155                            )));
3156                        }
3157                        FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
3158                            return Err(Error::InvalidArgumentError(format!(
3159                                "{} arguments must be column references",
3160                                func_name.to_uppercase()
3161                            )));
3162                        }
3163                    };
3164
3165                    if func_name == "sum" {
3166                        if let Some(column) = parse_count_nulls_case(arg_expr)? {
3167                            AggregateExpr::count_nulls(column, alias)
3168                        } else {
3169                            let column = resolve_column_name(arg_expr)?;
3170                            AggregateExpr::sum_int64(column, alias)
3171                        }
3172                    } else {
3173                        let column = resolve_column_name(arg_expr)?;
3174                        if func_name == "min" {
3175                            AggregateExpr::min_int64(column, alias)
3176                        } else {
3177                            AggregateExpr::max_int64(column, alias)
3178                        }
3179                    }
3180                }
3181                _ => return Ok(None),
3182            };
3183
3184            specs.push(aggregate);
3185        }
3186
3187        if specs.is_empty() {
3188            return Ok(None);
3189        }
3190        Ok(Some(specs))
3191    }
3192
3193    fn build_projection_list(
3194        &self,
3195        resolver: &IdentifierResolver<'_>,
3196        id_context: IdentifierContext,
3197        projection_items: &[SelectItem],
3198    ) -> SqlResult<Vec<SelectProjection>> {
3199        if projection_items.is_empty() {
3200            return Err(Error::InvalidArgumentError(
3201                "SELECT projection must include at least one column".into(),
3202            ));
3203        }
3204
3205        let mut projections = Vec::with_capacity(projection_items.len());
3206        for (idx, item) in projection_items.iter().enumerate() {
3207            match item {
3208                SelectItem::Wildcard(options) => {
3209                    if let Some(exclude) = &options.opt_exclude {
3210                        use sqlparser::ast::ExcludeSelectItem;
3211                        let exclude_cols = match exclude {
3212                            ExcludeSelectItem::Single(ident) => vec![ident.value.clone()],
3213                            ExcludeSelectItem::Multiple(idents) => {
3214                                idents.iter().map(|id| id.value.clone()).collect()
3215                            }
3216                        };
3217                        projections.push(SelectProjection::AllColumnsExcept {
3218                            exclude: exclude_cols,
3219                        });
3220                    } else {
3221                        projections.push(SelectProjection::AllColumns);
3222                    }
3223                }
3224                SelectItem::QualifiedWildcard(kind, _) => match kind {
3225                    SelectItemQualifiedWildcardKind::ObjectName(name) => {
3226                        projections.push(SelectProjection::Column {
3227                            name: name.to_string(),
3228                            alias: None,
3229                        });
3230                    }
3231                    SelectItemQualifiedWildcardKind::Expr(_) => {
3232                        return Err(Error::InvalidArgumentError(
3233                            "expression-qualified wildcards are not supported".into(),
3234                        ));
3235                    }
3236                },
3237                SelectItem::UnnamedExpr(expr) => match expr {
3238                    SqlExpr::Identifier(ident) => {
3239                        let parts = vec![ident.value.clone()];
3240                        let resolution = resolver.resolve(&parts, id_context)?;
3241                        if resolution.is_simple() {
3242                            projections.push(SelectProjection::Column {
3243                                name: resolution.column().to_string(),
3244                                alias: None,
3245                            });
3246                        } else {
3247                            let alias = format!("col{}", idx + 1);
3248                            projections.push(SelectProjection::Computed {
3249                                expr: resolution.into_scalar_expr(),
3250                                alias,
3251                            });
3252                        }
3253                    }
3254                    SqlExpr::CompoundIdentifier(parts) => {
3255                        let name_parts: Vec<String> =
3256                            parts.iter().map(|part| part.value.clone()).collect();
3257                        let resolution = resolver.resolve(&name_parts, id_context)?;
3258                        if resolution.is_simple() {
3259                            projections.push(SelectProjection::Column {
3260                                name: resolution.column().to_string(),
3261                                alias: None,
3262                            });
3263                        } else {
3264                            let alias = format!("col{}", idx + 1);
3265                            projections.push(SelectProjection::Computed {
3266                                expr: resolution.into_scalar_expr(),
3267                                alias,
3268                            });
3269                        }
3270                    }
3271                    _ => {
3272                        let alias = format!("col{}", idx + 1);
3273                        let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
3274                        projections.push(SelectProjection::Computed {
3275                            expr: scalar,
3276                            alias,
3277                        });
3278                    }
3279                },
3280                SelectItem::ExprWithAlias { expr, alias } => match expr {
3281                    SqlExpr::Identifier(ident) => {
3282                        let parts = vec![ident.value.clone()];
3283                        let resolution = resolver.resolve(&parts, id_context)?;
3284                        if resolution.is_simple() {
3285                            projections.push(SelectProjection::Column {
3286                                name: resolution.column().to_string(),
3287                                alias: Some(alias.value.clone()),
3288                            });
3289                        } else {
3290                            projections.push(SelectProjection::Computed {
3291                                expr: resolution.into_scalar_expr(),
3292                                alias: alias.value.clone(),
3293                            });
3294                        }
3295                    }
3296                    SqlExpr::CompoundIdentifier(parts) => {
3297                        let name_parts: Vec<String> =
3298                            parts.iter().map(|part| part.value.clone()).collect();
3299                        let resolution = resolver.resolve(&name_parts, id_context)?;
3300                        if resolution.is_simple() {
3301                            projections.push(SelectProjection::Column {
3302                                name: resolution.column().to_string(),
3303                                alias: Some(alias.value.clone()),
3304                            });
3305                        } else {
3306                            projections.push(SelectProjection::Computed {
3307                                expr: resolution.into_scalar_expr(),
3308                                alias: alias.value.clone(),
3309                            });
3310                        }
3311                    }
3312                    _ => {
3313                        let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
3314                        projections.push(SelectProjection::Computed {
3315                            expr: scalar,
3316                            alias: alias.value.clone(),
3317                        });
3318                    }
3319                },
3320            }
3321        }
3322        Ok(projections)
3323    }
3324
3325    #[allow(clippy::too_many_arguments)] // NOTE: Keeps parity with SQL START TRANSACTION grammar; revisit when options expand.
3326    fn handle_start_transaction(
3327        &self,
3328        modes: Vec<TransactionMode>,
3329        begin: bool,
3330        transaction: Option<BeginTransactionKind>,
3331        modifier: Option<TransactionModifier>,
3332        statements: Vec<Statement>,
3333        exception: Option<Vec<ExceptionWhen>>,
3334        has_end_keyword: bool,
3335    ) -> SqlResult<RuntimeStatementResult<P>> {
3336        if !modes.is_empty() {
3337            return Err(Error::InvalidArgumentError(
3338                "transaction modes are not supported".into(),
3339            ));
3340        }
3341        if modifier.is_some() {
3342            return Err(Error::InvalidArgumentError(
3343                "transaction modifiers are not supported".into(),
3344            ));
3345        }
3346        if !statements.is_empty() || exception.is_some() || has_end_keyword {
3347            return Err(Error::InvalidArgumentError(
3348                "BEGIN blocks with inline statements or exceptions are not supported".into(),
3349            ));
3350        }
3351        if let Some(kind) = transaction {
3352            match kind {
3353                BeginTransactionKind::Transaction | BeginTransactionKind::Work => {}
3354            }
3355        }
3356        if !begin {
3357            // Currently treat START TRANSACTION same as BEGIN
3358            tracing::warn!("Currently treat `START TRANSACTION` same as `BEGIN`")
3359        }
3360
3361        self.execute_plan_statement(PlanStatement::BeginTransaction)
3362    }
3363
3364    fn handle_commit(
3365        &self,
3366        chain: bool,
3367        end: bool,
3368        modifier: Option<TransactionModifier>,
3369    ) -> SqlResult<RuntimeStatementResult<P>> {
3370        if chain {
3371            return Err(Error::InvalidArgumentError(
3372                "COMMIT AND [NO] CHAIN is not supported".into(),
3373            ));
3374        }
3375        if end {
3376            return Err(Error::InvalidArgumentError(
3377                "END blocks are not supported".into(),
3378            ));
3379        }
3380        if modifier.is_some() {
3381            return Err(Error::InvalidArgumentError(
3382                "transaction modifiers are not supported".into(),
3383            ));
3384        }
3385
3386        self.execute_plan_statement(PlanStatement::CommitTransaction)
3387    }
3388
3389    fn handle_rollback(
3390        &self,
3391        chain: bool,
3392        savepoint: Option<Ident>,
3393    ) -> SqlResult<RuntimeStatementResult<P>> {
3394        if chain {
3395            return Err(Error::InvalidArgumentError(
3396                "ROLLBACK AND [NO] CHAIN is not supported".into(),
3397            ));
3398        }
3399        if savepoint.is_some() {
3400            return Err(Error::InvalidArgumentError(
3401                "ROLLBACK TO SAVEPOINT is not supported".into(),
3402            ));
3403        }
3404
3405        self.execute_plan_statement(PlanStatement::RollbackTransaction)
3406    }
3407
3408    fn handle_set(&self, set_stmt: Set) -> SqlResult<RuntimeStatementResult<P>> {
3409        match set_stmt {
3410            Set::SingleAssignment {
3411                scope,
3412                hivevar,
3413                variable,
3414                values,
3415            } => {
3416                if scope.is_some() || hivevar {
3417                    return Err(Error::InvalidArgumentError(
3418                        "SET modifiers are not supported".into(),
3419                    ));
3420                }
3421
3422                let variable_name_raw = variable.to_string();
3423                let variable_name = variable_name_raw.to_ascii_lowercase();
3424
3425                match variable_name.as_str() {
3426                    "default_null_order" => {
3427                        if values.len() != 1 {
3428                            return Err(Error::InvalidArgumentError(
3429                                "SET default_null_order expects exactly one value".into(),
3430                            ));
3431                        }
3432
3433                        let value_expr = &values[0];
3434                        let normalized = match value_expr {
3435                            SqlExpr::Value(value_with_span) => value_with_span
3436                                .value
3437                                .clone()
3438                                .into_string()
3439                                .map(|s| s.to_ascii_lowercase()),
3440                            SqlExpr::Identifier(ident) => Some(ident.value.to_ascii_lowercase()),
3441                            _ => None,
3442                        };
3443
3444                        if !matches!(normalized.as_deref(), Some("nulls_first" | "nulls_last")) {
3445                            return Err(Error::InvalidArgumentError(format!(
3446                                "unsupported value for SET default_null_order: {value_expr:?}"
3447                            )));
3448                        }
3449
3450                        let use_nulls_first = matches!(normalized.as_deref(), Some("nulls_first"));
3451                        self.default_nulls_first
3452                            .store(use_nulls_first, AtomicOrdering::Relaxed);
3453
3454                        Ok(RuntimeStatementResult::NoOp)
3455                    }
3456                    "immediate_transaction_mode" => {
3457                        if values.len() != 1 {
3458                            return Err(Error::InvalidArgumentError(
3459                                "SET immediate_transaction_mode expects exactly one value".into(),
3460                            ));
3461                        }
3462                        let normalized = values[0].to_string().to_ascii_lowercase();
3463                        let enabled = match normalized.as_str() {
3464                            "true" | "on" | "1" => true,
3465                            "false" | "off" | "0" => false,
3466                            _ => {
3467                                return Err(Error::InvalidArgumentError(format!(
3468                                    "unsupported value for SET immediate_transaction_mode: {}",
3469                                    values[0]
3470                                )));
3471                            }
3472                        };
3473                        if !enabled {
3474                            tracing::warn!(
3475                                "SET immediate_transaction_mode=false has no effect; continuing with auto mode"
3476                            );
3477                        }
3478                        Ok(RuntimeStatementResult::NoOp)
3479                    }
3480                    _ => Err(Error::InvalidArgumentError(format!(
3481                        "unsupported SET variable: {variable_name_raw}"
3482                    ))),
3483                }
3484            }
3485            other => Err(Error::InvalidArgumentError(format!(
3486                "unsupported SQL SET statement: {other:?}",
3487            ))),
3488        }
3489    }
3490
3491    fn handle_pragma(
3492        &self,
3493        name: ObjectName,
3494        value: Option<Value>,
3495        is_eq: bool,
3496    ) -> SqlResult<RuntimeStatementResult<P>> {
3497        let (display, canonical) = canonical_object_name(&name)?;
3498        if value.is_some() || is_eq {
3499            return Err(Error::InvalidArgumentError(format!(
3500                "PRAGMA '{display}' does not accept a value"
3501            )));
3502        }
3503
3504        match canonical.as_str() {
3505            "enable_verification" | "disable_verification" => Ok(RuntimeStatementResult::NoOp),
3506            _ => Err(Error::InvalidArgumentError(format!(
3507                "unsupported PRAGMA '{}'",
3508                display
3509            ))),
3510        }
3511    }
3512}
3513
3514fn canonical_object_name(name: &ObjectName) -> SqlResult<(String, String)> {
3515    if name.0.is_empty() {
3516        return Err(Error::InvalidArgumentError(
3517            "object name must not be empty".into(),
3518        ));
3519    }
3520    let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
3521    for part in &name.0 {
3522        let ident = match part {
3523            ObjectNamePart::Identifier(ident) => ident,
3524            _ => {
3525                return Err(Error::InvalidArgumentError(
3526                    "object names using functions are not supported".into(),
3527                ));
3528            }
3529        };
3530        parts.push(ident.value.clone());
3531    }
3532    let display = parts.join(".");
3533    let canonical = display.to_ascii_lowercase();
3534    Ok((display, canonical))
3535}
3536
3537/// Parse an object name into optional schema and table name components.
3538///
3539/// Returns (schema_name, table_name) where schema_name is None if not qualified.
3540///
3541/// Examples:
3542/// - "users" -> (None, "users")
3543/// - "test.users" -> (Some("test"), "users")
3544/// - "catalog.test.users" -> Error (too many parts)
3545fn parse_schema_qualified_name(name: &ObjectName) -> SqlResult<(Option<String>, String)> {
3546    if name.0.is_empty() {
3547        return Err(Error::InvalidArgumentError(
3548            "object name must not be empty".into(),
3549        ));
3550    }
3551
3552    let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
3553    for part in &name.0 {
3554        let ident = match part {
3555            ObjectNamePart::Identifier(ident) => ident,
3556            _ => {
3557                return Err(Error::InvalidArgumentError(
3558                    "object names using functions are not supported".into(),
3559                ));
3560            }
3561        };
3562        parts.push(ident.value.clone());
3563    }
3564
3565    match parts.len() {
3566        1 => Ok((None, parts[0].clone())),
3567        2 => Ok((Some(parts[0].clone()), parts[1].clone())),
3568        _ => Err(Error::InvalidArgumentError(format!(
3569            "table name has too many parts: {}",
3570            name
3571        ))),
3572    }
3573}
3574
3575/// Extract column name from an index column specification (OrderBy expression).
3576///
3577/// This handles the common pattern of validating and extracting column names from
3578/// SQL index column definitions (PRIMARY KEY, UNIQUE, CREATE INDEX).
3579///
3580/// # Parameters
3581/// - `index_col`: The index column from sqlparser AST
3582/// - `context`: Description of where this column appears (e.g., "PRIMARY KEY", "UNIQUE constraint")
3583/// - `allow_sort_options`: If false, errors if any sort options are present; if true, validates them
3584/// - `allow_compound`: If true, allows compound identifiers and takes the last part; if false, only allows simple identifiers
3585///
3586/// # Returns
3587/// Column name as a String
3588fn extract_index_column_name(
3589    index_col: &sqlparser::ast::IndexColumn,
3590    context: &str,
3591    allow_sort_options: bool,
3592    allow_compound: bool,
3593) -> SqlResult<String> {
3594    use sqlparser::ast::Expr as SqlExpr;
3595
3596    // Check operator class
3597    if index_col.operator_class.is_some() {
3598        return Err(Error::InvalidArgumentError(format!(
3599            "{} operator classes are not supported",
3600            context
3601        )));
3602    }
3603
3604    let order_expr = &index_col.column;
3605
3606    // Validate sort options
3607    if allow_sort_options {
3608        // For CREATE INDEX: validate specific values are supported
3609        let ascending = order_expr.options.asc.unwrap_or(true);
3610        let nulls_first = order_expr.options.nulls_first.unwrap_or(false);
3611
3612        if !ascending {
3613            return Err(Error::InvalidArgumentError(format!(
3614                "{} DESC ordering is not supported",
3615                context
3616            )));
3617        }
3618        if nulls_first {
3619            return Err(Error::InvalidArgumentError(format!(
3620                "{} NULLS FIRST ordering is not supported",
3621                context
3622            )));
3623        }
3624    } else {
3625        // For constraints: no sort options allowed
3626        if order_expr.options.asc.is_some()
3627            || order_expr.options.nulls_first.is_some()
3628            || order_expr.with_fill.is_some()
3629        {
3630            return Err(Error::InvalidArgumentError(format!(
3631                "{} columns must be simple identifiers",
3632                context
3633            )));
3634        }
3635    }
3636
3637    // Extract column name from expression
3638    let column_name = match &order_expr.expr {
3639        SqlExpr::Identifier(ident) => ident.value.clone(),
3640        SqlExpr::CompoundIdentifier(parts) => {
3641            if allow_compound {
3642                // For CREATE INDEX: allow qualified names, take last part
3643                parts
3644                    .last()
3645                    .map(|ident| ident.value.clone())
3646                    .ok_or_else(|| {
3647                        Error::InvalidArgumentError(format!(
3648                            "invalid column reference in {}",
3649                            context
3650                        ))
3651                    })?
3652            } else if parts.len() == 1 {
3653                // For constraints: only allow single-part compound identifiers
3654                parts[0].value.clone()
3655            } else {
3656                return Err(Error::InvalidArgumentError(format!(
3657                    "{} columns must be column identifiers",
3658                    context
3659                )));
3660            }
3661        }
3662        other => {
3663            return Err(Error::InvalidArgumentError(format!(
3664                "{} only supports column references, found {:?}",
3665                context, other
3666            )));
3667        }
3668    };
3669
3670    Ok(column_name)
3671}
3672
3673fn validate_create_table_common(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
3674    if stmt.clone.is_some() || stmt.like.is_some() {
3675        return Err(Error::InvalidArgumentError(
3676            "CREATE TABLE LIKE/CLONE is not supported".into(),
3677        ));
3678    }
3679    if stmt.or_replace && stmt.if_not_exists {
3680        return Err(Error::InvalidArgumentError(
3681            "CREATE TABLE cannot combine OR REPLACE with IF NOT EXISTS".into(),
3682        ));
3683    }
3684    use sqlparser::ast::TableConstraint;
3685
3686    let mut seen_primary_key = false;
3687    for constraint in &stmt.constraints {
3688        match constraint {
3689            TableConstraint::PrimaryKey { .. } => {
3690                if seen_primary_key {
3691                    return Err(Error::InvalidArgumentError(
3692                        "multiple PRIMARY KEY constraints are not supported".into(),
3693                    ));
3694                }
3695                seen_primary_key = true;
3696            }
3697            TableConstraint::Unique { .. } => {
3698                // Detailed validation is performed later during plan construction.
3699            }
3700            TableConstraint::ForeignKey { .. } => {
3701                // Detailed validation is performed later during plan construction.
3702            }
3703            other => {
3704                return Err(Error::InvalidArgumentError(format!(
3705                    "table-level constraint {:?} is not supported",
3706                    other
3707                )));
3708            }
3709        }
3710    }
3711    Ok(())
3712}
3713
3714fn validate_check_constraint(
3715    check_expr: &sqlparser::ast::Expr,
3716    table_name: &str,
3717    column_names: &[&str],
3718) -> SqlResult<()> {
3719    use sqlparser::ast::Expr as SqlExpr;
3720
3721    let column_names_lower: HashSet<String> = column_names
3722        .iter()
3723        .map(|name| name.to_ascii_lowercase())
3724        .collect();
3725
3726    let mut stack: Vec<&SqlExpr> = vec![check_expr];
3727
3728    while let Some(expr) = stack.pop() {
3729        match expr {
3730            SqlExpr::Subquery(_) => {
3731                return Err(Error::InvalidArgumentError(
3732                    "Subqueries are not allowed in CHECK constraints".into(),
3733                ));
3734            }
3735            SqlExpr::Function(func) => {
3736                let func_name = func.name.to_string().to_uppercase();
3737                if matches!(func_name.as_str(), "SUM" | "AVG" | "COUNT" | "MIN" | "MAX") {
3738                    return Err(Error::InvalidArgumentError(
3739                        "Aggregate functions are not allowed in CHECK constraints".into(),
3740                    ));
3741                }
3742
3743                if let sqlparser::ast::FunctionArguments::List(list) = &func.args {
3744                    for arg in &list.args {
3745                        if let sqlparser::ast::FunctionArg::Unnamed(
3746                            sqlparser::ast::FunctionArgExpr::Expr(expr),
3747                        ) = arg
3748                        {
3749                            stack.push(expr);
3750                        }
3751                    }
3752                }
3753            }
3754            SqlExpr::Identifier(ident) => {
3755                if !column_names_lower.contains(&ident.value.to_ascii_lowercase()) {
3756                    return Err(Error::InvalidArgumentError(format!(
3757                        "Column '{}' referenced in CHECK constraint does not exist",
3758                        ident.value
3759                    )));
3760                }
3761            }
3762            SqlExpr::CompoundIdentifier(idents) => {
3763                if idents.len() == 2 {
3764                    let first = idents[0].value.as_str();
3765                    let second = &idents[1].value;
3766
3767                    if column_names_lower.contains(&first.to_ascii_lowercase()) {
3768                        continue;
3769                    }
3770
3771                    if !first.eq_ignore_ascii_case(table_name) {
3772                        return Err(Error::InvalidArgumentError(format!(
3773                            "CHECK constraint references column from different table '{}'",
3774                            first
3775                        )));
3776                    }
3777
3778                    if !column_names_lower.contains(&second.to_ascii_lowercase()) {
3779                        return Err(Error::InvalidArgumentError(format!(
3780                            "Column '{}' referenced in CHECK constraint does not exist",
3781                            second
3782                        )));
3783                    }
3784                } else if idents.len() == 3 {
3785                    let first = &idents[0].value;
3786                    let second = &idents[1].value;
3787                    let third = &idents[2].value;
3788
3789                    if first.eq_ignore_ascii_case(table_name) {
3790                        if !column_names_lower.contains(&second.to_ascii_lowercase()) {
3791                            return Err(Error::InvalidArgumentError(format!(
3792                                "Column '{}' referenced in CHECK constraint does not exist",
3793                                second
3794                            )));
3795                        }
3796                    } else if second.eq_ignore_ascii_case(table_name) {
3797                        if !column_names_lower.contains(&third.to_ascii_lowercase()) {
3798                            return Err(Error::InvalidArgumentError(format!(
3799                                "Column '{}' referenced in CHECK constraint does not exist",
3800                                third
3801                            )));
3802                        }
3803                    } else {
3804                        return Err(Error::InvalidArgumentError(format!(
3805                            "CHECK constraint references column from different table '{}'",
3806                            second
3807                        )));
3808                    }
3809                }
3810            }
3811            SqlExpr::BinaryOp { left, right, .. } => {
3812                stack.push(left);
3813                stack.push(right);
3814            }
3815            SqlExpr::UnaryOp { expr, .. } | SqlExpr::Nested(expr) => {
3816                stack.push(expr);
3817            }
3818            SqlExpr::Value(_) | SqlExpr::TypedString { .. } => {}
3819            _ => {}
3820        }
3821    }
3822
3823    Ok(())
3824}
3825
3826fn validate_create_table_definition(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
3827    for column in &stmt.columns {
3828        for ColumnOptionDef { option, .. } in &column.options {
3829            match option {
3830                ColumnOption::Null
3831                | ColumnOption::NotNull
3832                | ColumnOption::Unique { .. }
3833                | ColumnOption::Check(_)
3834                | ColumnOption::ForeignKey { .. } => {}
3835                ColumnOption::Default(_) => {
3836                    return Err(Error::InvalidArgumentError(format!(
3837                        "DEFAULT values are not supported for column '{}'",
3838                        column.name
3839                    )));
3840                }
3841                other => {
3842                    return Err(Error::InvalidArgumentError(format!(
3843                        "unsupported column option {:?} on '{}'",
3844                        other, column.name
3845                    )));
3846                }
3847            }
3848        }
3849    }
3850    Ok(())
3851}
3852
3853fn validate_create_table_as(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
3854    if !stmt.columns.is_empty() {
3855        return Err(Error::InvalidArgumentError(
3856            "CREATE TABLE AS SELECT does not support column definitions yet".into(),
3857        ));
3858    }
3859    Ok(())
3860}
3861
3862fn validate_simple_query(query: &Query) -> SqlResult<()> {
3863    if query.with.is_some() {
3864        return Err(Error::InvalidArgumentError(
3865            "WITH clauses are not supported".into(),
3866        ));
3867    }
3868    if let Some(limit_clause) = &query.limit_clause {
3869        match limit_clause {
3870            LimitClause::LimitOffset {
3871                offset: Some(_), ..
3872            }
3873            | LimitClause::OffsetCommaLimit { .. } => {
3874                return Err(Error::InvalidArgumentError(
3875                    "OFFSET clauses are not supported".into(),
3876                ));
3877            }
3878            LimitClause::LimitOffset { limit_by, .. } if !limit_by.is_empty() => {
3879                return Err(Error::InvalidArgumentError(
3880                    "LIMIT BY clauses are not supported".into(),
3881                ));
3882            }
3883            _ => {}
3884        }
3885    }
3886    if query.fetch.is_some() {
3887        return Err(Error::InvalidArgumentError(
3888            "FETCH clauses are not supported".into(),
3889        ));
3890    }
3891    Ok(())
3892}
3893
3894fn resolve_column_name(expr: &SqlExpr) -> SqlResult<String> {
3895    match expr {
3896        SqlExpr::Identifier(ident) => Ok(ident.value.clone()),
3897        SqlExpr::CompoundIdentifier(parts) => {
3898            if let Some(last) = parts.last() {
3899                Ok(last.value.clone())
3900            } else {
3901                Err(Error::InvalidArgumentError(
3902                    "empty column identifier".into(),
3903                ))
3904            }
3905        }
3906        _ => Err(Error::InvalidArgumentError(
3907            "aggregate arguments must be plain column identifiers".into(),
3908        )),
3909    }
3910}
3911
3912fn validate_projection_alias_qualifiers(
3913    projection_items: &[SelectItem],
3914    alias: &str,
3915) -> SqlResult<()> {
3916    let alias_lower = alias.to_ascii_lowercase();
3917    for item in projection_items {
3918        match item {
3919            SelectItem::UnnamedExpr(expr) | SelectItem::ExprWithAlias { expr, .. } => {
3920                if let SqlExpr::CompoundIdentifier(parts) = expr
3921                    && parts.len() >= 2
3922                    && let Some(first) = parts.first()
3923                    && !first.value.eq_ignore_ascii_case(&alias_lower)
3924                {
3925                    return Err(Error::InvalidArgumentError(format!(
3926                        "Binder Error: table '{}' not found",
3927                        first.value
3928                    )));
3929                }
3930            }
3931            _ => {}
3932        }
3933    }
3934    Ok(())
3935}
3936
3937/// Try to parse a function as an aggregate call for use in scalar expressions
3938/// Check if a scalar expression contains any aggregate functions
3939#[allow(dead_code)] // Utility function for future use
3940fn expr_contains_aggregate(expr: &llkv_expr::expr::ScalarExpr<String>) -> bool {
3941    match expr {
3942        llkv_expr::expr::ScalarExpr::Aggregate(_) => true,
3943        llkv_expr::expr::ScalarExpr::Binary { left, right, .. } => {
3944            expr_contains_aggregate(left) || expr_contains_aggregate(right)
3945        }
3946        llkv_expr::expr::ScalarExpr::GetField { base, .. } => expr_contains_aggregate(base),
3947        llkv_expr::expr::ScalarExpr::Column(_) | llkv_expr::expr::ScalarExpr::Literal(_) => false,
3948    }
3949}
3950
3951fn try_parse_aggregate_function(
3952    func: &sqlparser::ast::Function,
3953) -> SqlResult<Option<llkv_expr::expr::AggregateCall<String>>> {
3954    use sqlparser::ast::{FunctionArg, FunctionArgExpr, FunctionArguments, ObjectNamePart};
3955
3956    if func.uses_odbc_syntax {
3957        return Ok(None);
3958    }
3959    if !matches!(func.parameters, FunctionArguments::None) {
3960        return Ok(None);
3961    }
3962    if func.filter.is_some()
3963        || func.null_treatment.is_some()
3964        || func.over.is_some()
3965        || !func.within_group.is_empty()
3966    {
3967        return Ok(None);
3968    }
3969
3970    let func_name = if func.name.0.len() == 1 {
3971        match &func.name.0[0] {
3972            ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
3973            _ => return Ok(None),
3974        }
3975    } else {
3976        return Ok(None);
3977    };
3978
3979    let args_slice: &[FunctionArg] = match &func.args {
3980        FunctionArguments::List(list) => {
3981            if list.duplicate_treatment.is_some() || !list.clauses.is_empty() {
3982                return Ok(None);
3983            }
3984            &list.args
3985        }
3986        FunctionArguments::None => &[],
3987        FunctionArguments::Subquery(_) => return Ok(None),
3988    };
3989
3990    let agg_call = match func_name.as_str() {
3991        "count" => {
3992            if args_slice.len() != 1 {
3993                return Err(Error::InvalidArgumentError(
3994                    "COUNT accepts exactly one argument".into(),
3995                ));
3996            }
3997            match &args_slice[0] {
3998                FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
3999                    llkv_expr::expr::AggregateCall::CountStar
4000                }
4001                FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
4002                    let column = resolve_column_name(arg_expr)?;
4003                    llkv_expr::expr::AggregateCall::Count(column)
4004                }
4005                _ => {
4006                    return Err(Error::InvalidArgumentError(
4007                        "unsupported COUNT argument".into(),
4008                    ));
4009                }
4010            }
4011        }
4012        "sum" => {
4013            if args_slice.len() != 1 {
4014                return Err(Error::InvalidArgumentError(
4015                    "SUM accepts exactly one argument".into(),
4016                ));
4017            }
4018            let arg_expr = match &args_slice[0] {
4019                FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
4020                _ => {
4021                    return Err(Error::InvalidArgumentError(
4022                        "SUM requires a column argument".into(),
4023                    ));
4024                }
4025            };
4026
4027            // Check for COUNT(CASE ...) pattern
4028            if let Some(column) = parse_count_nulls_case(arg_expr)? {
4029                llkv_expr::expr::AggregateCall::CountNulls(column)
4030            } else {
4031                let column = resolve_column_name(arg_expr)?;
4032                llkv_expr::expr::AggregateCall::Sum(column)
4033            }
4034        }
4035        "min" => {
4036            if args_slice.len() != 1 {
4037                return Err(Error::InvalidArgumentError(
4038                    "MIN accepts exactly one argument".into(),
4039                ));
4040            }
4041            let arg_expr = match &args_slice[0] {
4042                FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
4043                _ => {
4044                    return Err(Error::InvalidArgumentError(
4045                        "MIN requires a column argument".into(),
4046                    ));
4047                }
4048            };
4049            let column = resolve_column_name(arg_expr)?;
4050            llkv_expr::expr::AggregateCall::Min(column)
4051        }
4052        "max" => {
4053            if args_slice.len() != 1 {
4054                return Err(Error::InvalidArgumentError(
4055                    "MAX accepts exactly one argument".into(),
4056                ));
4057            }
4058            let arg_expr = match &args_slice[0] {
4059                FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
4060                _ => {
4061                    return Err(Error::InvalidArgumentError(
4062                        "MAX requires a column argument".into(),
4063                    ));
4064                }
4065            };
4066            let column = resolve_column_name(arg_expr)?;
4067            llkv_expr::expr::AggregateCall::Max(column)
4068        }
4069        _ => return Ok(None),
4070    };
4071
4072    Ok(Some(agg_call))
4073}
4074
4075fn parse_count_nulls_case(expr: &SqlExpr) -> SqlResult<Option<String>> {
4076    let SqlExpr::Case {
4077        operand,
4078        conditions,
4079        else_result,
4080        ..
4081    } = expr
4082    else {
4083        return Ok(None);
4084    };
4085
4086    if operand.is_some() || conditions.len() != 1 {
4087        return Ok(None);
4088    }
4089
4090    let case_when = &conditions[0];
4091    if !is_integer_literal(&case_when.result, 1) {
4092        return Ok(None);
4093    }
4094
4095    let else_expr = match else_result {
4096        Some(expr) => expr.as_ref(),
4097        None => return Ok(None),
4098    };
4099    if !is_integer_literal(else_expr, 0) {
4100        return Ok(None);
4101    }
4102
4103    let inner = match &case_when.condition {
4104        SqlExpr::IsNull(inner) => inner.as_ref(),
4105        _ => return Ok(None),
4106    };
4107
4108    resolve_column_name(inner).map(Some)
4109}
4110
4111fn is_integer_literal(expr: &SqlExpr, expected: i64) -> bool {
4112    match expr {
4113        SqlExpr::Value(ValueWithSpan {
4114            value: Value::Number(text, _),
4115            ..
4116        }) => text.parse::<i64>() == Ok(expected),
4117        _ => false,
4118    }
4119}
4120
4121fn translate_condition_with_context(
4122    resolver: &IdentifierResolver<'_>,
4123    context: IdentifierContext,
4124    expr: &SqlExpr,
4125) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
4126    match expr {
4127        SqlExpr::IsNull(inner) => {
4128            let scalar = translate_scalar_with_context(resolver, context, inner)?;
4129            match scalar {
4130                llkv_expr::expr::ScalarExpr::Column(column) => {
4131                    Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
4132                        field_id: column,
4133                        op: llkv_expr::expr::Operator::IsNull,
4134                    }))
4135                }
4136                _ => Err(Error::InvalidArgumentError(
4137                    "IS NULL predicates currently support column references only".into(),
4138                )),
4139            }
4140        }
4141        SqlExpr::IsNotNull(inner) => {
4142            let scalar = translate_scalar_with_context(resolver, context, inner)?;
4143            match scalar {
4144                llkv_expr::expr::ScalarExpr::Column(column) => {
4145                    Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
4146                        field_id: column,
4147                        op: llkv_expr::expr::Operator::IsNotNull,
4148                    }))
4149                }
4150                _ => Err(Error::InvalidArgumentError(
4151                    "IS NOT NULL predicates currently support column references only".into(),
4152                )),
4153            }
4154        }
4155        SqlExpr::BinaryOp { left, op, right } => match op {
4156            BinaryOperator::And => Ok(llkv_expr::expr::Expr::And(vec![
4157                translate_condition_with_context(resolver, context, left)?,
4158                translate_condition_with_context(resolver, context, right)?,
4159            ])),
4160            BinaryOperator::Or => Ok(llkv_expr::expr::Expr::Or(vec![
4161                translate_condition_with_context(resolver, context, left)?,
4162                translate_condition_with_context(resolver, context, right)?,
4163            ])),
4164            BinaryOperator::Eq
4165            | BinaryOperator::NotEq
4166            | BinaryOperator::Lt
4167            | BinaryOperator::LtEq
4168            | BinaryOperator::Gt
4169            | BinaryOperator::GtEq => {
4170                translate_comparison_with_context(resolver, context, left, op.clone(), right)
4171            }
4172            other => Err(Error::InvalidArgumentError(format!(
4173                "unsupported binary operator in WHERE clause: {other:?}"
4174            ))),
4175        },
4176        SqlExpr::UnaryOp {
4177            op: UnaryOperator::Not,
4178            expr,
4179        } => Ok(llkv_expr::expr::Expr::not(
4180            translate_condition_with_context(resolver, context, expr)?,
4181        )),
4182        SqlExpr::Nested(inner) => translate_condition_with_context(resolver, context, inner),
4183        other => Err(Error::InvalidArgumentError(format!(
4184            "unsupported WHERE clause: {other:?}"
4185        ))),
4186    }
4187}
4188
4189fn translate_comparison_with_context(
4190    resolver: &IdentifierResolver<'_>,
4191    context: IdentifierContext,
4192    left: &SqlExpr,
4193    op: BinaryOperator,
4194    right: &SqlExpr,
4195) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
4196    let left_scalar = translate_scalar_with_context(resolver, context, left)?;
4197    let right_scalar = translate_scalar_with_context(resolver, context, right)?;
4198    let compare_op = match op {
4199        BinaryOperator::Eq => llkv_expr::expr::CompareOp::Eq,
4200        BinaryOperator::NotEq => llkv_expr::expr::CompareOp::NotEq,
4201        BinaryOperator::Lt => llkv_expr::expr::CompareOp::Lt,
4202        BinaryOperator::LtEq => llkv_expr::expr::CompareOp::LtEq,
4203        BinaryOperator::Gt => llkv_expr::expr::CompareOp::Gt,
4204        BinaryOperator::GtEq => llkv_expr::expr::CompareOp::GtEq,
4205        other => {
4206            return Err(Error::InvalidArgumentError(format!(
4207                "unsupported comparison operator: {other:?}"
4208            )));
4209        }
4210    };
4211
4212    if let (
4213        llkv_expr::expr::ScalarExpr::Column(column),
4214        llkv_expr::expr::ScalarExpr::Literal(literal),
4215    ) = (&left_scalar, &right_scalar)
4216        && let Some(op) = compare_op_to_filter_operator(compare_op, literal)
4217    {
4218        return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
4219            field_id: column.clone(),
4220            op,
4221        }));
4222    }
4223
4224    if let (
4225        llkv_expr::expr::ScalarExpr::Literal(literal),
4226        llkv_expr::expr::ScalarExpr::Column(column),
4227    ) = (&left_scalar, &right_scalar)
4228        && let Some(flipped) = flip_compare_op(compare_op)
4229        && let Some(op) = compare_op_to_filter_operator(flipped, literal)
4230    {
4231        return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
4232            field_id: column.clone(),
4233            op,
4234        }));
4235    }
4236
4237    Ok(llkv_expr::expr::Expr::Compare {
4238        left: left_scalar,
4239        op: compare_op,
4240        right: right_scalar,
4241    })
4242}
4243
4244fn compare_op_to_filter_operator(
4245    op: llkv_expr::expr::CompareOp,
4246    literal: &Literal,
4247) -> Option<llkv_expr::expr::Operator<'static>> {
4248    let lit = literal.clone();
4249    match op {
4250        llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::Operator::Equals(lit)),
4251        llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::Operator::LessThan(lit)),
4252        llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::Operator::LessThanOrEquals(lit)),
4253        llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::Operator::GreaterThan(lit)),
4254        llkv_expr::expr::CompareOp::GtEq => {
4255            Some(llkv_expr::expr::Operator::GreaterThanOrEquals(lit))
4256        }
4257        llkv_expr::expr::CompareOp::NotEq => None,
4258    }
4259}
4260
4261fn flip_compare_op(op: llkv_expr::expr::CompareOp) -> Option<llkv_expr::expr::CompareOp> {
4262    match op {
4263        llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::CompareOp::Eq),
4264        llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::CompareOp::Gt),
4265        llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::CompareOp::GtEq),
4266        llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::CompareOp::Lt),
4267        llkv_expr::expr::CompareOp::GtEq => Some(llkv_expr::expr::CompareOp::LtEq),
4268        llkv_expr::expr::CompareOp::NotEq => None,
4269    }
4270}
4271/// Translate scalar expression with knowledge of the FROM table context.
4272/// This allows us to properly distinguish schema.table.column from column.field.field.
4273fn translate_scalar_with_context(
4274    resolver: &IdentifierResolver<'_>,
4275    context: IdentifierContext,
4276    expr: &SqlExpr,
4277) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
4278    match expr {
4279        SqlExpr::Identifier(ident) => {
4280            let parts = vec![ident.value.clone()];
4281            let resolution = resolver.resolve(&parts, context)?;
4282            Ok(resolution.into_scalar_expr())
4283        }
4284        SqlExpr::CompoundIdentifier(idents) => {
4285            if idents.is_empty() {
4286                return Err(Error::InvalidArgumentError(
4287                    "invalid compound identifier".into(),
4288                ));
4289            }
4290
4291            let parts: Vec<String> = idents.iter().map(|ident| ident.value.clone()).collect();
4292            let resolution = resolver.resolve(&parts, context)?;
4293            Ok(resolution.into_scalar_expr())
4294        }
4295        _ => translate_scalar(expr),
4296    }
4297}
4298
// TODO: Rename? There are already many `translate_scalar` functions; consider a more specific name.
4300fn translate_scalar(expr: &SqlExpr) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
4301    match expr {
4302        SqlExpr::Identifier(ident) => Ok(llkv_expr::expr::ScalarExpr::column(ident.value.clone())),
4303        SqlExpr::CompoundIdentifier(idents) => {
4304            if idents.is_empty() {
4305                return Err(Error::InvalidArgumentError(
4306                    "invalid compound identifier".into(),
4307                ));
4308            }
4309
4310            // Without table context, treat first part as column, rest as fields
4311            let column_name = idents[0].value.clone();
4312            let mut result = llkv_expr::expr::ScalarExpr::column(column_name);
4313
4314            for part in &idents[1..] {
4315                let field_name = part.value.clone();
4316                result = llkv_expr::expr::ScalarExpr::get_field(result, field_name);
4317            }
4318
4319            Ok(result)
4320        }
4321        SqlExpr::Value(value) => literal_from_value(value),
4322        SqlExpr::BinaryOp { left, op, right } => {
4323            let left_expr = translate_scalar(left)?;
4324            let right_expr = translate_scalar(right)?;
4325            let op = match op {
4326                BinaryOperator::Plus => llkv_expr::expr::BinaryOp::Add,
4327                BinaryOperator::Minus => llkv_expr::expr::BinaryOp::Subtract,
4328                BinaryOperator::Multiply => llkv_expr::expr::BinaryOp::Multiply,
4329                BinaryOperator::Divide => llkv_expr::expr::BinaryOp::Divide,
4330                BinaryOperator::Modulo => llkv_expr::expr::BinaryOp::Modulo,
4331                other => {
4332                    return Err(Error::InvalidArgumentError(format!(
4333                        "unsupported scalar binary operator: {other:?}"
4334                    )));
4335                }
4336            };
4337            Ok(llkv_expr::expr::ScalarExpr::binary(
4338                left_expr, op, right_expr,
4339            ))
4340        }
4341        SqlExpr::UnaryOp {
4342            op: UnaryOperator::Minus,
4343            expr,
4344        } => match translate_scalar(expr)? {
4345            llkv_expr::expr::ScalarExpr::Literal(lit) => match lit {
4346                Literal::Integer(v) => {
4347                    Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(-v)))
4348                }
4349                Literal::Float(v) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(-v))),
4350                Literal::Boolean(_) => Err(Error::InvalidArgumentError(
4351                    "cannot negate boolean literal".into(),
4352                )),
4353                Literal::String(_) => Err(Error::InvalidArgumentError(
4354                    "cannot negate string literal".into(),
4355                )),
4356                Literal::Struct(_) => Err(Error::InvalidArgumentError(
4357                    "cannot negate struct literal".into(),
4358                )),
4359                Literal::Null => Err(Error::InvalidArgumentError(
4360                    "cannot negate null literal".into(),
4361                )),
4362            },
4363            _ => Err(Error::InvalidArgumentError(
4364                "cannot negate non-literal expression".into(),
4365            )),
4366        },
4367        SqlExpr::UnaryOp {
4368            op: UnaryOperator::Plus,
4369            expr,
4370        } => translate_scalar(expr),
4371        SqlExpr::Nested(inner) => translate_scalar(inner),
4372        SqlExpr::Function(func) => {
4373            // Try to parse as an aggregate function
4374            if let Some(agg_call) = try_parse_aggregate_function(func)? {
4375                Ok(llkv_expr::expr::ScalarExpr::aggregate(agg_call))
4376            } else {
4377                Err(Error::InvalidArgumentError(format!(
4378                    "unsupported function in scalar expression: {:?}",
4379                    func.name
4380                )))
4381            }
4382        }
4383        SqlExpr::Dictionary(fields) => {
4384            // Dictionary literal like {'a': 1, 'b': 2} represents a STRUCT literal
4385            let mut struct_fields = Vec::new();
4386            for entry in fields {
4387                let key = entry.key.value.clone(); // Use .value to get the actual identifier value
4388                // Parse the value to a literal
4389                let value_expr = translate_scalar(&entry.value)?;
4390                // Extract the literal from the expression
4391                match value_expr {
4392                    llkv_expr::expr::ScalarExpr::Literal(lit) => {
4393                        struct_fields.push((key, Box::new(lit)));
4394                    }
4395                    _ => {
4396                        return Err(Error::InvalidArgumentError(
4397                            "Dictionary values must be literals".to_string(),
4398                        ));
4399                    }
4400                }
4401            }
4402            Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Struct(
4403                struct_fields,
4404            )))
4405        }
4406        other => Err(Error::InvalidArgumentError(format!(
4407            "unsupported scalar expression: {other:?}"
4408        ))),
4409    }
4410}
4411
4412fn literal_from_value(value: &ValueWithSpan) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
4413    match &value.value {
4414        Value::Number(text, _) => {
4415            if text.contains(['.', 'e', 'E']) {
4416                let parsed = text.parse::<f64>().map_err(|err| {
4417                    Error::InvalidArgumentError(format!("invalid float literal: {err}"))
4418                })?;
4419                Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(parsed)))
4420            } else {
4421                let parsed = text.parse::<i128>().map_err(|err| {
4422                    Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
4423                })?;
4424                Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(
4425                    parsed,
4426                )))
4427            }
4428        }
4429        Value::Boolean(value) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Boolean(
4430            *value,
4431        ))),
4432        Value::Null => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Null)),
4433        other => {
4434            if let Some(text) = other.clone().into_string() {
4435                Ok(llkv_expr::expr::ScalarExpr::literal(Literal::String(text)))
4436            } else {
4437                Err(Error::InvalidArgumentError(format!(
4438                    "unsupported literal: {other:?}"
4439                )))
4440            }
4441        }
4442    }
4443}
4444
4445fn resolve_assignment_column_name(target: &AssignmentTarget) -> SqlResult<String> {
4446    match target {
4447        AssignmentTarget::ColumnName(name) => {
4448            if name.0.len() != 1 {
4449                return Err(Error::InvalidArgumentError(
4450                    "qualified column names in UPDATE assignments are not supported yet".into(),
4451                ));
4452            }
4453            match &name.0[0] {
4454                ObjectNamePart::Identifier(ident) => Ok(ident.value.clone()),
4455                other => Err(Error::InvalidArgumentError(format!(
4456                    "unsupported column reference in UPDATE assignment: {other:?}"
4457                ))),
4458            }
4459        }
4460        AssignmentTarget::Tuple(_) => Err(Error::InvalidArgumentError(
4461            "tuple assignments are not supported yet".into(),
4462        )),
4463    }
4464}
4465
4466fn arrow_type_from_sql(data_type: &SqlDataType) -> SqlResult<arrow::datatypes::DataType> {
4467    use arrow::datatypes::DataType;
4468    match data_type {
4469        SqlDataType::Int(_)
4470        | SqlDataType::Integer(_)
4471        | SqlDataType::BigInt(_)
4472        | SqlDataType::SmallInt(_)
4473        | SqlDataType::TinyInt(_) => Ok(DataType::Int64),
4474        SqlDataType::Float(_)
4475        | SqlDataType::Real
4476        | SqlDataType::Double(_)
4477        | SqlDataType::DoublePrecision => Ok(DataType::Float64),
4478        SqlDataType::Text
4479        | SqlDataType::String(_)
4480        | SqlDataType::Varchar(_)
4481        | SqlDataType::Char(_)
4482        | SqlDataType::Uuid => Ok(DataType::Utf8),
4483        SqlDataType::Date => Ok(DataType::Date32),
4484        SqlDataType::Decimal(_) | SqlDataType::Numeric(_) => Ok(DataType::Float64),
4485        SqlDataType::Boolean => Ok(DataType::Boolean),
4486        SqlDataType::Custom(name, args) => {
4487            if name.0.len() == 1
4488                && let ObjectNamePart::Identifier(ident) = &name.0[0]
4489                && ident.value.eq_ignore_ascii_case("row")
4490            {
4491                return row_type_to_arrow(data_type, args);
4492            }
4493            Err(Error::InvalidArgumentError(format!(
4494                "unsupported SQL data type: {data_type:?}"
4495            )))
4496        }
4497        other => Err(Error::InvalidArgumentError(format!(
4498            "unsupported SQL data type: {other:?}"
4499        ))),
4500    }
4501}
4502
4503fn row_type_to_arrow(
4504    data_type: &SqlDataType,
4505    tokens: &[String],
4506) -> SqlResult<arrow::datatypes::DataType> {
4507    use arrow::datatypes::{DataType, Field, FieldRef, Fields};
4508
4509    let row_str = data_type.to_string();
4510    if tokens.is_empty() {
4511        return Err(Error::InvalidArgumentError(
4512            "ROW type must define at least one field".into(),
4513        ));
4514    }
4515
4516    let dialect = GenericDialect {};
4517    let field_definitions = resolve_row_field_types(tokens, &dialect).map_err(|err| {
4518        Error::InvalidArgumentError(format!("unable to parse ROW type '{row_str}': {err}"))
4519    })?;
4520
4521    let mut fields: Vec<FieldRef> = Vec::with_capacity(field_definitions.len());
4522    for (field_name, field_type) in field_definitions {
4523        let arrow_field_type = arrow_type_from_sql(&field_type)?;
4524        fields.push(Arc::new(Field::new(field_name, arrow_field_type, true)));
4525    }
4526
4527    let struct_fields: Fields = fields.into();
4528    Ok(DataType::Struct(struct_fields))
4529}
4530
4531fn resolve_row_field_types(
4532    tokens: &[String],
4533    dialect: &GenericDialect,
4534) -> SqlResult<Vec<(String, SqlDataType)>> {
4535    if tokens.is_empty() {
4536        return Err(Error::InvalidArgumentError(
4537            "ROW type must define at least one field".into(),
4538        ));
4539    }
4540
4541    let mut start = 0;
4542    let mut end = tokens.len();
4543    if tokens[start] == "(" {
4544        if end == 0 || tokens[end - 1] != ")" {
4545            return Err(Error::InvalidArgumentError(
4546                "ROW type is missing closing ')'".into(),
4547            ));
4548        }
4549        start += 1;
4550        end -= 1;
4551    } else if tokens[end - 1] == ")" {
4552        return Err(Error::InvalidArgumentError(
4553            "ROW type contains unmatched ')'".into(),
4554        ));
4555    }
4556
4557    let slice = &tokens[start..end];
4558    if slice.is_empty() {
4559        return Err(Error::InvalidArgumentError(
4560            "ROW type did not provide any field definitions".into(),
4561        ));
4562    }
4563
4564    let mut fields = Vec::new();
4565    let mut index = 0;
4566
4567    while index < slice.len() {
4568        if slice[index] == "," {
4569            index += 1;
4570            continue;
4571        }
4572
4573        let field_name = normalize_row_field_name(&slice[index])?;
4574        index += 1;
4575
4576        if index >= slice.len() {
4577            return Err(Error::InvalidArgumentError(format!(
4578                "ROW field '{field_name}' is missing a type specification"
4579            )));
4580        }
4581
4582        let mut last_success: Option<(usize, SqlDataType)> = None;
4583        let mut type_end = index;
4584
4585        while type_end <= slice.len() {
4586            let candidate = slice[index..type_end].join(" ");
4587            if candidate.trim().is_empty() {
4588                type_end += 1;
4589                continue;
4590            }
4591
4592            if let Ok(parsed_type) = parse_sql_data_type(&candidate, dialect) {
4593                last_success = Some((type_end, parsed_type));
4594            }
4595
4596            if type_end == slice.len() {
4597                break;
4598            }
4599
4600            if slice[type_end] == "," && last_success.is_some() {
4601                break;
4602            }
4603
4604            type_end += 1;
4605        }
4606
4607        let Some((next_index, data_type)) = last_success else {
4608            return Err(Error::InvalidArgumentError(format!(
4609                "failed to parse ROW field type for '{field_name}'"
4610            )));
4611        };
4612
4613        fields.push((field_name, data_type));
4614        index = next_index;
4615
4616        if index < slice.len() && slice[index] == "," {
4617            index += 1;
4618        }
4619    }
4620
4621    if fields.is_empty() {
4622        return Err(Error::InvalidArgumentError(
4623            "ROW type did not provide any field definitions".into(),
4624        ));
4625    }
4626
4627    Ok(fields)
4628}
4629
4630fn normalize_row_field_name(raw: &str) -> SqlResult<String> {
4631    let trimmed = raw.trim();
4632    if trimmed.is_empty() {
4633        return Err(Error::InvalidArgumentError(
4634            "ROW field name must not be empty".into(),
4635        ));
4636    }
4637
4638    if let Some(stripped) = trimmed.strip_prefix('"') {
4639        let without_end = stripped.strip_suffix('"').ok_or_else(|| {
4640            Error::InvalidArgumentError(format!("unterminated quoted ROW field name: {trimmed}"))
4641        })?;
4642        let name = without_end.replace("\"\"", "\"");
4643        return Ok(name);
4644    }
4645
4646    Ok(trimmed.to_string())
4647}
4648
4649fn parse_sql_data_type(type_str: &str, dialect: &GenericDialect) -> SqlResult<SqlDataType> {
4650    let trimmed = type_str.trim();
4651    let sql = format!("CREATE TABLE __row(__field {trimmed});");
4652    let statements = Parser::parse_sql(dialect, &sql).map_err(|err| {
4653        Error::InvalidArgumentError(format!("failed to parse ROW field type '{trimmed}': {err}"))
4654    })?;
4655
4656    let stmt = statements.into_iter().next().ok_or_else(|| {
4657        Error::InvalidArgumentError(format!(
4658            "ROW field type '{trimmed}' did not produce a statement"
4659        ))
4660    })?;
4661
4662    match stmt {
4663        Statement::CreateTable(table) => table
4664            .columns
4665            .first()
4666            .map(|col| col.data_type.clone())
4667            .ok_or_else(|| {
4668                Error::InvalidArgumentError(format!(
4669                    "ROW field type '{trimmed}' missing column definition"
4670                ))
4671            }),
4672        other => Err(Error::InvalidArgumentError(format!(
4673            "unexpected statement while parsing ROW field type: {other:?}"
4674        ))),
4675    }
4676}
4677
4678/// Extract VALUES data from a derived table in FROM clause.
4679/// Returns (rows, column_names) if the pattern matches: SELECT ... FROM (VALUES ...) alias(col1, col2, ...)
4680type ExtractValuesResult = Option<(Vec<Vec<PlanValue>>, Vec<String>)>;
4681
4682#[allow(clippy::type_complexity)]
4683fn extract_values_from_derived_table(from: &[TableWithJoins]) -> SqlResult<ExtractValuesResult> {
4684    if from.len() != 1 {
4685        return Ok(None);
4686    }
4687
4688    let table_with_joins = &from[0];
4689    if !table_with_joins.joins.is_empty() {
4690        return Ok(None);
4691    }
4692
4693    match &table_with_joins.relation {
4694        TableFactor::Derived {
4695            subquery, alias, ..
4696        } => {
4697            // Check if the subquery is a VALUES expression
4698            let values = match subquery.body.as_ref() {
4699                SetExpr::Values(v) => v,
4700                _ => return Ok(None),
4701            };
4702
4703            // Extract column names from alias
4704            let column_names = if let Some(alias) = alias {
4705                alias
4706                    .columns
4707                    .iter()
4708                    .map(|col_def| col_def.name.value.clone())
4709                    .collect::<Vec<_>>()
4710            } else {
4711                // Generate default column names if no alias provided
4712                if values.rows.is_empty() {
4713                    return Err(Error::InvalidArgumentError(
4714                        "VALUES expression must have at least one row".into(),
4715                    ));
4716                }
4717                let first_row = &values.rows[0];
4718                (0..first_row.len())
4719                    .map(|i| format!("column{}", i))
4720                    .collect()
4721            };
4722
4723            // Extract rows
4724            if values.rows.is_empty() {
4725                return Err(Error::InvalidArgumentError(
4726                    "VALUES expression must have at least one row".into(),
4727                ));
4728            }
4729
4730            let mut rows = Vec::with_capacity(values.rows.len());
4731            for row in &values.rows {
4732                if row.len() != column_names.len() {
4733                    return Err(Error::InvalidArgumentError(format!(
4734                        "VALUES row has {} columns but table alias specifies {} columns",
4735                        row.len(),
4736                        column_names.len()
4737                    )));
4738                }
4739
4740                let mut converted_row = Vec::with_capacity(row.len());
4741                for expr in row {
4742                    let value = SqlValue::try_from_expr(expr)?;
4743                    converted_row.push(PlanValue::from(value));
4744                }
4745                rows.push(converted_row);
4746            }
4747
4748            Ok(Some((rows, column_names)))
4749        }
4750        _ => Ok(None),
4751    }
4752}
4753
4754fn extract_constant_select_rows(select: &Select) -> SqlResult<Option<Vec<Vec<PlanValue>>>> {
4755    if !select.from.is_empty() {
4756        return Ok(None);
4757    }
4758
4759    if select.selection.is_some()
4760        || select.having.is_some()
4761        || !select.named_window.is_empty()
4762        || select.qualify.is_some()
4763        || select.distinct.is_some()
4764        || select.top.is_some()
4765        || select.into.is_some()
4766        || select.prewhere.is_some()
4767        || !select.lateral_views.is_empty()
4768        || select.value_table_mode.is_some()
4769        || !group_by_is_empty(&select.group_by)
4770    {
4771        return Err(Error::InvalidArgumentError(
4772            "constant SELECT statements do not support advanced clauses".into(),
4773        ));
4774    }
4775
4776    if select.projection.is_empty() {
4777        return Err(Error::InvalidArgumentError(
4778            "constant SELECT requires at least one projection".into(),
4779        ));
4780    }
4781
4782    let mut row: Vec<PlanValue> = Vec::with_capacity(select.projection.len());
4783    for item in &select.projection {
4784        let expr = match item {
4785            SelectItem::UnnamedExpr(expr) => expr,
4786            SelectItem::ExprWithAlias { expr, .. } => expr,
4787            other => {
4788                return Err(Error::InvalidArgumentError(format!(
4789                    "unsupported projection in constant SELECT: {other:?}"
4790                )));
4791            }
4792        };
4793
4794        let value = SqlValue::try_from_expr(expr)?;
4795        row.push(PlanValue::from(value));
4796    }
4797
4798    Ok(Some(vec![row]))
4799}
4800
4801fn extract_single_table(from: &[TableWithJoins]) -> SqlResult<(String, String)> {
4802    if from.len() != 1 {
4803        return Err(Error::InvalidArgumentError(
4804            "queries over multiple tables are not supported yet".into(),
4805        ));
4806    }
4807    let item = &from[0];
4808    if !item.joins.is_empty() {
4809        return Err(Error::InvalidArgumentError(
4810            "JOIN clauses are not supported yet".into(),
4811        ));
4812    }
4813    match &item.relation {
4814        TableFactor::Table { name, .. } => canonical_object_name(name),
4815        TableFactor::Derived { alias, .. } => {
4816            // Derived table (subquery) - use the alias as the table name if provided
4817            // For CTAS, this allows: CREATE TABLE t AS SELECT * FROM (VALUES ...) v(id)
4818            let table_name = alias
4819                .as_ref()
4820                .map(|a| a.name.value.clone())
4821                .unwrap_or_else(|| "derived".to_string());
4822            let canonical = table_name.to_ascii_lowercase();
4823            Ok((table_name, canonical))
4824        }
4825        _ => Err(Error::InvalidArgumentError(
4826            "queries require a plain table name or derived table".into(),
4827        )),
4828    }
4829}
4830
4831/// Extract multiple tables from FROM clause for cross products
4832fn extract_tables(from: &[TableWithJoins]) -> SqlResult<Vec<llkv_plan::TableRef>> {
4833    let mut tables = Vec::new();
4834
4835    for item in from {
4836        // Check if this table has joins - we don't support JOIN yet
4837        if !item.joins.is_empty() {
4838            return Err(Error::InvalidArgumentError(
4839                "JOIN clauses are not supported yet".into(),
4840            ));
4841        }
4842
4843        // Extract table name
4844        match &item.relation {
4845            TableFactor::Table { name, .. } => {
4846                let (schema_opt, table) = parse_schema_qualified_name(name)?;
4847                let schema = schema_opt.unwrap_or_default();
4848                tables.push(llkv_plan::TableRef::new(schema, table));
4849            }
4850            _ => {
4851                return Err(Error::InvalidArgumentError(
4852                    "queries require a plain table name".into(),
4853                ));
4854            }
4855        }
4856    }
4857
4858    Ok(tables)
4859}
4860
4861fn group_by_is_empty(expr: &GroupByExpr) -> bool {
4862    matches!(
4863        expr,
4864        GroupByExpr::Expressions(exprs, modifiers)
4865            if exprs.is_empty() && modifiers.is_empty()
4866    )
4867}
4868
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int64Array, StringArray};
    use arrow::record_batch::RecordBatch;
    use llkv_storage::pager::MemPager;

    /// Flattens the first column of every batch into nullable strings, in row order.
    ///
    /// Panics when the first column is not a `StringArray`.
    fn extract_string_options(batches: &[RecordBatch]) -> Vec<Option<String>> {
        let mut values: Vec<Option<String>> = Vec::new();
        for batch in batches {
            let column = batch
                .column(0)
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("string column");
            values.extend((0..column.len()).map(|idx| {
                if column.is_null(idx) {
                    None
                } else {
                    Some(column.value(idx).to_string())
                }
            }));
        }
        values
    }

    /// Flattens the first column of every batch into nullable integers, in row order.
    ///
    /// Panics when the first column is not an `Int64Array`.
    fn extract_int_options(batches: &[RecordBatch]) -> Vec<Option<i64>> {
        let mut values: Vec<Option<i64>> = Vec::new();
        for batch in batches {
            let column = batch
                .column(0)
                .as_any()
                .downcast_ref::<Int64Array>()
                .expect("int column");
            values.extend((0..column.len()).map(|idx| {
                if column.is_null(idx) {
                    None
                } else {
                    Some(column.value(idx))
                }
            }));
        }
        values
    }

    /// CREATE TABLE, INSERT and a filtered SELECT work end to end.
    #[test]
    fn create_insert_select_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        let result = engine
            .execute("CREATE TABLE people (id INT NOT NULL, name TEXT NOT NULL)")
            .expect("create table");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::CreateTable { .. }
        ));

        let result = engine
            .execute("INSERT INTO people (id, name) VALUES (1, 'alice'), (2, 'bob')")
            .expect("insert rows");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 2,
                ..
            }
        ));

        let mut result = engine
            .execute("SELECT name FROM people WHERE id = 2")
            .expect("select rows");
        let RuntimeStatementResult::Select { execution, .. } = result.remove(0) else {
            panic!("expected select result");
        };
        let batches = execution.collect().expect("collect batches");

        assert_eq!(batches.len(), 1);
        let column = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string column");
        assert_eq!(column.len(), 1);
        assert_eq!(column.value(0), "bob");
    }

    /// `INSERT ... SELECT <constant>` accepts both a literal and a typed NULL.
    #[test]
    fn insert_select_constant_including_null() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        engine
            .execute("CREATE TABLE integers(i INTEGER)")
            .expect("create table");

        let result = engine
            .execute("INSERT INTO integers SELECT 42")
            .expect("insert literal");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 1,
                ..
            }
        ));

        let result = engine
            .execute("INSERT INTO integers SELECT CAST(NULL AS VARCHAR)")
            .expect("insert null literal");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 1,
                ..
            }
        ));

        let mut result = engine
            .execute("SELECT * FROM integers")
            .expect("select rows");
        let RuntimeStatementResult::Select { execution, .. } = result.remove(0) else {
            panic!("expected select result");
        };
        let batches = execution.collect().expect("collect batches");

        assert_eq!(extract_int_options(&batches), vec![Some(42), None]);
    }

    /// UPDATE with a WHERE clause rewrites only the matching row.
    #[test]
    fn update_with_where_clause_filters_rows() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        engine
            .execute("SET default_null_order='nulls_first'")
            .expect("set default null order");

        engine
            .execute("CREATE TABLE strings(a VARCHAR)")
            .expect("create table");

        engine
            .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
            .expect("insert seed rows");

        let result = engine
            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
            .expect("update rows");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Update {
                rows_updated: 1,
                ..
            }
        ));

        let mut result = engine
            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
            .expect("select rows");
        let RuntimeStatementResult::Select { execution, .. } = result.remove(0) else {
            panic!("expected select result");
        };
        let batches = execution.collect().expect("collect batches");

        let mut values = extract_string_options(&batches);

        // Re-sort locally (NULLs first, then numerically) so the assertion checks the
        // set of rows rather than the engine's ordering.
        values.sort_by(|a, b| match (a, b) {
            (None, None) => std::cmp::Ordering::Equal,
            (None, Some(_)) => std::cmp::Ordering::Less,
            (Some(_), None) => std::cmp::Ordering::Greater,
            (Some(av), Some(bv)) => {
                let a_val = av.parse::<i64>().unwrap_or_default();
                let b_val = bv.parse::<i64>().unwrap_or_default();
                a_val.cmp(&b_val)
            }
        });

        assert_eq!(
            values,
            vec![None, Some("4".to_string()), Some("13".to_string())]
        );
    }

    /// ORDER BY places NULLs last by default and first after `SET default_null_order`.
    #[test]
    fn order_by_honors_configured_default_null_order() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        engine
            .execute("CREATE TABLE strings(a VARCHAR)")
            .expect("create table");
        engine
            .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
            .expect("insert values");
        engine
            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
            .expect("update value");

        let mut result = engine
            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
            .expect("select rows");
        let RuntimeStatementResult::Select { execution, .. } = result.remove(0) else {
            panic!("expected select result");
        };
        let batches = execution.collect().expect("collect batches");

        let values = extract_string_options(&batches);
        assert_eq!(
            values,
            vec![Some("4".to_string()), Some("13".to_string()), None]
        );

        assert!(!engine.default_nulls_first_for_tests());

        engine
            .execute("SET default_null_order='nulls_first'")
            .expect("set default null order");

        assert!(engine.default_nulls_first_for_tests());

        let mut result = engine
            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
            .expect("select rows");
        let RuntimeStatementResult::Select { execution, .. } = result.remove(0) else {
            panic!("expected select result");
        };
        let batches = execution.collect().expect("collect batches");

        let values = extract_string_options(&batches);
        assert_eq!(
            values,
            vec![None, Some("4".to_string()), Some("13".to_string())]
        );
    }

    /// A `ROW(a INTEGER, b VARCHAR)` column type converts to a two-field Arrow struct.
    #[test]
    fn arrow_type_from_row_returns_struct_fields() {
        let dialect = GenericDialect {};
        let statements = Parser::parse_sql(
            &dialect,
            "CREATE TABLE row_types(payload ROW(a INTEGER, b VARCHAR));",
        )
        .expect("parse ROW type definition");

        let data_type = match &statements[0] {
            Statement::CreateTable(stmt) => stmt.columns[0].data_type.clone(),
            other => panic!("unexpected statement: {other:?}"),
        };

        let arrow_type = arrow_type_from_sql(&data_type).expect("convert ROW type");
        match arrow_type {
            arrow::datatypes::DataType::Struct(fields) => {
                assert_eq!(fields.len(), 2, "unexpected field count");
                assert_eq!(fields[0].name(), "a");
                assert_eq!(fields[1].name(), "b");
                assert_eq!(fields[0].data_type(), &arrow::datatypes::DataType::Int64);
                assert_eq!(fields[1].data_type(), &arrow::datatypes::DataType::Utf8);
            }
            other => panic!("expected struct type, got {other:?}"),
        }
    }
}