llkv_sql/
sql_engine.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::{
3    Arc, OnceLock,
4    atomic::{AtomicBool, Ordering as AtomicOrdering},
5};
6
7use crate::SqlResult;
8use crate::SqlValue;
9use arrow::record_batch::RecordBatch;
10
11use llkv_executor::SelectExecution;
12use llkv_expr::literal::Literal;
13use llkv_plan::validation::{
14    ensure_known_columns_case_insensitive, ensure_non_empty, ensure_unique_case_insensitive,
15};
16use llkv_result::Error;
17use llkv_runtime::storage_namespace::TEMPORARY_NAMESPACE_ID;
18use llkv_runtime::{
19    AggregateExpr, AssignmentValue, ColumnAssignment, ColumnSpec, CreateIndexPlan, CreateTablePlan,
20    CreateTableSource, DeletePlan, ForeignKeyAction, ForeignKeySpec, IndexColumnPlan, InsertPlan,
21    InsertSource, OrderByPlan, OrderSortType, OrderTarget, PlanStatement, PlanValue,
22    RuntimeContext, RuntimeEngine, RuntimeSession, RuntimeStatementResult, SelectPlan,
23    SelectProjection, UpdatePlan, extract_rows_from_range,
24};
25use llkv_storage::pager::Pager;
26use llkv_table::catalog::{IdentifierContext, IdentifierResolver};
27use regex::Regex;
28use simd_r_drive_entry_handle::EntryHandle;
29use sqlparser::ast::{
30    Assignment, AssignmentTarget, BeginTransactionKind, BinaryOperator, ColumnOption,
31    ColumnOptionDef, ConstraintCharacteristics, DataType as SqlDataType, Delete, ExceptionWhen,
32    Expr as SqlExpr, FromTable, FunctionArg, FunctionArgExpr, FunctionArguments, GroupByExpr,
33    Ident, LimitClause, NullsDistinctOption, ObjectName, ObjectNamePart, ObjectType, OrderBy,
34    OrderByKind, Query, ReferentialAction, SchemaName, Select, SelectItem,
35    SelectItemQualifiedWildcardKind, Set, SetExpr, SqlOption, Statement, TableConstraint,
36    TableFactor, TableObject, TableWithJoins, TransactionMode, TransactionModifier, UnaryOperator,
37    UpdateTableFromKind, Value, ValueWithSpan,
38};
39use sqlparser::dialect::GenericDialect;
40use sqlparser::parser::Parser;
41
42/// SQL execution engine built on top of the LLKV runtime.
43///
44/// # Examples
45///
46/// ```
47/// use std::sync::Arc;
48///
49/// use arrow::array::StringArray;
50/// use llkv_sql::{RuntimeStatementResult, SqlEngine};
51/// use llkv_storage::pager::MemPager;
52///
53/// let engine = SqlEngine::new(Arc::new(MemPager::default()));
54///
55/// let setup = r#"
56///     CREATE TABLE users (id INT PRIMARY KEY, name TEXT);
57///     INSERT INTO users (id, name) VALUES (1, 'Ada');
58/// "#;
59///
60/// let results = engine.execute(setup).unwrap();
61/// assert_eq!(results.len(), 2);
62///
63/// assert!(matches!(
64///     results[0],
65///     RuntimeStatementResult::CreateTable { ref table_name } if table_name == "users"
66/// ));
67/// assert!(matches!(
68///     results[1],
69///     RuntimeStatementResult::Insert { rows_inserted, .. } if rows_inserted == 1
70/// ));
71///
72/// let batches = engine.sql("SELECT id, name FROM users ORDER BY id;").unwrap();
73/// assert_eq!(batches.len(), 1);
74///
75/// let batch = &batches[0];
76/// assert_eq!(batch.num_rows(), 1);
77/// assert_eq!(batch.schema().field(1).name(), "name");
78///
79/// let names = batch
80///     .column(1)
81///     .as_any()
82///     .downcast_ref::<StringArray>()
83///     .unwrap();
84///
85/// assert_eq!(names.value(0), "Ada");
86/// ```
87pub struct SqlEngine<P>
88where
89    P: Pager<Blob = EntryHandle> + Send + Sync,
90{
91    engine: RuntimeEngine<P>,
92    default_nulls_first: AtomicBool,
93}
94
95const DROPPED_TABLE_TRANSACTION_ERR: &str = "another transaction has dropped this table";
96
97impl<P> Clone for SqlEngine<P>
98where
99    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
100{
101    fn clone(&self) -> Self {
102        tracing::warn!(
103            "[SQL_ENGINE] SqlEngine::clone() called - will create new Engine with new session!"
104        );
105        // Create a new session from the same context
106        Self {
107            engine: self.engine.clone(),
108            default_nulls_first: AtomicBool::new(
109                self.default_nulls_first.load(AtomicOrdering::Relaxed),
110            ),
111        }
112    }
113}
114
115#[allow(dead_code)]
116impl<P> SqlEngine<P>
117where
118    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
119{
120    fn map_table_error(table_name: &str, err: Error) -> Error {
121        match err {
122            Error::NotFound => Self::table_not_found_error(table_name),
123            Error::InvalidArgumentError(msg) if msg.contains("unknown table") => {
124                Self::table_not_found_error(table_name)
125            }
126            other => other,
127        }
128    }
129
130    fn table_not_found_error(table_name: &str) -> Error {
131        Error::CatalogError(format!(
132            "Catalog Error: Table '{table_name}' does not exist"
133        ))
134    }
135
136    fn is_table_missing_error(err: &Error) -> bool {
137        match err {
138            Error::NotFound => true,
139            Error::CatalogError(msg) => {
140                msg.contains("Catalog Error: Table") || msg.contains("unknown table")
141            }
142            Error::InvalidArgumentError(msg) => {
143                msg.contains("Catalog Error: Table") || msg.contains("unknown table")
144            }
145            _ => false,
146        }
147    }
148
149    fn execute_plan_statement(
150        &self,
151        statement: PlanStatement,
152    ) -> SqlResult<RuntimeStatementResult<P>> {
153        let table = llkv_runtime::statement_table_name(&statement).map(str::to_string);
154        self.engine.execute_statement(statement).map_err(|err| {
155            if let Some(table_name) = table {
156                Self::map_table_error(&table_name, err)
157            } else {
158                err
159            }
160        })
161    }
162
163    pub fn new(pager: Arc<P>) -> Self {
164        let engine = RuntimeEngine::new(pager);
165        Self {
166            engine,
167            default_nulls_first: AtomicBool::new(false),
168        }
169    }
170
171    /// Preprocess SQL to handle qualified names in EXCLUDE clauses
172    /// Converts EXCLUDE (schema.table.col) to EXCLUDE ("schema.table.col")
173    fn preprocess_exclude_syntax(sql: &str) -> String {
174        static EXCLUDE_REGEX: OnceLock<Regex> = OnceLock::new();
175
176        // Pattern to match EXCLUDE followed by qualified identifiers
177        // Matches: EXCLUDE (identifier.identifier.identifier)
178        let re = EXCLUDE_REGEX.get_or_init(|| {
179            Regex::new(
180                r"(?i)EXCLUDE\s*\(\s*([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)+)\s*\)",
181            )
182            .expect("valid EXCLUDE qualifier regex")
183        });
184
185        re.replace_all(sql, |caps: &regex::Captures| {
186            let qualified_name = &caps[1];
187            format!("EXCLUDE (\"{}\")", qualified_name)
188        })
189        .to_string()
190    }
191
192    pub(crate) fn context_arc(&self) -> Arc<RuntimeContext<P>> {
193        self.engine.context()
194    }
195
196    pub fn with_context(context: Arc<RuntimeContext<P>>, default_nulls_first: bool) -> Self {
197        Self {
198            engine: RuntimeEngine::from_context(context),
199            default_nulls_first: AtomicBool::new(default_nulls_first),
200        }
201    }
202
203    #[cfg(test)]
204    fn default_nulls_first_for_tests(&self) -> bool {
205        self.default_nulls_first.load(AtomicOrdering::Relaxed)
206    }
207
208    fn has_active_transaction(&self) -> bool {
209        self.engine.session().has_active_transaction()
210    }
211
212    /// Get a reference to the underlying session (for advanced use like error handling in test harnesses).
213    pub fn session(&self) -> &RuntimeSession<P> {
214        self.engine.session()
215    }
216
217    /// Execute one or more SQL statements and return their raw [`RuntimeStatementResult`]s.
218    ///
219    /// This method is the general-purpose entry point for running SQL against the engine when
220    /// you need to mix statement types (e.g. `CREATE TABLE`, `INSERT`, `UPDATE`, `SELECT`) or
221    /// when you care about per-statement status information. Statements are executed in the order
222    /// they appear in the input string, and the results vector mirrors that ordering.
223    ///
224    /// For ad-hoc read queries where you only care about the resulting Arrow [`RecordBatch`]es,
225    /// prefer [`SqlEngine::sql`], which enforces a single `SELECT` statement and collects its
226    /// output for you. `execute` remains the right tool for schema migrations, transactional
227    /// scripts, or workflows that need to inspect the specific runtime response for each
228    /// statement.
229    pub fn execute(&self, sql: &str) -> SqlResult<Vec<RuntimeStatementResult<P>>> {
230        tracing::trace!("DEBUG SQL execute: {}", sql);
231
232        // Preprocess SQL to handle qualified names in EXCLUDE
233        // Replace EXCLUDE (schema.table.col) with EXCLUDE ("schema.table.col")
234        let processed_sql = Self::preprocess_exclude_syntax(sql);
235
236        let dialect = GenericDialect {};
237        let statements = Parser::parse_sql(&dialect, &processed_sql)
238            .map_err(|err| Error::InvalidArgumentError(format!("failed to parse SQL: {err}")))?;
239        tracing::trace!("DEBUG SQL execute: parsed {} statements", statements.len());
240
241        let mut results = Vec::with_capacity(statements.len());
242        for (i, statement) in statements.iter().enumerate() {
243            tracing::trace!("DEBUG SQL execute: processing statement {}", i);
244            results.push(self.execute_statement(statement.clone())?);
245            tracing::trace!("DEBUG SQL execute: statement {} completed", i);
246        }
247        tracing::trace!("DEBUG SQL execute completed successfully");
248        Ok(results)
249    }
250
251    /// Execute a single SELECT statement and return its results as Arrow [`RecordBatch`]es.
252    ///
253    /// The SQL passed to this method must contain exactly one statement, and that statement must
254    /// be a `SELECT`. Statements that modify data (e.g. `INSERT`) should be executed up front
255    /// using [`SqlEngine::execute`] before calling this helper.
256    ///
257    /// # Examples
258    ///
259    /// ```
260    /// use std::sync::Arc;
261    ///
262    /// use arrow::array::StringArray;
263    /// use llkv_sql::SqlEngine;
264    /// use llkv_storage::pager::MemPager;
265    ///
266    /// let engine = SqlEngine::new(Arc::new(MemPager::default()));
267    /// let _ = engine
268    ///     .execute(
269    ///         "CREATE TABLE users (id INT PRIMARY KEY, name TEXT);\n         \
270    ///          INSERT INTO users (id, name) VALUES (1, 'Ada');",
271    ///     )
272    ///     .unwrap();
273    ///
274    /// let batches = engine.sql("SELECT id, name FROM users ORDER BY id;").unwrap();
275    /// assert_eq!(batches.len(), 1);
276    ///
277    /// let batch = &batches[0];
278    /// assert_eq!(batch.num_rows(), 1);
279    ///
280    /// let names = batch
281    ///     .column(1)
282    ///     .as_any()
283    ///     .downcast_ref::<StringArray>()
284    ///     .unwrap();
285    /// assert_eq!(names.value(0), "Ada");
286    /// ```
287    pub fn sql(&self, sql: &str) -> SqlResult<Vec<RecordBatch>> {
288        let mut results = self.execute(sql)?;
289        if results.len() != 1 {
290            return Err(Error::InvalidArgumentError(
291                "SqlEngine::sql expects exactly one SQL statement".into(),
292            ));
293        }
294
295        match results.pop().expect("checked length above") {
296            RuntimeStatementResult::Select { execution, .. } => execution.collect(),
297            other => Err(Error::InvalidArgumentError(format!(
298                "SqlEngine::sql requires a SELECT statement, got {other:?}",
299            ))),
300        }
301    }
302
303    fn execute_statement(&self, statement: Statement) -> SqlResult<RuntimeStatementResult<P>> {
304        tracing::trace!(
305            "DEBUG SQL execute_statement: {:?}",
306            match &statement {
307                Statement::Insert(insert) =>
308                    format!("Insert(table={:?})", Self::table_name_from_insert(insert)),
309                Statement::Query(_) => "Query".to_string(),
310                Statement::StartTransaction { .. } => "StartTransaction".to_string(),
311                Statement::Commit { .. } => "Commit".to_string(),
312                Statement::Rollback { .. } => "Rollback".to_string(),
313                Statement::CreateTable(_) => "CreateTable".to_string(),
314                Statement::Update { .. } => "Update".to_string(),
315                Statement::Delete(_) => "Delete".to_string(),
316                other => format!("Other({:?})", other),
317            }
318        );
319        match statement {
320            Statement::StartTransaction {
321                modes,
322                begin,
323                transaction,
324                modifier,
325                statements,
326                exception,
327                has_end_keyword,
328            } => self.handle_start_transaction(
329                modes,
330                begin,
331                transaction,
332                modifier,
333                statements,
334                exception,
335                has_end_keyword,
336            ),
337            Statement::Commit {
338                chain,
339                end,
340                modifier,
341            } => self.handle_commit(chain, end, modifier),
342            Statement::Rollback { chain, savepoint } => self.handle_rollback(chain, savepoint),
343            other => self.execute_statement_non_transactional(other),
344        }
345    }
346
347    fn execute_statement_non_transactional(
348        &self,
349        statement: Statement,
350    ) -> SqlResult<RuntimeStatementResult<P>> {
351        tracing::trace!("DEBUG SQL execute_statement_non_transactional called");
352        match statement {
353            Statement::CreateTable(stmt) => {
354                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateTable");
355                self.handle_create_table(stmt)
356            }
357            Statement::CreateIndex(stmt) => {
358                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateIndex");
359                self.handle_create_index(stmt)
360            }
361            Statement::CreateSchema {
362                schema_name,
363                if_not_exists,
364                with,
365                options,
366                default_collate_spec,
367                clone,
368            } => {
369                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateSchema");
370                self.handle_create_schema(
371                    schema_name,
372                    if_not_exists,
373                    with,
374                    options,
375                    default_collate_spec,
376                    clone,
377                )
378            }
379            Statement::Insert(stmt) => {
380                let table_name =
381                    Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
382                tracing::trace!(
383                    "DEBUG SQL execute_statement_non_transactional: Insert(table={})",
384                    table_name
385                );
386                self.handle_insert(stmt)
387            }
388            Statement::Query(query) => {
389                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Query");
390                self.handle_query(*query)
391            }
392            Statement::Update {
393                table,
394                assignments,
395                from,
396                selection,
397                returning,
398                ..
399            } => {
400                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Update");
401                self.handle_update(table, assignments, from, selection, returning)
402            }
403            Statement::Delete(delete) => {
404                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Delete");
405                self.handle_delete(delete)
406            }
407            Statement::Drop {
408                object_type,
409                if_exists,
410                names,
411                cascade,
412                restrict,
413                purge,
414                temporary,
415                ..
416            } => {
417                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Drop");
418                self.handle_drop(
419                    object_type,
420                    if_exists,
421                    names,
422                    cascade,
423                    restrict,
424                    purge,
425                    temporary,
426                )
427            }
428            Statement::Set(set_stmt) => {
429                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Set");
430                self.handle_set(set_stmt)
431            }
432            Statement::Pragma { name, value, is_eq } => {
433                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Pragma");
434                self.handle_pragma(name, value, is_eq)
435            }
436            other => {
437                tracing::trace!(
438                    "DEBUG SQL execute_statement_non_transactional: Other({:?})",
439                    other
440                );
441                Err(Error::InvalidArgumentError(format!(
442                    "unsupported SQL statement: {other:?}"
443                )))
444            }
445        }
446    }
447
448    fn table_name_from_insert(insert: &sqlparser::ast::Insert) -> SqlResult<String> {
449        match &insert.table {
450            TableObject::TableName(name) => Self::object_name_to_string(name),
451            _ => Err(Error::InvalidArgumentError(
452                "INSERT requires a plain table name".into(),
453            )),
454        }
455    }
456
457    fn table_name_from_update(table: &TableWithJoins) -> SqlResult<Option<String>> {
458        if !table.joins.is_empty() {
459            return Err(Error::InvalidArgumentError(
460                "UPDATE with JOIN targets is not supported yet".into(),
461            ));
462        }
463        Self::table_with_joins_name(table)
464    }
465
466    fn table_name_from_delete(delete: &Delete) -> SqlResult<Option<String>> {
467        if !delete.tables.is_empty() {
468            return Err(Error::InvalidArgumentError(
469                "multi-table DELETE is not supported yet".into(),
470            ));
471        }
472        let from_tables = match &delete.from {
473            FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
474        };
475        if from_tables.is_empty() {
476            return Ok(None);
477        }
478        if from_tables.len() != 1 {
479            return Err(Error::InvalidArgumentError(
480                "DELETE over multiple tables is not supported yet".into(),
481            ));
482        }
483        Self::table_with_joins_name(&from_tables[0])
484    }
485
486    fn object_name_to_string(name: &ObjectName) -> SqlResult<String> {
487        let (display, _) = canonical_object_name(name)?;
488        Ok(display)
489    }
490
491    #[allow(dead_code)]
492    fn table_object_to_name(table: &TableObject) -> SqlResult<Option<String>> {
493        match table {
494            TableObject::TableName(name) => Ok(Some(Self::object_name_to_string(name)?)),
495            TableObject::TableFunction(_) => Ok(None),
496        }
497    }
498
499    fn table_with_joins_name(table: &TableWithJoins) -> SqlResult<Option<String>> {
500        match &table.relation {
501            TableFactor::Table { name, .. } => Ok(Some(Self::object_name_to_string(name)?)),
502            _ => Ok(None),
503        }
504    }
505
506    fn tables_in_query(query: &Query) -> SqlResult<Vec<String>> {
507        let mut tables = Vec::new();
508        if let sqlparser::ast::SetExpr::Select(select) = query.body.as_ref() {
509            for table in &select.from {
510                if let TableFactor::Table { name, .. } = &table.relation {
511                    tables.push(Self::object_name_to_string(name)?);
512                }
513            }
514        }
515        Ok(tables)
516    }
517
518    fn collect_known_columns(
519        &self,
520        display_name: &str,
521        canonical_name: &str,
522    ) -> SqlResult<HashSet<String>> {
523        let context = self.engine.context();
524
525        if context.is_table_marked_dropped(canonical_name) {
526            return Err(Self::table_not_found_error(display_name));
527        }
528
529        match context.table_column_specs(display_name) {
530            Ok(specs) => Ok(specs
531                .into_iter()
532                .map(|spec| spec.name.to_ascii_lowercase())
533                .collect()),
534            Err(err) => {
535                if !Self::is_table_missing_error(&err) {
536                    return Err(Self::map_table_error(display_name, err));
537                }
538
539                Ok(HashSet::new())
540            }
541        }
542    }
543
544    fn is_table_marked_dropped(&self, table_name: &str) -> SqlResult<bool> {
545        let canonical = table_name.to_ascii_lowercase();
546        Ok(self.engine.context().is_table_marked_dropped(&canonical))
547    }
548
549    fn handle_create_table(
550        &self,
551        mut stmt: sqlparser::ast::CreateTable,
552    ) -> SqlResult<RuntimeStatementResult<P>> {
553        validate_create_table_common(&stmt)?;
554
555        let (mut schema_name, table_name) = parse_schema_qualified_name(&stmt.name)?;
556
557        let namespace = if stmt.temporary {
558            if schema_name.is_some() {
559                return Err(Error::InvalidArgumentError(
560                    "temporary tables cannot specify an explicit schema".into(),
561                ));
562            }
563            schema_name = None;
564            Some(TEMPORARY_NAMESPACE_ID.to_string())
565        } else {
566            None
567        };
568
569        // Validate schema exists if specified
570        if let Some(ref schema) = schema_name {
571            let catalog = self.engine.context().table_catalog();
572            if !catalog.schema_exists(schema) {
573                return Err(Error::CatalogError(format!(
574                    "Schema '{}' does not exist",
575                    schema
576                )));
577            }
578        }
579
580        // Use full qualified name (schema.table or just table)
581        let display_name = match &schema_name {
582            Some(schema) => format!("{}.{}", schema, table_name),
583            None => table_name.clone(),
584        };
585        let canonical_name = display_name.to_ascii_lowercase();
586        tracing::trace!(
587            "\n=== HANDLE_CREATE_TABLE: table='{}' columns={} ===",
588            display_name,
589            stmt.columns.len()
590        );
591        if display_name.is_empty() {
592            return Err(Error::InvalidArgumentError(
593                "table name must not be empty".into(),
594            ));
595        }
596
597        if let Some(query) = stmt.query.take() {
598            validate_create_table_as(&stmt)?;
599            if let Some(result) = self.try_handle_range_ctas(
600                &display_name,
601                &canonical_name,
602                &query,
603                stmt.if_not_exists,
604                stmt.or_replace,
605                namespace.clone(),
606            )? {
607                return Ok(result);
608            }
609            return self.handle_create_table_as(
610                display_name,
611                canonical_name,
612                *query,
613                stmt.if_not_exists,
614                stmt.or_replace,
615                namespace.clone(),
616            );
617        }
618
619        if stmt.columns.is_empty() {
620            return Err(Error::InvalidArgumentError(
621                "CREATE TABLE requires at least one column".into(),
622            ));
623        }
624
625        validate_create_table_definition(&stmt)?;
626
627        let column_defs_ast = std::mem::take(&mut stmt.columns);
628        let constraints = std::mem::take(&mut stmt.constraints);
629
630        let column_names: Vec<String> = column_defs_ast
631            .iter()
632            .map(|column_def| column_def.name.value.clone())
633            .collect();
634        ensure_unique_case_insensitive(column_names.iter().map(|name| name.as_str()), |dup| {
635            format!(
636                "duplicate column name '{}' in table '{}'",
637                dup, display_name
638            )
639        })?;
640        let column_names_lower: HashSet<String> = column_names
641            .iter()
642            .map(|name| name.to_ascii_lowercase())
643            .collect();
644
645        let mut columns: Vec<ColumnSpec> = Vec::with_capacity(column_defs_ast.len());
646        let mut primary_key_columns: HashSet<String> = HashSet::new();
647        let mut foreign_keys: Vec<ForeignKeySpec> = Vec::new();
648
649        // Second pass: process columns including CHECK validation and column-level FKs
650        for column_def in column_defs_ast {
651            let is_nullable = column_def
652                .options
653                .iter()
654                .all(|opt| !matches!(opt.option, ColumnOption::NotNull));
655
656            let is_primary_key = column_def.options.iter().any(|opt| {
657                matches!(
658                    opt.option,
659                    ColumnOption::Unique {
660                        is_primary: true,
661                        characteristics: _
662                    }
663                )
664            });
665
666            let has_unique_constraint = column_def
667                .options
668                .iter()
669                .any(|opt| matches!(opt.option, ColumnOption::Unique { .. }));
670
671            // Extract CHECK constraint if present and validate it
672            let check_expr = column_def.options.iter().find_map(|opt| {
673                if let ColumnOption::Check(expr) = &opt.option {
674                    Some(expr)
675                } else {
676                    None
677                }
678            });
679
680            // Validate CHECK constraint if present (now we have all column names)
681            if let Some(check_expr) = check_expr {
682                let all_col_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
683                validate_check_constraint(check_expr, &display_name, &all_col_refs)?;
684            }
685
686            let check_expr_str = check_expr.map(|e| e.to_string());
687
688            // Extract column-level FOREIGN KEY (REFERENCES clause)
689            for opt in &column_def.options {
690                if let ColumnOption::ForeignKey {
691                    foreign_table,
692                    referred_columns,
693                    on_delete,
694                    on_update,
695                    characteristics,
696                } = &opt.option
697                {
698                    let spec = self.build_foreign_key_spec(
699                        &display_name,
700                        &canonical_name,
701                        vec![column_def.name.value.clone()],
702                        foreign_table,
703                        referred_columns,
704                        *on_delete,
705                        *on_update,
706                        characteristics,
707                        &column_names_lower,
708                        None,
709                    )?;
710                    foreign_keys.push(spec);
711                }
712            }
713
714            tracing::trace!(
715                "DEBUG CREATE TABLE column '{}' is_primary_key={} has_unique={} check_expr={:?}",
716                column_def.name.value,
717                is_primary_key,
718                has_unique_constraint,
719                check_expr_str
720            );
721
722            let mut column = ColumnSpec::new(
723                column_def.name.value.clone(),
724                arrow_type_from_sql(&column_def.data_type)?,
725                is_nullable,
726            );
727            tracing::trace!(
728                "DEBUG ColumnSpec after new(): primary_key={} unique={}",
729                column.primary_key,
730                column.unique
731            );
732
733            column = column
734                .with_primary_key(is_primary_key)
735                .with_unique(has_unique_constraint)
736                .with_check(check_expr_str);
737
738            if is_primary_key {
739                column.nullable = false;
740                primary_key_columns.insert(column.name.to_ascii_lowercase());
741            }
742            tracing::trace!(
743                "DEBUG ColumnSpec after with_primary_key({})/with_unique({}): primary_key={} unique={} check_expr={:?}",
744                is_primary_key,
745                has_unique_constraint,
746                column.primary_key,
747                column.unique,
748                column.check_expr
749            );
750
751            columns.push(column);
752        }
753
754        // Apply supported table-level constraints (e.g., PRIMARY KEY)
755        if !constraints.is_empty() {
756            let mut column_lookup: HashMap<String, usize> = HashMap::with_capacity(columns.len());
757            for (idx, column) in columns.iter().enumerate() {
758                column_lookup.insert(column.name.to_ascii_lowercase(), idx);
759            }
760
761            for constraint in constraints {
762                match constraint {
763                    TableConstraint::PrimaryKey {
764                        columns: constraint_columns,
765                        ..
766                    } => {
767                        if !primary_key_columns.is_empty() {
768                            return Err(Error::InvalidArgumentError(
769                                "multiple PRIMARY KEY constraints are not supported".into(),
770                            ));
771                        }
772
773                        ensure_non_empty(&constraint_columns, || {
774                            "PRIMARY KEY requires at least one column".into()
775                        })?;
776
777                        let mut pk_column_names: Vec<String> =
778                            Vec::with_capacity(constraint_columns.len());
779
780                        for index_col in &constraint_columns {
781                            let column_ident = extract_index_column_name(
782                                index_col,
783                                "PRIMARY KEY",
784                                false, // no sort options allowed
785                                false, // only simple identifiers
786                            )?;
787                            pk_column_names.push(column_ident);
788                        }
789
790                        ensure_unique_case_insensitive(
791                            pk_column_names.iter().map(|name| name.as_str()),
792                            |dup| format!("duplicate column '{}' in PRIMARY KEY constraint", dup),
793                        )?;
794
795                        ensure_known_columns_case_insensitive(
796                            pk_column_names.iter().map(|name| name.as_str()),
797                            &column_names_lower,
798                            |unknown| {
799                                format!("unknown column '{}' in PRIMARY KEY constraint", unknown)
800                            },
801                        )?;
802
803                        for column_ident in pk_column_names {
804                            let normalized = column_ident.to_ascii_lowercase();
805                            let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
806                                Error::InvalidArgumentError(format!(
807                                    "unknown column '{}' in PRIMARY KEY constraint",
808                                    column_ident
809                                ))
810                            })?;
811
812                            let column = columns.get_mut(idx).expect("column index valid");
813                            column.primary_key = true;
814                            column.unique = true;
815                            column.nullable = false;
816
817                            primary_key_columns.insert(normalized);
818                        }
819                    }
820                    TableConstraint::Unique {
821                        columns: constraint_columns,
822                        index_type,
823                        index_options,
824                        characteristics,
825                        nulls_distinct,
826                        ..
827                    } => {
828                        if !matches!(nulls_distinct, NullsDistinctOption::None) {
829                            return Err(Error::InvalidArgumentError(
830                                "UNIQUE constraints with NULLS DISTINCT/NOT DISTINCT are not supported yet".into(),
831                            ));
832                        }
833
834                        if index_type.is_some() {
835                            return Err(Error::InvalidArgumentError(
836                                "UNIQUE constraints with index types are not supported yet".into(),
837                            ));
838                        }
839
840                        if !index_options.is_empty() {
841                            return Err(Error::InvalidArgumentError(
842                                "UNIQUE constraints with index options are not supported yet"
843                                    .into(),
844                            ));
845                        }
846
847                        if characteristics.is_some() {
848                            return Err(Error::InvalidArgumentError(
849                                "UNIQUE constraint characteristics are not supported yet".into(),
850                            ));
851                        }
852
853                        ensure_non_empty(&constraint_columns, || {
854                            "UNIQUE constraint requires at least one column".into()
855                        })?;
856
857                        let mut unique_column_names: Vec<String> =
858                            Vec::with_capacity(constraint_columns.len());
859
860                        for index_column in &constraint_columns {
861                            let column_ident = extract_index_column_name(
862                                index_column,
863                                "UNIQUE constraint",
864                                false, // no sort options allowed
865                                false, // only simple identifiers
866                            )?;
867                            unique_column_names.push(column_ident);
868                        }
869
870                        if unique_column_names.len() > 1 {
871                            return Err(Error::InvalidArgumentError(
872                                "multi-column UNIQUE constraints are not supported yet".into(),
873                            ));
874                        }
875
876                        ensure_unique_case_insensitive(
877                            unique_column_names.iter().map(|name| name.as_str()),
878                            |dup| format!("duplicate column '{}' in UNIQUE constraint", dup),
879                        )?;
880
881                        ensure_known_columns_case_insensitive(
882                            unique_column_names.iter().map(|name| name.as_str()),
883                            &column_names_lower,
884                            |unknown| format!("unknown column '{}' in UNIQUE constraint", unknown),
885                        )?;
886
887                        let column_ident = unique_column_names
888                            .into_iter()
889                            .next()
890                            .expect("unique constraint checked for emptiness");
891                        let normalized = column_ident.to_ascii_lowercase();
892                        let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
893                            Error::InvalidArgumentError(format!(
894                                "unknown column '{}' in UNIQUE constraint",
895                                column_ident
896                            ))
897                        })?;
898
899                        let column = columns
900                            .get_mut(idx)
901                            .expect("column index from lookup must be valid");
902                        column.unique = true;
903                    }
904                    TableConstraint::ForeignKey {
905                        name,
906                        index_name,
907                        columns: fk_columns,
908                        foreign_table,
909                        referred_columns,
910                        on_delete,
911                        on_update,
912                        characteristics,
913                        ..
914                    } => {
915                        if index_name.is_some() {
916                            return Err(Error::InvalidArgumentError(
917                                "FOREIGN KEY index clauses are not supported yet".into(),
918                            ));
919                        }
920
921                        let referencing_columns: Vec<String> =
922                            fk_columns.into_iter().map(|ident| ident.value).collect();
923                        let spec = self.build_foreign_key_spec(
924                            &display_name,
925                            &canonical_name,
926                            referencing_columns,
927                            &foreign_table,
928                            &referred_columns,
929                            on_delete,
930                            on_update,
931                            &characteristics,
932                            &column_names_lower,
933                            name.map(|ident| ident.value),
934                        )?;
935
936                        foreign_keys.push(spec);
937                    }
938                    unsupported => {
939                        return Err(Error::InvalidArgumentError(format!(
940                            "table-level constraint {:?} is not supported",
941                            unsupported
942                        )));
943                    }
944                }
945            }
946        }
947
948        let plan = CreateTablePlan {
949            name: display_name,
950            if_not_exists: stmt.if_not_exists,
951            or_replace: stmt.or_replace,
952            columns,
953            source: None,
954            namespace,
955            foreign_keys,
956        };
957        self.execute_plan_statement(PlanStatement::CreateTable(plan))
958    }
959
960    fn handle_create_index(
961        &self,
962        stmt: sqlparser::ast::CreateIndex,
963    ) -> SqlResult<RuntimeStatementResult<P>> {
964        let sqlparser::ast::CreateIndex {
965            name,
966            table_name,
967            using,
968            columns,
969            unique,
970            concurrently,
971            if_not_exists,
972            include,
973            nulls_distinct,
974            with,
975            predicate,
976            index_options,
977            alter_options,
978            ..
979        } = stmt;
980
981        if concurrently {
982            return Err(Error::InvalidArgumentError(
983                "CREATE INDEX CONCURRENTLY is not supported".into(),
984            ));
985        }
986        if using.is_some() {
987            return Err(Error::InvalidArgumentError(
988                "CREATE INDEX USING clauses are not supported".into(),
989            ));
990        }
991        if !include.is_empty() {
992            return Err(Error::InvalidArgumentError(
993                "CREATE INDEX INCLUDE columns are not supported".into(),
994            ));
995        }
996        if nulls_distinct.is_some() {
997            return Err(Error::InvalidArgumentError(
998                "CREATE INDEX NULLS DISTINCT is not supported".into(),
999            ));
1000        }
1001        if !with.is_empty() {
1002            return Err(Error::InvalidArgumentError(
1003                "CREATE INDEX WITH options are not supported".into(),
1004            ));
1005        }
1006        if predicate.is_some() {
1007            return Err(Error::InvalidArgumentError(
1008                "partial CREATE INDEX is not supported".into(),
1009            ));
1010        }
1011        if !index_options.is_empty() {
1012            return Err(Error::InvalidArgumentError(
1013                "CREATE INDEX options are not supported".into(),
1014            ));
1015        }
1016        if !alter_options.is_empty() {
1017            return Err(Error::InvalidArgumentError(
1018                "CREATE INDEX ALTER options are not supported".into(),
1019            ));
1020        }
1021        if columns.is_empty() {
1022            return Err(Error::InvalidArgumentError(
1023                "CREATE INDEX requires at least one column".into(),
1024            ));
1025        }
1026
1027        let (schema_name, base_table_name) = parse_schema_qualified_name(&table_name)?;
1028        if let Some(ref schema) = schema_name {
1029            let catalog = self.engine.context().table_catalog();
1030            if !catalog.schema_exists(schema) {
1031                return Err(Error::CatalogError(format!(
1032                    "Schema '{}' does not exist",
1033                    schema
1034                )));
1035            }
1036        }
1037
1038        let display_table_name = schema_name
1039            .as_ref()
1040            .map(|schema| format!("{}.{}", schema, base_table_name))
1041            .unwrap_or_else(|| base_table_name.clone());
1042        let canonical_table_name = display_table_name.to_ascii_lowercase();
1043
1044        let known_columns =
1045            self.collect_known_columns(&display_table_name, &canonical_table_name)?;
1046        let enforce_known_columns = !known_columns.is_empty();
1047
1048        let index_name = match name {
1049            Some(name_obj) => Some(Self::object_name_to_string(&name_obj)?),
1050            None => None,
1051        };
1052
1053        let mut index_columns: Vec<IndexColumnPlan> = Vec::with_capacity(columns.len());
1054        let mut seen_column_names: HashSet<String> = HashSet::new();
1055        for item in columns {
1056            // Check WITH FILL before calling helper (not part of standard validation)
1057            if item.column.with_fill.is_some() {
1058                return Err(Error::InvalidArgumentError(
1059                    "CREATE INDEX column WITH FILL is not supported".into(),
1060                ));
1061            }
1062
1063            let column_name = extract_index_column_name(
1064                &item,
1065                "CREATE INDEX",
1066                true, // allow and validate sort options
1067                true, // allow compound identifiers
1068            )?;
1069
1070            // Get sort options (already validated by helper)
1071            let order_expr = &item.column;
1072            let ascending = order_expr.options.asc.unwrap_or(true);
1073            let nulls_first = order_expr.options.nulls_first.unwrap_or(false);
1074
1075            let normalized = column_name.to_ascii_lowercase();
1076            if !seen_column_names.insert(normalized.clone()) {
1077                return Err(Error::InvalidArgumentError(format!(
1078                    "duplicate column '{}' in CREATE INDEX",
1079                    column_name
1080                )));
1081            }
1082
1083            if enforce_known_columns && !known_columns.contains(&normalized) {
1084                return Err(Error::InvalidArgumentError(format!(
1085                    "column '{}' does not exist in table '{}'",
1086                    column_name, display_table_name
1087                )));
1088            }
1089
1090            let column_plan = IndexColumnPlan::new(column_name).with_sort(ascending, nulls_first);
1091            index_columns.push(column_plan);
1092        }
1093
1094        if index_columns.len() > 1 && !unique {
1095            return Err(Error::InvalidArgumentError(
1096                "multi-column CREATE INDEX currently supports UNIQUE indexes only".into(),
1097            ));
1098        }
1099
1100        let plan = CreateIndexPlan::new(display_table_name)
1101            .with_name(index_name)
1102            .with_unique(unique)
1103            .with_if_not_exists(if_not_exists)
1104            .with_columns(index_columns);
1105
1106        self.execute_plan_statement(PlanStatement::CreateIndex(plan))
1107    }
1108
1109    fn map_referential_action(
1110        action: Option<ReferentialAction>,
1111        kind: &str,
1112    ) -> SqlResult<ForeignKeyAction> {
1113        match action {
1114            None | Some(ReferentialAction::NoAction) => Ok(ForeignKeyAction::NoAction),
1115            Some(ReferentialAction::Restrict) => Ok(ForeignKeyAction::Restrict),
1116            Some(other) => Err(Error::InvalidArgumentError(format!(
1117                "FOREIGN KEY ON {kind} {:?} is not supported yet",
1118                other
1119            ))),
1120        }
1121    }
1122
1123    #[allow(clippy::too_many_arguments)]
1124    fn build_foreign_key_spec(
1125        &self,
1126        _referencing_display: &str,
1127        referencing_canonical: &str,
1128        referencing_columns: Vec<String>,
1129        foreign_table: &ObjectName,
1130        referenced_columns: &[Ident],
1131        on_delete: Option<ReferentialAction>,
1132        on_update: Option<ReferentialAction>,
1133        characteristics: &Option<ConstraintCharacteristics>,
1134        known_columns_lower: &HashSet<String>,
1135        name: Option<String>,
1136    ) -> SqlResult<ForeignKeySpec> {
1137        if characteristics.is_some() {
1138            return Err(Error::InvalidArgumentError(
1139                "FOREIGN KEY constraint characteristics are not supported yet".into(),
1140            ));
1141        }
1142
1143        ensure_non_empty(&referencing_columns, || {
1144            "FOREIGN KEY constraint requires at least one referencing column".into()
1145        })?;
1146        ensure_unique_case_insensitive(
1147            referencing_columns.iter().map(|name| name.as_str()),
1148            |dup| format!("duplicate column '{}' in FOREIGN KEY constraint", dup),
1149        )?;
1150        ensure_known_columns_case_insensitive(
1151            referencing_columns.iter().map(|name| name.as_str()),
1152            known_columns_lower,
1153            |unknown| format!("unknown column '{}' in FOREIGN KEY constraint", unknown),
1154        )?;
1155
1156        let referenced_columns_vec: Vec<String> = referenced_columns
1157            .iter()
1158            .map(|ident| ident.value.clone())
1159            .collect();
1160        ensure_unique_case_insensitive(
1161            referenced_columns_vec.iter().map(|name| name.as_str()),
1162            |dup| {
1163                format!(
1164                    "duplicate referenced column '{}' in FOREIGN KEY constraint",
1165                    dup
1166                )
1167            },
1168        )?;
1169
1170        if !referenced_columns_vec.is_empty()
1171            && referenced_columns_vec.len() != referencing_columns.len()
1172        {
1173            return Err(Error::InvalidArgumentError(
1174                "FOREIGN KEY referencing and referenced column counts must match".into(),
1175            ));
1176        }
1177
1178        let (referenced_display, referenced_canonical) = canonical_object_name(foreign_table)?;
1179
1180        if referenced_canonical == referencing_canonical {
1181            ensure_known_columns_case_insensitive(
1182                referenced_columns_vec.iter().map(|name| name.as_str()),
1183                known_columns_lower,
1184                |unknown| {
1185                    format!(
1186                        "unknown referenced column '{}' in FOREIGN KEY constraint",
1187                        unknown
1188                    )
1189                },
1190            )?;
1191        } else {
1192            let known_columns =
1193                self.collect_known_columns(&referenced_display, &referenced_canonical)?;
1194            if !known_columns.is_empty() {
1195                ensure_known_columns_case_insensitive(
1196                    referenced_columns_vec.iter().map(|name| name.as_str()),
1197                    &known_columns,
1198                    |unknown| {
1199                        format!(
1200                            "unknown referenced column '{}' in FOREIGN KEY constraint",
1201                            unknown
1202                        )
1203                    },
1204                )?;
1205            }
1206        }
1207
1208        let on_delete_action = Self::map_referential_action(on_delete, "DELETE")?;
1209        let on_update_action = Self::map_referential_action(on_update, "UPDATE")?;
1210
1211        Ok(ForeignKeySpec {
1212            name,
1213            columns: referencing_columns,
1214            referenced_table: referenced_display,
1215            referenced_columns: referenced_columns_vec,
1216            on_delete: on_delete_action,
1217            on_update: on_update_action,
1218        })
1219    }
1220
1221    fn handle_create_schema(
1222        &self,
1223        schema_name: SchemaName,
1224        _if_not_exists: bool,
1225        with: Option<Vec<SqlOption>>,
1226        options: Option<Vec<SqlOption>>,
1227        default_collate_spec: Option<SqlExpr>,
1228        clone: Option<ObjectName>,
1229    ) -> SqlResult<RuntimeStatementResult<P>> {
1230        if clone.is_some() {
1231            return Err(Error::InvalidArgumentError(
1232                "CREATE SCHEMA ... CLONE is not supported".into(),
1233            ));
1234        }
1235        if with.as_ref().is_some_and(|opts| !opts.is_empty()) {
1236            return Err(Error::InvalidArgumentError(
1237                "CREATE SCHEMA ... WITH options are not supported".into(),
1238            ));
1239        }
1240        if options.as_ref().is_some_and(|opts| !opts.is_empty()) {
1241            return Err(Error::InvalidArgumentError(
1242                "CREATE SCHEMA options are not supported".into(),
1243            ));
1244        }
1245        if default_collate_spec.is_some() {
1246            return Err(Error::InvalidArgumentError(
1247                "CREATE SCHEMA DEFAULT COLLATE is not supported".into(),
1248            ));
1249        }
1250
1251        let schema_name = match schema_name {
1252            SchemaName::Simple(name) => name,
1253            _ => {
1254                return Err(Error::InvalidArgumentError(
1255                    "CREATE SCHEMA authorization is not supported".into(),
1256                ));
1257            }
1258        };
1259
1260        let (display_name, canonical) = canonical_object_name(&schema_name)?;
1261        if display_name.is_empty() {
1262            return Err(Error::InvalidArgumentError(
1263                "schema name must not be empty".into(),
1264            ));
1265        }
1266
1267        // Register schema in the catalog
1268        let catalog = self.engine.context().table_catalog();
1269
1270        if _if_not_exists && catalog.schema_exists(&canonical) {
1271            return Ok(RuntimeStatementResult::NoOp);
1272        }
1273
1274        catalog.register_schema(&canonical).map_err(|err| {
1275            Error::CatalogError(format!(
1276                "Failed to create schema '{}': {}",
1277                display_name, err
1278            ))
1279        })?;
1280
1281        Ok(RuntimeStatementResult::NoOp)
1282    }
1283
1284    fn try_handle_range_ctas(
1285        &self,
1286        display_name: &str,
1287        _canonical_name: &str,
1288        query: &Query,
1289        if_not_exists: bool,
1290        or_replace: bool,
1291        namespace: Option<String>,
1292    ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
1293        let select = match query.body.as_ref() {
1294            SetExpr::Select(select) => select,
1295            _ => return Ok(None),
1296        };
1297        if select.from.len() != 1 {
1298            return Ok(None);
1299        }
1300        let table_with_joins = &select.from[0];
1301        if !table_with_joins.joins.is_empty() {
1302            return Ok(None);
1303        }
1304        let (range_size, range_alias) = match &table_with_joins.relation {
1305            TableFactor::Table {
1306                name,
1307                args: Some(args),
1308                alias,
1309                ..
1310            } => {
1311                let func_name = name.to_string().to_ascii_lowercase();
1312                if func_name != "range" {
1313                    return Ok(None);
1314                }
1315                if args.args.len() != 1 {
1316                    return Err(Error::InvalidArgumentError(
1317                        "range table function expects a single argument".into(),
1318                    ));
1319                }
1320                let size_expr = &args.args[0];
1321                let range_size = match size_expr {
1322                    FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
1323                        match &value.value {
1324                            Value::Number(raw, _) => raw.parse::<i64>().map_err(|e| {
1325                                Error::InvalidArgumentError(format!(
1326                                    "invalid range size literal {}: {}",
1327                                    raw, e
1328                                ))
1329                            })?,
1330                            other => {
1331                                return Err(Error::InvalidArgumentError(format!(
1332                                    "unsupported range size value: {:?}",
1333                                    other
1334                                )));
1335                            }
1336                        }
1337                    }
1338                    _ => {
1339                        return Err(Error::InvalidArgumentError(
1340                            "unsupported range argument".into(),
1341                        ));
1342                    }
1343                };
1344                (range_size, alias.as_ref().map(|a| a.name.value.clone()))
1345            }
1346            _ => return Ok(None),
1347        };
1348
1349        if range_size < 0 {
1350            return Err(Error::InvalidArgumentError(
1351                "range size must be non-negative".into(),
1352            ));
1353        }
1354
1355        if select.projection.is_empty() {
1356            return Err(Error::InvalidArgumentError(
1357                "CREATE TABLE AS SELECT requires at least one projected column".into(),
1358            ));
1359        }
1360
1361        let mut column_specs = Vec::with_capacity(select.projection.len());
1362        let mut column_names = Vec::with_capacity(select.projection.len());
1363        let mut row_template = Vec::with_capacity(select.projection.len());
1364        for item in &select.projection {
1365            match item {
1366                SelectItem::ExprWithAlias { expr, alias } => {
1367                    let (value, data_type) = match expr {
1368                        SqlExpr::Value(value_with_span) => match &value_with_span.value {
1369                            Value::Number(raw, _) => {
1370                                let parsed = raw.parse::<i64>().map_err(|e| {
1371                                    Error::InvalidArgumentError(format!(
1372                                        "invalid numeric literal {}: {}",
1373                                        raw, e
1374                                    ))
1375                                })?;
1376                                (
1377                                    PlanValue::Integer(parsed),
1378                                    arrow::datatypes::DataType::Int64,
1379                                )
1380                            }
1381                            Value::SingleQuotedString(s) => (
1382                                PlanValue::String(s.clone()),
1383                                arrow::datatypes::DataType::Utf8,
1384                            ),
1385                            other => {
1386                                return Err(Error::InvalidArgumentError(format!(
1387                                    "unsupported SELECT expression in range CTAS: {:?}",
1388                                    other
1389                                )));
1390                            }
1391                        },
1392                        SqlExpr::Identifier(ident) => {
1393                            let ident_lower = ident.value.to_ascii_lowercase();
1394                            if range_alias
1395                                .as_ref()
1396                                .map(|a| a.eq_ignore_ascii_case(&ident_lower))
1397                                .unwrap_or(false)
1398                                || ident_lower == "range"
1399                            {
1400                                return Err(Error::InvalidArgumentError(
1401                                    "range() table function columns are not supported yet".into(),
1402                                ));
1403                            }
1404                            return Err(Error::InvalidArgumentError(format!(
1405                                "unsupported identifier '{}' in range CTAS projection",
1406                                ident.value
1407                            )));
1408                        }
1409                        other => {
1410                            return Err(Error::InvalidArgumentError(format!(
1411                                "unsupported SELECT expression in range CTAS: {:?}",
1412                                other
1413                            )));
1414                        }
1415                    };
1416                    let column_name = alias.value.clone();
1417                    column_specs.push(ColumnSpec::new(column_name.clone(), data_type, true));
1418                    column_names.push(column_name);
1419                    row_template.push(value);
1420                }
1421                other => {
1422                    return Err(Error::InvalidArgumentError(format!(
1423                        "unsupported projection {:?} in range CTAS",
1424                        other
1425                    )));
1426                }
1427            }
1428        }
1429
1430        let plan = CreateTablePlan {
1431            name: display_name.to_string(),
1432            if_not_exists,
1433            or_replace,
1434            columns: column_specs,
1435            source: None,
1436            namespace,
1437            foreign_keys: Vec::new(),
1438        };
1439        let create_result = self.execute_plan_statement(PlanStatement::CreateTable(plan))?;
1440
1441        let row_count = range_size
1442            .try_into()
1443            .map_err(|_| Error::InvalidArgumentError("range size exceeds usize".into()))?;
1444        if row_count > 0 {
1445            let rows = vec![row_template; row_count];
1446            let insert_plan = InsertPlan {
1447                table: display_name.to_string(),
1448                columns: column_names,
1449                source: InsertSource::Rows(rows),
1450            };
1451            self.execute_plan_statement(PlanStatement::Insert(insert_plan))?;
1452        }
1453
1454        Ok(Some(create_result))
1455    }
1456
1457    // TODO: Refactor into runtime or executor layer?
1458    // NOTE: PRAGMA handling lives in the SQL layer until a shared runtime hook exists.
1459    /// Try to handle pragma_table_info('table_name') table function
1460    fn try_handle_pragma_table_info(
1461        &self,
1462        query: &Query,
1463    ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
1464        let select = match query.body.as_ref() {
1465            SetExpr::Select(select) => select,
1466            _ => return Ok(None),
1467        };
1468
1469        if select.from.len() != 1 {
1470            return Ok(None);
1471        }
1472
1473        let table_with_joins = &select.from[0];
1474        if !table_with_joins.joins.is_empty() {
1475            return Ok(None);
1476        }
1477
1478        // Check if this is pragma_table_info function call
1479        let table_name = match &table_with_joins.relation {
1480            TableFactor::Table {
1481                name,
1482                args: Some(args),
1483                ..
1484            } => {
1485                let func_name = name.to_string().to_ascii_lowercase();
1486                if func_name != "pragma_table_info" {
1487                    return Ok(None);
1488                }
1489
1490                // Extract table name from argument
1491                if args.args.len() != 1 {
1492                    return Err(Error::InvalidArgumentError(
1493                        "pragma_table_info expects exactly one argument".into(),
1494                    ));
1495                }
1496
1497                match &args.args[0] {
1498                    FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
1499                        match &value.value {
1500                            Value::SingleQuotedString(s) => s.clone(),
1501                            Value::DoubleQuotedString(s) => s.clone(),
1502                            _ => {
1503                                return Err(Error::InvalidArgumentError(
1504                                    "pragma_table_info argument must be a string".into(),
1505                                ));
1506                            }
1507                        }
1508                    }
1509                    _ => {
1510                        return Err(Error::InvalidArgumentError(
1511                            "pragma_table_info argument must be a string literal".into(),
1512                        ));
1513                    }
1514                }
1515            }
1516            _ => return Ok(None),
1517        };
1518
1519        // Get table column specs from runtime context
1520        let context = self.engine.context();
1521        let columns = context.table_column_specs(&table_name)?;
1522
1523        // Build RecordBatch with table column information
1524        use arrow::array::{BooleanArray, Int32Array, StringArray};
1525        use arrow::datatypes::{DataType, Field, Schema};
1526
1527        let mut cid_values = Vec::new();
1528        let mut name_values = Vec::new();
1529        let mut type_values = Vec::new();
1530        let mut notnull_values = Vec::new();
1531        let mut dflt_value_values: Vec<Option<String>> = Vec::new();
1532        let mut pk_values = Vec::new();
1533
1534        for (idx, col) in columns.iter().enumerate() {
1535            cid_values.push(idx as i32);
1536            name_values.push(col.name.clone());
1537            type_values.push(format!("{:?}", col.data_type)); // Simple type representation
1538            notnull_values.push(!col.nullable);
1539            dflt_value_values.push(None); // We don't track default values yet
1540            pk_values.push(col.primary_key);
1541        }
1542
1543        let schema = Arc::new(Schema::new(vec![
1544            Field::new("cid", DataType::Int32, false),
1545            Field::new("name", DataType::Utf8, false),
1546            Field::new("type", DataType::Utf8, false),
1547            Field::new("notnull", DataType::Boolean, false),
1548            Field::new("dflt_value", DataType::Utf8, true),
1549            Field::new("pk", DataType::Boolean, false),
1550        ]));
1551
1552        use arrow::array::ArrayRef;
1553        let mut batch = RecordBatch::try_new(
1554            Arc::clone(&schema),
1555            vec![
1556                Arc::new(Int32Array::from(cid_values)) as ArrayRef,
1557                Arc::new(StringArray::from(name_values)) as ArrayRef,
1558                Arc::new(StringArray::from(type_values)) as ArrayRef,
1559                Arc::new(BooleanArray::from(notnull_values)) as ArrayRef,
1560                Arc::new(StringArray::from(dflt_value_values)) as ArrayRef,
1561                Arc::new(BooleanArray::from(pk_values)) as ArrayRef,
1562            ],
1563        )
1564        .map_err(|e| Error::Internal(format!("failed to create pragma_table_info batch: {}", e)))?;
1565
1566        // Apply SELECT projections: extract only requested columns
1567        let projection_indices: Vec<usize> = select
1568            .projection
1569            .iter()
1570            .filter_map(|item| {
1571                match item {
1572                    SelectItem::UnnamedExpr(SqlExpr::Identifier(ident)) => {
1573                        schema.index_of(&ident.value).ok()
1574                    }
1575                    SelectItem::ExprWithAlias { expr, .. } => {
1576                        if let SqlExpr::Identifier(ident) = expr {
1577                            schema.index_of(&ident.value).ok()
1578                        } else {
1579                            None
1580                        }
1581                    }
1582                    SelectItem::Wildcard(_) => None, // Handle * separately
1583                    _ => None,
1584                }
1585            })
1586            .collect();
1587
1588        // Apply projections if not SELECT *
1589        let projected_schema;
1590        if !projection_indices.is_empty() {
1591            let projected_fields: Vec<Field> = projection_indices
1592                .iter()
1593                .map(|&idx| schema.field(idx).clone())
1594                .collect();
1595            projected_schema = Arc::new(Schema::new(projected_fields));
1596
1597            let projected_columns: Vec<ArrayRef> = projection_indices
1598                .iter()
1599                .map(|&idx| Arc::clone(batch.column(idx)))
1600                .collect();
1601
1602            batch = RecordBatch::try_new(Arc::clone(&projected_schema), projected_columns)
1603                .map_err(|e| Error::Internal(format!("failed to project columns: {}", e)))?;
1604        } else {
1605            // SELECT * or complex projections - use original schema
1606            projected_schema = schema;
1607        }
1608
1609        // Apply ORDER BY using Arrow compute kernels
1610        if let Some(order_by) = &query.order_by {
1611            use arrow::compute::SortColumn;
1612            use arrow::compute::lexsort_to_indices;
1613            use sqlparser::ast::OrderByKind;
1614
1615            let exprs = match &order_by.kind {
1616                OrderByKind::Expressions(exprs) => exprs,
1617                _ => {
1618                    return Err(Error::InvalidArgumentError(
1619                        "unsupported ORDER BY clause".into(),
1620                    ));
1621                }
1622            };
1623
1624            let mut sort_columns = Vec::new();
1625            for order_expr in exprs {
1626                if let SqlExpr::Identifier(ident) = &order_expr.expr
1627                    && let Ok(col_idx) = projected_schema.index_of(&ident.value)
1628                {
1629                    let options = arrow::compute::SortOptions {
1630                        descending: !order_expr.options.asc.unwrap_or(true),
1631                        nulls_first: order_expr.options.nulls_first.unwrap_or(false),
1632                    };
1633                    sort_columns.push(SortColumn {
1634                        values: Arc::clone(batch.column(col_idx)),
1635                        options: Some(options),
1636                    });
1637                }
1638            }
1639
1640            if !sort_columns.is_empty() {
1641                let indices = lexsort_to_indices(&sort_columns, None)
1642                    .map_err(|e| Error::Internal(format!("failed to sort: {}", e)))?;
1643
1644                use arrow::compute::take;
1645                let sorted_columns: Result<Vec<ArrayRef>, _> = batch
1646                    .columns()
1647                    .iter()
1648                    .map(|col| take(col.as_ref(), &indices, None))
1649                    .collect();
1650
1651                batch = RecordBatch::try_new(
1652                    Arc::clone(&projected_schema),
1653                    sorted_columns
1654                        .map_err(|e| Error::Internal(format!("failed to apply sort: {}", e)))?,
1655                )
1656                .map_err(|e| Error::Internal(format!("failed to create sorted batch: {}", e)))?;
1657            }
1658        }
1659
1660        let execution = SelectExecution::new_single_batch(
1661            table_name.clone(),
1662            Arc::clone(&projected_schema),
1663            batch,
1664        );
1665
1666        Ok(Some(RuntimeStatementResult::Select {
1667            table_name,
1668            schema: projected_schema,
1669            execution,
1670        }))
1671    }
1672
1673    fn handle_create_table_as(
1674        &self,
1675        display_name: String,
1676        _canonical_name: String,
1677        query: Query,
1678        if_not_exists: bool,
1679        or_replace: bool,
1680        namespace: Option<String>,
1681    ) -> SqlResult<RuntimeStatementResult<P>> {
1682        let select_plan = self.build_select_plan(query)?;
1683
1684        if select_plan.projections.is_empty() && select_plan.aggregates.is_empty() {
1685            return Err(Error::InvalidArgumentError(
1686                "CREATE TABLE AS SELECT requires at least one projected column".into(),
1687            ));
1688        }
1689
1690        let plan = CreateTablePlan {
1691            name: display_name,
1692            if_not_exists,
1693            or_replace,
1694            columns: Vec::new(),
1695            source: Some(CreateTableSource::Select {
1696                plan: Box::new(select_plan),
1697            }),
1698            namespace,
1699            foreign_keys: Vec::new(),
1700        };
1701        self.execute_plan_statement(PlanStatement::CreateTable(plan))
1702    }
1703
1704    fn handle_insert(&self, stmt: sqlparser::ast::Insert) -> SqlResult<RuntimeStatementResult<P>> {
1705        let table_name_debug =
1706            Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
1707        tracing::trace!(
1708            "DEBUG SQL handle_insert called for table={}",
1709            table_name_debug
1710        );
1711        if !self.engine.session().has_active_transaction()
1712            && self.is_table_marked_dropped(&table_name_debug)?
1713        {
1714            return Err(Error::TransactionContextError(
1715                DROPPED_TABLE_TRANSACTION_ERR.into(),
1716            ));
1717        }
1718        if stmt.replace_into || stmt.ignore || stmt.or.is_some() {
1719            return Err(Error::InvalidArgumentError(
1720                "non-standard INSERT forms are not supported".into(),
1721            ));
1722        }
1723        if stmt.overwrite {
1724            return Err(Error::InvalidArgumentError(
1725                "INSERT OVERWRITE is not supported".into(),
1726            ));
1727        }
1728        if !stmt.assignments.is_empty() {
1729            return Err(Error::InvalidArgumentError(
1730                "INSERT ... SET is not supported".into(),
1731            ));
1732        }
1733        if stmt.partitioned.is_some() || !stmt.after_columns.is_empty() {
1734            return Err(Error::InvalidArgumentError(
1735                "partitioned INSERT is not supported".into(),
1736            ));
1737        }
1738        if stmt.returning.is_some() {
1739            return Err(Error::InvalidArgumentError(
1740                "INSERT ... RETURNING is not supported".into(),
1741            ));
1742        }
1743        if stmt.format_clause.is_some() || stmt.settings.is_some() {
1744            return Err(Error::InvalidArgumentError(
1745                "INSERT with FORMAT or SETTINGS is not supported".into(),
1746            ));
1747        }
1748
1749        let (display_name, _canonical_name) = match &stmt.table {
1750            TableObject::TableName(name) => canonical_object_name(name)?,
1751            _ => {
1752                return Err(Error::InvalidArgumentError(
1753                    "INSERT requires a plain table name".into(),
1754                ));
1755            }
1756        };
1757
1758        let columns: Vec<String> = stmt
1759            .columns
1760            .iter()
1761            .map(|ident| ident.value.clone())
1762            .collect();
1763        let source_expr = stmt
1764            .source
1765            .as_ref()
1766            .ok_or_else(|| Error::InvalidArgumentError("INSERT requires a VALUES clause".into()))?;
1767        validate_simple_query(source_expr)?;
1768
1769        let insert_source = match source_expr.body.as_ref() {
1770            SetExpr::Values(values) => {
1771                if values.rows.is_empty() {
1772                    return Err(Error::InvalidArgumentError(
1773                        "INSERT VALUES list must contain at least one row".into(),
1774                    ));
1775                }
1776                let mut rows: Vec<Vec<SqlValue>> = Vec::with_capacity(values.rows.len());
1777                for row in &values.rows {
1778                    let mut converted = Vec::with_capacity(row.len());
1779                    for expr in row {
1780                        converted.push(SqlValue::try_from_expr(expr)?);
1781                    }
1782                    rows.push(converted);
1783                }
1784                InsertSource::Rows(
1785                    rows.into_iter()
1786                        .map(|row| row.into_iter().map(PlanValue::from).collect())
1787                        .collect(),
1788                )
1789            }
1790            SetExpr::Select(select) => {
1791                if let Some(rows) = extract_constant_select_rows(select.as_ref())? {
1792                    InsertSource::Rows(rows)
1793                } else if let Some(range_rows) = extract_rows_from_range(select.as_ref())? {
1794                    InsertSource::Rows(range_rows.into_rows())
1795                } else {
1796                    let select_plan = self.build_select_plan((**source_expr).clone())?;
1797                    InsertSource::Select {
1798                        plan: Box::new(select_plan),
1799                    }
1800                }
1801            }
1802            _ => {
1803                return Err(Error::InvalidArgumentError(
1804                    "unsupported INSERT source".into(),
1805                ));
1806            }
1807        };
1808
1809        let plan = InsertPlan {
1810            table: display_name.clone(),
1811            columns,
1812            source: insert_source,
1813        };
1814        tracing::trace!(
1815            "DEBUG SQL handle_insert: about to execute insert for table={}",
1816            display_name
1817        );
1818        self.execute_plan_statement(PlanStatement::Insert(plan))
1819    }
1820
1821    fn handle_update(
1822        &self,
1823        table: TableWithJoins,
1824        assignments: Vec<Assignment>,
1825        from: Option<UpdateTableFromKind>,
1826        selection: Option<SqlExpr>,
1827        returning: Option<Vec<SelectItem>>,
1828    ) -> SqlResult<RuntimeStatementResult<P>> {
1829        if from.is_some() {
1830            return Err(Error::InvalidArgumentError(
1831                "UPDATE ... FROM is not supported yet".into(),
1832            ));
1833        }
1834        if returning.is_some() {
1835            return Err(Error::InvalidArgumentError(
1836                "UPDATE ... RETURNING is not supported".into(),
1837            ));
1838        }
1839        if assignments.is_empty() {
1840            return Err(Error::InvalidArgumentError(
1841                "UPDATE requires at least one assignment".into(),
1842            ));
1843        }
1844
1845        let (display_name, canonical_name) = extract_single_table(std::slice::from_ref(&table))?;
1846
1847        if !self.engine.session().has_active_transaction()
1848            && self
1849                .engine
1850                .context()
1851                .is_table_marked_dropped(&canonical_name)
1852        {
1853            return Err(Error::TransactionContextError(
1854                DROPPED_TABLE_TRANSACTION_ERR.into(),
1855            ));
1856        }
1857
1858        let catalog = self.engine.context().table_catalog();
1859        let resolver = catalog.identifier_resolver();
1860        let table_id = catalog.table_id(&canonical_name);
1861
1862        let mut column_assignments = Vec::with_capacity(assignments.len());
1863        let mut seen: HashMap<String, ()> = HashMap::new();
1864        for assignment in assignments {
1865            let column_name = resolve_assignment_column_name(&assignment.target)?;
1866            let normalized = column_name.to_ascii_lowercase();
1867            if seen.insert(normalized, ()).is_some() {
1868                return Err(Error::InvalidArgumentError(format!(
1869                    "duplicate column '{}' in UPDATE assignments",
1870                    column_name
1871                )));
1872            }
1873            let value = match SqlValue::try_from_expr(&assignment.value) {
1874                Ok(literal) => AssignmentValue::Literal(PlanValue::from(literal)),
1875                Err(Error::InvalidArgumentError(msg))
1876                    if msg.contains("unsupported literal expression") =>
1877                {
1878                    let translated = translate_scalar_with_context(
1879                        &resolver,
1880                        IdentifierContext::new(table_id),
1881                        &assignment.value,
1882                    )?;
1883                    AssignmentValue::Expression(translated)
1884                }
1885                Err(err) => return Err(err),
1886            };
1887            column_assignments.push(ColumnAssignment {
1888                column: column_name,
1889                value,
1890            });
1891        }
1892
1893        let filter = match selection {
1894            Some(expr) => Some(translate_condition_with_context(
1895                &resolver,
1896                IdentifierContext::new(table_id),
1897                &expr,
1898            )?),
1899            None => None,
1900        };
1901
1902        let plan = UpdatePlan {
1903            table: display_name.clone(),
1904            assignments: column_assignments,
1905            filter,
1906        };
1907        self.execute_plan_statement(PlanStatement::Update(plan))
1908    }
1909
1910    #[allow(clippy::collapsible_if)]
1911    fn handle_delete(&self, delete: Delete) -> SqlResult<RuntimeStatementResult<P>> {
1912        let Delete {
1913            tables,
1914            from,
1915            using,
1916            selection,
1917            returning,
1918            order_by,
1919            limit,
1920        } = delete;
1921
1922        if !tables.is_empty() {
1923            return Err(Error::InvalidArgumentError(
1924                "multi-table DELETE is not supported yet".into(),
1925            ));
1926        }
1927        if let Some(using_tables) = using {
1928            if !using_tables.is_empty() {
1929                return Err(Error::InvalidArgumentError(
1930                    "DELETE ... USING is not supported yet".into(),
1931                ));
1932            }
1933        }
1934        if returning.is_some() {
1935            return Err(Error::InvalidArgumentError(
1936                "DELETE ... RETURNING is not supported".into(),
1937            ));
1938        }
1939        if !order_by.is_empty() {
1940            return Err(Error::InvalidArgumentError(
1941                "DELETE ... ORDER BY is not supported yet".into(),
1942            ));
1943        }
1944        if limit.is_some() {
1945            return Err(Error::InvalidArgumentError(
1946                "DELETE ... LIMIT is not supported yet".into(),
1947            ));
1948        }
1949
1950        let from_tables = match from {
1951            FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
1952        };
1953        let (display_name, canonical_name) = extract_single_table(&from_tables)?;
1954
1955        if !self.engine.session().has_active_transaction()
1956            && self
1957                .engine
1958                .context()
1959                .is_table_marked_dropped(&canonical_name)
1960        {
1961            return Err(Error::TransactionContextError(
1962                DROPPED_TABLE_TRANSACTION_ERR.into(),
1963            ));
1964        }
1965
1966        let catalog = self.engine.context().table_catalog();
1967        let resolver = catalog.identifier_resolver();
1968        let table_id = catalog.table_id(&canonical_name);
1969
1970        let filter = selection
1971            .map(|expr| {
1972                translate_condition_with_context(&resolver, IdentifierContext::new(table_id), &expr)
1973            })
1974            .transpose()?;
1975
1976        let plan = DeletePlan {
1977            table: display_name.clone(),
1978            filter,
1979        };
1980        self.execute_plan_statement(PlanStatement::Delete(plan))
1981    }
1982
1983    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors SQL grammar; keep grouped until command builder is introduced.
1984    fn handle_drop(
1985        &self,
1986        object_type: ObjectType,
1987        if_exists: bool,
1988        names: Vec<ObjectName>,
1989        cascade: bool,
1990        restrict: bool,
1991        purge: bool,
1992        temporary: bool,
1993    ) -> SqlResult<RuntimeStatementResult<P>> {
1994        if purge || temporary {
1995            return Err(Error::InvalidArgumentError(
1996                "DROP purge/temporary options are not supported".into(),
1997            ));
1998        }
1999
2000        match object_type {
2001            ObjectType::Table => {
2002                if cascade || restrict {
2003                    return Err(Error::InvalidArgumentError(
2004                        "DROP TABLE CASCADE/RESTRICT is not supported".into(),
2005                    ));
2006                }
2007
2008                let session = self.engine.session();
2009                for name in names {
2010                    let table_name = Self::object_name_to_string(&name)?;
2011                    session
2012                        .drop_table(&table_name, if_exists)
2013                        .map_err(|err| Self::map_table_error(&table_name, err))?;
2014                }
2015
2016                Ok(RuntimeStatementResult::NoOp)
2017            }
2018            ObjectType::Schema => {
2019                if restrict {
2020                    return Err(Error::InvalidArgumentError(
2021                        "DROP SCHEMA RESTRICT is not supported".into(),
2022                    ));
2023                }
2024
2025                let catalog = self.engine.context().table_catalog();
2026
2027                for name in names {
2028                    let (display_name, canonical_name) = canonical_object_name(&name)?;
2029
2030                    if !catalog.schema_exists(&canonical_name) {
2031                        if if_exists {
2032                            continue;
2033                        }
2034                        return Err(Error::CatalogError(format!(
2035                            "Schema '{}' does not exist",
2036                            display_name
2037                        )));
2038                    }
2039
2040                    if cascade {
2041                        // Drop all tables in this schema
2042                        let all_tables = catalog.table_names();
2043                        let schema_prefix = format!("{}.", canonical_name);
2044
2045                        let ctx = self.engine.context();
2046                        for table in all_tables {
2047                            if table.to_ascii_lowercase().starts_with(&schema_prefix) {
2048                                ctx.drop_table_immediate(&table, false)?;
2049                            }
2050                        }
2051                    } else {
2052                        // Check if schema has any tables
2053                        let all_tables = catalog.table_names();
2054                        let schema_prefix = format!("{}.", canonical_name);
2055                        let has_tables = all_tables
2056                            .iter()
2057                            .any(|t| t.to_ascii_lowercase().starts_with(&schema_prefix));
2058
2059                        if has_tables {
2060                            return Err(Error::CatalogError(format!(
2061                                "Schema '{}' is not empty. Use CASCADE to drop schema and all its tables",
2062                                display_name
2063                            )));
2064                        }
2065                    }
2066
2067                    // Drop the schema
2068                    if !catalog.unregister_schema(&canonical_name) && !if_exists {
2069                        return Err(Error::CatalogError(format!(
2070                            "Schema '{}' does not exist",
2071                            display_name
2072                        )));
2073                    }
2074                }
2075
2076                Ok(RuntimeStatementResult::NoOp)
2077            }
2078            _ => Err(Error::InvalidArgumentError(format!(
2079                "DROP {} is not supported",
2080                object_type
2081            ))),
2082        }
2083    }
2084
2085    fn handle_query(&self, query: Query) -> SqlResult<RuntimeStatementResult<P>> {
2086        // Check for pragma_table_info() table function first
2087        if let Some(result) = self.try_handle_pragma_table_info(&query)? {
2088            return Ok(result);
2089        }
2090
2091        let select_plan = self.build_select_plan(query)?;
2092        self.execute_plan_statement(PlanStatement::Select(select_plan))
2093    }
2094
2095    fn build_select_plan(&self, query: Query) -> SqlResult<SelectPlan> {
2096        if self.engine.session().has_active_transaction() && self.engine.session().is_aborted() {
2097            return Err(Error::TransactionContextError(
2098                "TransactionContext Error: transaction is aborted".into(),
2099            ));
2100        }
2101
2102        validate_simple_query(&query)?;
2103        let catalog = self.engine.context().table_catalog();
2104        let resolver = catalog.identifier_resolver();
2105
2106        let (mut select_plan, select_context) = match query.body.as_ref() {
2107            SetExpr::Select(select) => self.translate_select(select.as_ref(), &resolver)?,
2108            other => {
2109                return Err(Error::InvalidArgumentError(format!(
2110                    "unsupported query expression: {other:?}"
2111                )));
2112            }
2113        };
2114        if let Some(order_by) = &query.order_by {
2115            if !select_plan.aggregates.is_empty() {
2116                return Err(Error::InvalidArgumentError(
2117                    "ORDER BY is not supported for aggregate queries".into(),
2118                ));
2119            }
2120            let order_plan = self.translate_order_by(&resolver, select_context, order_by)?;
2121            select_plan = select_plan.with_order_by(order_plan);
2122        }
2123        Ok(select_plan)
2124    }
2125
2126    fn translate_select(
2127        &self,
2128        select: &Select,
2129        resolver: &IdentifierResolver<'_>,
2130    ) -> SqlResult<(SelectPlan, IdentifierContext)> {
2131        if select.distinct.is_some() {
2132            return Err(Error::InvalidArgumentError(
2133                "SELECT DISTINCT is not supported".into(),
2134            ));
2135        }
2136        if select.top.is_some() {
2137            return Err(Error::InvalidArgumentError(
2138                "SELECT TOP is not supported".into(),
2139            ));
2140        }
2141        if select.exclude.is_some() {
2142            return Err(Error::InvalidArgumentError(
2143                "SELECT EXCLUDE is not supported".into(),
2144            ));
2145        }
2146        if select.into.is_some() {
2147            return Err(Error::InvalidArgumentError(
2148                "SELECT INTO is not supported".into(),
2149            ));
2150        }
2151        if !select.lateral_views.is_empty() {
2152            return Err(Error::InvalidArgumentError(
2153                "LATERAL VIEW is not supported".into(),
2154            ));
2155        }
2156        if select.prewhere.is_some() {
2157            return Err(Error::InvalidArgumentError(
2158                "PREWHERE is not supported".into(),
2159            ));
2160        }
2161        if !group_by_is_empty(&select.group_by) || select.value_table_mode.is_some() {
2162            return Err(Error::InvalidArgumentError(
2163                "GROUP BY and SELECT AS VALUE/STRUCT are not supported".into(),
2164            ));
2165        }
2166        if !select.cluster_by.is_empty()
2167            || !select.distribute_by.is_empty()
2168            || !select.sort_by.is_empty()
2169        {
2170            return Err(Error::InvalidArgumentError(
2171                "CLUSTER/DISTRIBUTE/SORT BY clauses are not supported".into(),
2172            ));
2173        }
2174        if select.having.is_some()
2175            || !select.named_window.is_empty()
2176            || select.qualify.is_some()
2177            || select.connect_by.is_some()
2178        {
2179            return Err(Error::InvalidArgumentError(
2180                "advanced SELECT clauses are not supported".into(),
2181            ));
2182        }
2183
2184        let table_alias = select
2185            .from
2186            .first()
2187            .and_then(|table_with_joins| match &table_with_joins.relation {
2188                TableFactor::Table { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
2189                _ => None,
2190            });
2191
2192        if let Some(alias) = table_alias.as_ref() {
2193            validate_projection_alias_qualifiers(&select.projection, alias)?;
2194        }
2195        // Handle different FROM clause scenarios
2196        let catalog = self.engine.context().table_catalog();
2197        let (mut plan, id_context) = if select.from.is_empty() {
2198            // No FROM clause - use empty string for table context (e.g., SELECT 42, SELECT {'a': 1} AS x)
2199            let mut p = SelectPlan::new("");
2200            let projections = self.build_projection_list(
2201                resolver,
2202                IdentifierContext::new(None),
2203                &select.projection,
2204            )?;
2205            p = p.with_projections(projections);
2206            (p, IdentifierContext::new(None))
2207        } else if select.from.len() == 1 {
2208            // Single table query
2209            let (display_name, canonical_name) = extract_single_table(&select.from)?;
2210            let table_id = catalog.table_id(&canonical_name);
2211            let mut p = SelectPlan::new(display_name.clone());
2212            if let Some(aggregates) = self.detect_simple_aggregates(&select.projection)? {
2213                p = p.with_aggregates(aggregates);
2214            } else {
2215                let projections = self.build_projection_list(
2216                    resolver,
2217                    IdentifierContext::new(table_id),
2218                    &select.projection,
2219                )?;
2220                p = p.with_projections(projections);
2221            }
2222            (p, IdentifierContext::new(table_id))
2223        } else {
2224            // Multiple tables - cross product
2225            let tables = extract_tables(&select.from)?;
2226            let mut p = SelectPlan::with_tables(tables);
2227            // For multi-table queries, we'll build projections differently
2228            // For now, just handle simple column references
2229            let projections = self.build_projection_list(
2230                resolver,
2231                IdentifierContext::new(None),
2232                &select.projection,
2233            )?;
2234            p = p.with_projections(projections);
2235            (p, IdentifierContext::new(None))
2236        };
2237
2238        let filter_expr = match &select.selection {
2239            Some(expr) => Some(translate_condition_with_context(
2240                resolver, id_context, expr,
2241            )?),
2242            None => None,
2243        };
2244        plan = plan.with_filter(filter_expr);
2245        Ok((plan, id_context))
2246    }
2247
2248    fn translate_order_by(
2249        &self,
2250        resolver: &IdentifierResolver<'_>,
2251        id_context: IdentifierContext,
2252        order_by: &OrderBy,
2253    ) -> SqlResult<Vec<OrderByPlan>> {
2254        let exprs = match &order_by.kind {
2255            OrderByKind::Expressions(exprs) => exprs,
2256            _ => {
2257                return Err(Error::InvalidArgumentError(
2258                    "unsupported ORDER BY clause".into(),
2259                ));
2260            }
2261        };
2262
2263        let base_nulls_first = self.default_nulls_first.load(AtomicOrdering::Relaxed);
2264
2265        let resolve_simple_column = |expr: &SqlExpr| -> SqlResult<String> {
2266            let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
2267            match scalar {
2268                llkv_expr::expr::ScalarExpr::Column(column) => Ok(column),
2269                other => Err(Error::InvalidArgumentError(format!(
2270                    "ORDER BY expression must reference a simple column, found {other:?}"
2271                ))),
2272            }
2273        };
2274
2275        let mut plans = Vec::with_capacity(exprs.len());
2276        for order_expr in exprs {
2277            let ascending = order_expr.options.asc.unwrap_or(true);
2278            let default_nulls_first_for_direction = if ascending {
2279                base_nulls_first
2280            } else {
2281                !base_nulls_first
2282            };
2283            let nulls_first = order_expr
2284                .options
2285                .nulls_first
2286                .unwrap_or(default_nulls_first_for_direction);
2287
2288            if let SqlExpr::Identifier(ident) = &order_expr.expr
2289                && ident.value.eq_ignore_ascii_case("ALL")
2290                && ident.quote_style.is_none()
2291            {
2292                plans.push(OrderByPlan {
2293                    target: OrderTarget::All,
2294                    sort_type: OrderSortType::Native,
2295                    ascending,
2296                    nulls_first,
2297                });
2298                continue;
2299            }
2300
2301            let (target, sort_type) = match &order_expr.expr {
2302                SqlExpr::Identifier(_) | SqlExpr::CompoundIdentifier(_) => (
2303                    OrderTarget::Column(resolve_simple_column(&order_expr.expr)?),
2304                    OrderSortType::Native,
2305                ),
2306                SqlExpr::Cast {
2307                    expr,
2308                    data_type:
2309                        SqlDataType::Int(_)
2310                        | SqlDataType::Integer(_)
2311                        | SqlDataType::BigInt(_)
2312                        | SqlDataType::SmallInt(_)
2313                        | SqlDataType::TinyInt(_),
2314                    ..
2315                } => (
2316                    OrderTarget::Column(resolve_simple_column(expr)?),
2317                    OrderSortType::CastTextToInteger,
2318                ),
2319                SqlExpr::Cast { data_type, .. } => {
2320                    return Err(Error::InvalidArgumentError(format!(
2321                        "ORDER BY CAST target type {:?} is not supported",
2322                        data_type
2323                    )));
2324                }
2325                SqlExpr::Value(value_with_span) => match &value_with_span.value {
2326                    Value::Number(raw, _) => {
2327                        let position: usize = raw.parse().map_err(|_| {
2328                            Error::InvalidArgumentError(format!(
2329                                "ORDER BY position '{}' is not a valid positive integer",
2330                                raw
2331                            ))
2332                        })?;
2333                        if position == 0 {
2334                            return Err(Error::InvalidArgumentError(
2335                                "ORDER BY position must be at least 1".into(),
2336                            ));
2337                        }
2338                        (OrderTarget::Index(position - 1), OrderSortType::Native)
2339                    }
2340                    other => {
2341                        return Err(Error::InvalidArgumentError(format!(
2342                            "unsupported ORDER BY literal expression: {other:?}"
2343                        )));
2344                    }
2345                },
2346                other => {
2347                    return Err(Error::InvalidArgumentError(format!(
2348                        "unsupported ORDER BY expression: {other:?}"
2349                    )));
2350                }
2351            };
2352
2353            plans.push(OrderByPlan {
2354                target,
2355                sort_type,
2356                ascending,
2357                nulls_first,
2358            });
2359        }
2360
2361        Ok(plans)
2362    }
2363
2364    fn detect_simple_aggregates(
2365        &self,
2366        projection_items: &[SelectItem],
2367    ) -> SqlResult<Option<Vec<AggregateExpr>>> {
2368        if projection_items.is_empty() {
2369            return Ok(None);
2370        }
2371
2372        let mut specs: Vec<AggregateExpr> = Vec::with_capacity(projection_items.len());
2373        for (idx, item) in projection_items.iter().enumerate() {
2374            let (expr, alias_opt) = match item {
2375                SelectItem::UnnamedExpr(expr) => (expr, None),
2376                SelectItem::ExprWithAlias { expr, alias } => (expr, Some(alias.value.clone())),
2377                _ => return Ok(None),
2378            };
2379
2380            let alias = alias_opt.unwrap_or_else(|| format!("col{}", idx + 1));
2381            let SqlExpr::Function(func) = expr else {
2382                return Ok(None);
2383            };
2384
2385            if func.uses_odbc_syntax {
2386                return Err(Error::InvalidArgumentError(
2387                    "ODBC function syntax is not supported in aggregate queries".into(),
2388                ));
2389            }
2390            if !matches!(func.parameters, FunctionArguments::None) {
2391                return Err(Error::InvalidArgumentError(
2392                    "parameterized aggregate functions are not supported".into(),
2393                ));
2394            }
2395            if func.filter.is_some()
2396                || func.null_treatment.is_some()
2397                || func.over.is_some()
2398                || !func.within_group.is_empty()
2399            {
2400                return Err(Error::InvalidArgumentError(
2401                    "advanced aggregate clauses are not supported".into(),
2402                ));
2403            }
2404
2405            let mut is_distinct = false;
2406            let args_slice: &[FunctionArg] = match &func.args {
2407                FunctionArguments::List(list) => {
2408                    if let Some(dup) = &list.duplicate_treatment {
2409                        use sqlparser::ast::DuplicateTreatment;
2410                        match dup {
2411                            DuplicateTreatment::All => {}
2412                            DuplicateTreatment::Distinct => is_distinct = true,
2413                        }
2414                    }
2415                    if !list.clauses.is_empty() {
2416                        return Err(Error::InvalidArgumentError(
2417                            "aggregate argument clauses are not supported".into(),
2418                        ));
2419                    }
2420                    &list.args
2421                }
2422                FunctionArguments::None => &[],
2423                FunctionArguments::Subquery(_) => {
2424                    return Err(Error::InvalidArgumentError(
2425                        "aggregate subquery arguments are not supported".into(),
2426                    ));
2427                }
2428            };
2429
2430            let func_name = if func.name.0.len() == 1 {
2431                match &func.name.0[0] {
2432                    ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
2433                    _ => {
2434                        return Err(Error::InvalidArgumentError(
2435                            "unsupported aggregate function name".into(),
2436                        ));
2437                    }
2438                }
2439            } else {
2440                return Err(Error::InvalidArgumentError(
2441                    "qualified aggregate function names are not supported".into(),
2442                ));
2443            };
2444
2445            let aggregate = match func_name.as_str() {
2446                "count" => {
2447                    if args_slice.len() != 1 {
2448                        return Err(Error::InvalidArgumentError(
2449                            "COUNT accepts exactly one argument".into(),
2450                        ));
2451                    }
2452                    match &args_slice[0] {
2453                        FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
2454                            if is_distinct {
2455                                return Err(Error::InvalidArgumentError(
2456                                    "COUNT(DISTINCT *) is not supported".into(),
2457                                ));
2458                            }
2459                            AggregateExpr::count_star(alias)
2460                        }
2461                        FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
2462                            let column = resolve_column_name(arg_expr)?;
2463                            if is_distinct {
2464                                AggregateExpr::count_distinct_column(column, alias)
2465                            } else {
2466                                AggregateExpr::count_column(column, alias)
2467                            }
2468                        }
2469                        FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
2470                            return Err(Error::InvalidArgumentError(
2471                                "named COUNT arguments are not supported".into(),
2472                            ));
2473                        }
2474                        FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
2475                            return Err(Error::InvalidArgumentError(
2476                                "COUNT does not support qualified wildcards".into(),
2477                            ));
2478                        }
2479                    }
2480                }
2481                "sum" | "min" | "max" => {
2482                    if is_distinct {
2483                        return Err(Error::InvalidArgumentError(
2484                            "DISTINCT is not supported for this aggregate".into(),
2485                        ));
2486                    }
2487                    if args_slice.len() != 1 {
2488                        return Err(Error::InvalidArgumentError(format!(
2489                            "{} accepts exactly one argument",
2490                            func_name.to_uppercase()
2491                        )));
2492                    }
2493                    let arg_expr = match &args_slice[0] {
2494                        FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => arg_expr,
2495                        FunctionArg::Unnamed(FunctionArgExpr::Wildcard)
2496                        | FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
2497                            return Err(Error::InvalidArgumentError(format!(
2498                                "{} does not support wildcard arguments",
2499                                func_name.to_uppercase()
2500                            )));
2501                        }
2502                        FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
2503                            return Err(Error::InvalidArgumentError(format!(
2504                                "{} arguments must be column references",
2505                                func_name.to_uppercase()
2506                            )));
2507                        }
2508                    };
2509
2510                    if func_name == "sum" {
2511                        if let Some(column) = parse_count_nulls_case(arg_expr)? {
2512                            AggregateExpr::count_nulls(column, alias)
2513                        } else {
2514                            let column = resolve_column_name(arg_expr)?;
2515                            AggregateExpr::sum_int64(column, alias)
2516                        }
2517                    } else {
2518                        let column = resolve_column_name(arg_expr)?;
2519                        if func_name == "min" {
2520                            AggregateExpr::min_int64(column, alias)
2521                        } else {
2522                            AggregateExpr::max_int64(column, alias)
2523                        }
2524                    }
2525                }
2526                _ => return Ok(None),
2527            };
2528
2529            specs.push(aggregate);
2530        }
2531
2532        if specs.is_empty() {
2533            return Ok(None);
2534        }
2535        Ok(Some(specs))
2536    }
2537
2538    fn build_projection_list(
2539        &self,
2540        resolver: &IdentifierResolver<'_>,
2541        id_context: IdentifierContext,
2542        projection_items: &[SelectItem],
2543    ) -> SqlResult<Vec<SelectProjection>> {
2544        if projection_items.is_empty() {
2545            return Err(Error::InvalidArgumentError(
2546                "SELECT projection must include at least one column".into(),
2547            ));
2548        }
2549
2550        let mut projections = Vec::with_capacity(projection_items.len());
2551        for (idx, item) in projection_items.iter().enumerate() {
2552            match item {
2553                SelectItem::Wildcard(options) => {
2554                    if let Some(exclude) = &options.opt_exclude {
2555                        use sqlparser::ast::ExcludeSelectItem;
2556                        let exclude_cols = match exclude {
2557                            ExcludeSelectItem::Single(ident) => vec![ident.value.clone()],
2558                            ExcludeSelectItem::Multiple(idents) => {
2559                                idents.iter().map(|id| id.value.clone()).collect()
2560                            }
2561                        };
2562                        projections.push(SelectProjection::AllColumnsExcept {
2563                            exclude: exclude_cols,
2564                        });
2565                    } else {
2566                        projections.push(SelectProjection::AllColumns);
2567                    }
2568                }
2569                SelectItem::QualifiedWildcard(kind, _) => match kind {
2570                    SelectItemQualifiedWildcardKind::ObjectName(name) => {
2571                        projections.push(SelectProjection::Column {
2572                            name: name.to_string(),
2573                            alias: None,
2574                        });
2575                    }
2576                    SelectItemQualifiedWildcardKind::Expr(_) => {
2577                        return Err(Error::InvalidArgumentError(
2578                            "expression-qualified wildcards are not supported".into(),
2579                        ));
2580                    }
2581                },
2582                SelectItem::UnnamedExpr(expr) => match expr {
2583                    SqlExpr::Identifier(ident) => {
2584                        let parts = vec![ident.value.clone()];
2585                        let resolution = resolver.resolve(&parts, id_context)?;
2586                        if resolution.is_simple() {
2587                            projections.push(SelectProjection::Column {
2588                                name: resolution.column().to_string(),
2589                                alias: None,
2590                            });
2591                        } else {
2592                            let alias = format!("col{}", idx + 1);
2593                            projections.push(SelectProjection::Computed {
2594                                expr: resolution.into_scalar_expr(),
2595                                alias,
2596                            });
2597                        }
2598                    }
2599                    SqlExpr::CompoundIdentifier(parts) => {
2600                        let name_parts: Vec<String> =
2601                            parts.iter().map(|part| part.value.clone()).collect();
2602                        let resolution = resolver.resolve(&name_parts, id_context)?;
2603                        if resolution.is_simple() {
2604                            projections.push(SelectProjection::Column {
2605                                name: resolution.column().to_string(),
2606                                alias: None,
2607                            });
2608                        } else {
2609                            let alias = format!("col{}", idx + 1);
2610                            projections.push(SelectProjection::Computed {
2611                                expr: resolution.into_scalar_expr(),
2612                                alias,
2613                            });
2614                        }
2615                    }
2616                    _ => {
2617                        let alias = format!("col{}", idx + 1);
2618                        let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
2619                        projections.push(SelectProjection::Computed {
2620                            expr: scalar,
2621                            alias,
2622                        });
2623                    }
2624                },
2625                SelectItem::ExprWithAlias { expr, alias } => match expr {
2626                    SqlExpr::Identifier(ident) => {
2627                        let parts = vec![ident.value.clone()];
2628                        let resolution = resolver.resolve(&parts, id_context)?;
2629                        if resolution.is_simple() {
2630                            projections.push(SelectProjection::Column {
2631                                name: resolution.column().to_string(),
2632                                alias: Some(alias.value.clone()),
2633                            });
2634                        } else {
2635                            projections.push(SelectProjection::Computed {
2636                                expr: resolution.into_scalar_expr(),
2637                                alias: alias.value.clone(),
2638                            });
2639                        }
2640                    }
2641                    SqlExpr::CompoundIdentifier(parts) => {
2642                        let name_parts: Vec<String> =
2643                            parts.iter().map(|part| part.value.clone()).collect();
2644                        let resolution = resolver.resolve(&name_parts, id_context)?;
2645                        if resolution.is_simple() {
2646                            projections.push(SelectProjection::Column {
2647                                name: resolution.column().to_string(),
2648                                alias: Some(alias.value.clone()),
2649                            });
2650                        } else {
2651                            projections.push(SelectProjection::Computed {
2652                                expr: resolution.into_scalar_expr(),
2653                                alias: alias.value.clone(),
2654                            });
2655                        }
2656                    }
2657                    _ => {
2658                        let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
2659                        projections.push(SelectProjection::Computed {
2660                            expr: scalar,
2661                            alias: alias.value.clone(),
2662                        });
2663                    }
2664                },
2665            }
2666        }
2667        Ok(projections)
2668    }
2669
2670    #[allow(clippy::too_many_arguments)] // NOTE: Keeps parity with SQL START TRANSACTION grammar; revisit when options expand.
2671    fn handle_start_transaction(
2672        &self,
2673        modes: Vec<TransactionMode>,
2674        begin: bool,
2675        transaction: Option<BeginTransactionKind>,
2676        modifier: Option<TransactionModifier>,
2677        statements: Vec<Statement>,
2678        exception: Option<Vec<ExceptionWhen>>,
2679        has_end_keyword: bool,
2680    ) -> SqlResult<RuntimeStatementResult<P>> {
2681        if !modes.is_empty() {
2682            return Err(Error::InvalidArgumentError(
2683                "transaction modes are not supported".into(),
2684            ));
2685        }
2686        if modifier.is_some() {
2687            return Err(Error::InvalidArgumentError(
2688                "transaction modifiers are not supported".into(),
2689            ));
2690        }
2691        if !statements.is_empty() || exception.is_some() || has_end_keyword {
2692            return Err(Error::InvalidArgumentError(
2693                "BEGIN blocks with inline statements or exceptions are not supported".into(),
2694            ));
2695        }
2696        if let Some(kind) = transaction {
2697            match kind {
2698                BeginTransactionKind::Transaction | BeginTransactionKind::Work => {}
2699            }
2700        }
2701        if !begin {
2702            // Currently treat START TRANSACTION same as BEGIN
2703            tracing::warn!("Currently treat `START TRANSACTION` same as `BEGIN`")
2704        }
2705
2706        self.execute_plan_statement(PlanStatement::BeginTransaction)
2707    }
2708
2709    fn handle_commit(
2710        &self,
2711        chain: bool,
2712        end: bool,
2713        modifier: Option<TransactionModifier>,
2714    ) -> SqlResult<RuntimeStatementResult<P>> {
2715        if chain {
2716            return Err(Error::InvalidArgumentError(
2717                "COMMIT AND [NO] CHAIN is not supported".into(),
2718            ));
2719        }
2720        if end {
2721            return Err(Error::InvalidArgumentError(
2722                "END blocks are not supported".into(),
2723            ));
2724        }
2725        if modifier.is_some() {
2726            return Err(Error::InvalidArgumentError(
2727                "transaction modifiers are not supported".into(),
2728            ));
2729        }
2730
2731        self.execute_plan_statement(PlanStatement::CommitTransaction)
2732    }
2733
2734    fn handle_rollback(
2735        &self,
2736        chain: bool,
2737        savepoint: Option<Ident>,
2738    ) -> SqlResult<RuntimeStatementResult<P>> {
2739        if chain {
2740            return Err(Error::InvalidArgumentError(
2741                "ROLLBACK AND [NO] CHAIN is not supported".into(),
2742            ));
2743        }
2744        if savepoint.is_some() {
2745            return Err(Error::InvalidArgumentError(
2746                "ROLLBACK TO SAVEPOINT is not supported".into(),
2747            ));
2748        }
2749
2750        self.execute_plan_statement(PlanStatement::RollbackTransaction)
2751    }
2752
2753    fn handle_set(&self, set_stmt: Set) -> SqlResult<RuntimeStatementResult<P>> {
2754        match set_stmt {
2755            Set::SingleAssignment {
2756                scope,
2757                hivevar,
2758                variable,
2759                values,
2760            } => {
2761                if scope.is_some() || hivevar {
2762                    return Err(Error::InvalidArgumentError(
2763                        "SET modifiers are not supported".into(),
2764                    ));
2765                }
2766
2767                let variable_name_raw = variable.to_string();
2768                let variable_name = variable_name_raw.to_ascii_lowercase();
2769
2770                match variable_name.as_str() {
2771                    "default_null_order" => {
2772                        if values.len() != 1 {
2773                            return Err(Error::InvalidArgumentError(
2774                                "SET default_null_order expects exactly one value".into(),
2775                            ));
2776                        }
2777
2778                        let value_expr = &values[0];
2779                        let normalized = match value_expr {
2780                            SqlExpr::Value(value_with_span) => value_with_span
2781                                .value
2782                                .clone()
2783                                .into_string()
2784                                .map(|s| s.to_ascii_lowercase()),
2785                            SqlExpr::Identifier(ident) => Some(ident.value.to_ascii_lowercase()),
2786                            _ => None,
2787                        };
2788
2789                        if !matches!(normalized.as_deref(), Some("nulls_first" | "nulls_last")) {
2790                            return Err(Error::InvalidArgumentError(format!(
2791                                "unsupported value for SET default_null_order: {value_expr:?}"
2792                            )));
2793                        }
2794
2795                        let use_nulls_first = matches!(normalized.as_deref(), Some("nulls_first"));
2796                        self.default_nulls_first
2797                            .store(use_nulls_first, AtomicOrdering::Relaxed);
2798
2799                        Ok(RuntimeStatementResult::NoOp)
2800                    }
2801                    "immediate_transaction_mode" => {
2802                        if values.len() != 1 {
2803                            return Err(Error::InvalidArgumentError(
2804                                "SET immediate_transaction_mode expects exactly one value".into(),
2805                            ));
2806                        }
2807                        let normalized = values[0].to_string().to_ascii_lowercase();
2808                        let enabled = match normalized.as_str() {
2809                            "true" | "on" | "1" => true,
2810                            "false" | "off" | "0" => false,
2811                            _ => {
2812                                return Err(Error::InvalidArgumentError(format!(
2813                                    "unsupported value for SET immediate_transaction_mode: {}",
2814                                    values[0]
2815                                )));
2816                            }
2817                        };
2818                        if !enabled {
2819                            tracing::warn!(
2820                                "SET immediate_transaction_mode=false has no effect; continuing with auto mode"
2821                            );
2822                        }
2823                        Ok(RuntimeStatementResult::NoOp)
2824                    }
2825                    _ => Err(Error::InvalidArgumentError(format!(
2826                        "unsupported SET variable: {variable_name_raw}"
2827                    ))),
2828                }
2829            }
2830            other => Err(Error::InvalidArgumentError(format!(
2831                "unsupported SQL SET statement: {other:?}",
2832            ))),
2833        }
2834    }
2835
2836    fn handle_pragma(
2837        &self,
2838        name: ObjectName,
2839        value: Option<Value>,
2840        is_eq: bool,
2841    ) -> SqlResult<RuntimeStatementResult<P>> {
2842        let (display, canonical) = canonical_object_name(&name)?;
2843        if value.is_some() || is_eq {
2844            return Err(Error::InvalidArgumentError(format!(
2845                "PRAGMA '{display}' does not accept a value"
2846            )));
2847        }
2848
2849        match canonical.as_str() {
2850            "enable_verification" | "disable_verification" => Ok(RuntimeStatementResult::NoOp),
2851            _ => Err(Error::InvalidArgumentError(format!(
2852                "unsupported PRAGMA '{}'",
2853                display
2854            ))),
2855        }
2856    }
2857}
2858
2859fn canonical_object_name(name: &ObjectName) -> SqlResult<(String, String)> {
2860    if name.0.is_empty() {
2861        return Err(Error::InvalidArgumentError(
2862            "object name must not be empty".into(),
2863        ));
2864    }
2865    let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
2866    for part in &name.0 {
2867        let ident = match part {
2868            ObjectNamePart::Identifier(ident) => ident,
2869            _ => {
2870                return Err(Error::InvalidArgumentError(
2871                    "object names using functions are not supported".into(),
2872                ));
2873            }
2874        };
2875        parts.push(ident.value.clone());
2876    }
2877    let display = parts.join(".");
2878    let canonical = display.to_ascii_lowercase();
2879    Ok((display, canonical))
2880}
2881
2882/// Parse an object name into optional schema and table name components.
2883///
2884/// Returns (schema_name, table_name) where schema_name is None if not qualified.
2885///
2886/// Examples:
2887/// - "users" -> (None, "users")
2888/// - "test.users" -> (Some("test"), "users")
2889/// - "catalog.test.users" -> Error (too many parts)
2890fn parse_schema_qualified_name(name: &ObjectName) -> SqlResult<(Option<String>, String)> {
2891    if name.0.is_empty() {
2892        return Err(Error::InvalidArgumentError(
2893            "object name must not be empty".into(),
2894        ));
2895    }
2896
2897    let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
2898    for part in &name.0 {
2899        let ident = match part {
2900            ObjectNamePart::Identifier(ident) => ident,
2901            _ => {
2902                return Err(Error::InvalidArgumentError(
2903                    "object names using functions are not supported".into(),
2904                ));
2905            }
2906        };
2907        parts.push(ident.value.clone());
2908    }
2909
2910    match parts.len() {
2911        1 => Ok((None, parts[0].clone())),
2912        2 => Ok((Some(parts[0].clone()), parts[1].clone())),
2913        _ => Err(Error::InvalidArgumentError(format!(
2914            "table name has too many parts: {}",
2915            name
2916        ))),
2917    }
2918}
2919
2920/// Extract column name from an index column specification (OrderBy expression).
2921///
2922/// This handles the common pattern of validating and extracting column names from
2923/// SQL index column definitions (PRIMARY KEY, UNIQUE, CREATE INDEX).
2924///
2925/// # Parameters
2926/// - `index_col`: The index column from sqlparser AST
2927/// - `context`: Description of where this column appears (e.g., "PRIMARY KEY", "UNIQUE constraint")
2928/// - `allow_sort_options`: If false, errors if any sort options are present; if true, validates them
2929/// - `allow_compound`: If true, allows compound identifiers and takes the last part; if false, only allows simple identifiers
2930///
2931/// # Returns
2932/// Column name as a String
2933fn extract_index_column_name(
2934    index_col: &sqlparser::ast::IndexColumn,
2935    context: &str,
2936    allow_sort_options: bool,
2937    allow_compound: bool,
2938) -> SqlResult<String> {
2939    use sqlparser::ast::Expr as SqlExpr;
2940
2941    // Check operator class
2942    if index_col.operator_class.is_some() {
2943        return Err(Error::InvalidArgumentError(format!(
2944            "{} operator classes are not supported",
2945            context
2946        )));
2947    }
2948
2949    let order_expr = &index_col.column;
2950
2951    // Validate sort options
2952    if allow_sort_options {
2953        // For CREATE INDEX: validate specific values are supported
2954        let ascending = order_expr.options.asc.unwrap_or(true);
2955        let nulls_first = order_expr.options.nulls_first.unwrap_or(false);
2956
2957        if !ascending {
2958            return Err(Error::InvalidArgumentError(format!(
2959                "{} DESC ordering is not supported",
2960                context
2961            )));
2962        }
2963        if nulls_first {
2964            return Err(Error::InvalidArgumentError(format!(
2965                "{} NULLS FIRST ordering is not supported",
2966                context
2967            )));
2968        }
2969    } else {
2970        // For constraints: no sort options allowed
2971        if order_expr.options.asc.is_some()
2972            || order_expr.options.nulls_first.is_some()
2973            || order_expr.with_fill.is_some()
2974        {
2975            return Err(Error::InvalidArgumentError(format!(
2976                "{} columns must be simple identifiers",
2977                context
2978            )));
2979        }
2980    }
2981
2982    // Extract column name from expression
2983    let column_name = match &order_expr.expr {
2984        SqlExpr::Identifier(ident) => ident.value.clone(),
2985        SqlExpr::CompoundIdentifier(parts) => {
2986            if allow_compound {
2987                // For CREATE INDEX: allow qualified names, take last part
2988                parts
2989                    .last()
2990                    .map(|ident| ident.value.clone())
2991                    .ok_or_else(|| {
2992                        Error::InvalidArgumentError(format!(
2993                            "invalid column reference in {}",
2994                            context
2995                        ))
2996                    })?
2997            } else if parts.len() == 1 {
2998                // For constraints: only allow single-part compound identifiers
2999                parts[0].value.clone()
3000            } else {
3001                return Err(Error::InvalidArgumentError(format!(
3002                    "{} columns must be column identifiers",
3003                    context
3004                )));
3005            }
3006        }
3007        other => {
3008            return Err(Error::InvalidArgumentError(format!(
3009                "{} only supports column references, found {:?}",
3010                context, other
3011            )));
3012        }
3013    };
3014
3015    Ok(column_name)
3016}
3017
3018fn validate_create_table_common(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
3019    if stmt.clone.is_some() || stmt.like.is_some() {
3020        return Err(Error::InvalidArgumentError(
3021            "CREATE TABLE LIKE/CLONE is not supported".into(),
3022        ));
3023    }
3024    if stmt.or_replace && stmt.if_not_exists {
3025        return Err(Error::InvalidArgumentError(
3026            "CREATE TABLE cannot combine OR REPLACE with IF NOT EXISTS".into(),
3027        ));
3028    }
3029    use sqlparser::ast::TableConstraint;
3030
3031    let mut seen_primary_key = false;
3032    for constraint in &stmt.constraints {
3033        match constraint {
3034            TableConstraint::PrimaryKey { .. } => {
3035                if seen_primary_key {
3036                    return Err(Error::InvalidArgumentError(
3037                        "multiple PRIMARY KEY constraints are not supported".into(),
3038                    ));
3039                }
3040                seen_primary_key = true;
3041            }
3042            TableConstraint::Unique { .. } => {
3043                // Detailed validation is performed later during plan construction.
3044            }
3045            TableConstraint::ForeignKey { .. } => {
3046                // Detailed validation is performed later during plan construction.
3047            }
3048            other => {
3049                return Err(Error::InvalidArgumentError(format!(
3050                    "table-level constraint {:?} is not supported",
3051                    other
3052                )));
3053            }
3054        }
3055    }
3056    Ok(())
3057}
3058
3059fn validate_check_constraint(
3060    check_expr: &sqlparser::ast::Expr,
3061    table_name: &str,
3062    column_names: &[&str],
3063) -> SqlResult<()> {
3064    use sqlparser::ast::Expr as SqlExpr;
3065
3066    let column_names_lower: HashSet<String> = column_names
3067        .iter()
3068        .map(|name| name.to_ascii_lowercase())
3069        .collect();
3070
3071    let mut stack: Vec<&SqlExpr> = vec![check_expr];
3072
3073    while let Some(expr) = stack.pop() {
3074        match expr {
3075            SqlExpr::Subquery(_) => {
3076                return Err(Error::InvalidArgumentError(
3077                    "Subqueries are not allowed in CHECK constraints".into(),
3078                ));
3079            }
3080            SqlExpr::Function(func) => {
3081                let func_name = func.name.to_string().to_uppercase();
3082                if matches!(func_name.as_str(), "SUM" | "AVG" | "COUNT" | "MIN" | "MAX") {
3083                    return Err(Error::InvalidArgumentError(
3084                        "Aggregate functions are not allowed in CHECK constraints".into(),
3085                    ));
3086                }
3087
3088                if let sqlparser::ast::FunctionArguments::List(list) = &func.args {
3089                    for arg in &list.args {
3090                        if let sqlparser::ast::FunctionArg::Unnamed(
3091                            sqlparser::ast::FunctionArgExpr::Expr(expr),
3092                        ) = arg
3093                        {
3094                            stack.push(expr);
3095                        }
3096                    }
3097                }
3098            }
3099            SqlExpr::Identifier(ident) => {
3100                if !column_names_lower.contains(&ident.value.to_ascii_lowercase()) {
3101                    return Err(Error::InvalidArgumentError(format!(
3102                        "Column '{}' referenced in CHECK constraint does not exist",
3103                        ident.value
3104                    )));
3105                }
3106            }
3107            SqlExpr::CompoundIdentifier(idents) => {
3108                if idents.len() == 2 {
3109                    let first = idents[0].value.as_str();
3110                    let second = &idents[1].value;
3111
3112                    if column_names_lower.contains(&first.to_ascii_lowercase()) {
3113                        continue;
3114                    }
3115
3116                    if !first.eq_ignore_ascii_case(table_name) {
3117                        return Err(Error::InvalidArgumentError(format!(
3118                            "CHECK constraint references column from different table '{}'",
3119                            first
3120                        )));
3121                    }
3122
3123                    if !column_names_lower.contains(&second.to_ascii_lowercase()) {
3124                        return Err(Error::InvalidArgumentError(format!(
3125                            "Column '{}' referenced in CHECK constraint does not exist",
3126                            second
3127                        )));
3128                    }
3129                } else if idents.len() == 3 {
3130                    let first = &idents[0].value;
3131                    let second = &idents[1].value;
3132                    let third = &idents[2].value;
3133
3134                    if first.eq_ignore_ascii_case(table_name) {
3135                        if !column_names_lower.contains(&second.to_ascii_lowercase()) {
3136                            return Err(Error::InvalidArgumentError(format!(
3137                                "Column '{}' referenced in CHECK constraint does not exist",
3138                                second
3139                            )));
3140                        }
3141                    } else if second.eq_ignore_ascii_case(table_name) {
3142                        if !column_names_lower.contains(&third.to_ascii_lowercase()) {
3143                            return Err(Error::InvalidArgumentError(format!(
3144                                "Column '{}' referenced in CHECK constraint does not exist",
3145                                third
3146                            )));
3147                        }
3148                    } else {
3149                        return Err(Error::InvalidArgumentError(format!(
3150                            "CHECK constraint references column from different table '{}'",
3151                            second
3152                        )));
3153                    }
3154                }
3155            }
3156            SqlExpr::BinaryOp { left, right, .. } => {
3157                stack.push(left);
3158                stack.push(right);
3159            }
3160            SqlExpr::UnaryOp { expr, .. } | SqlExpr::Nested(expr) => {
3161                stack.push(expr);
3162            }
3163            SqlExpr::Value(_) | SqlExpr::TypedString { .. } => {}
3164            _ => {}
3165        }
3166    }
3167
3168    Ok(())
3169}
3170
3171fn validate_create_table_definition(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
3172    for column in &stmt.columns {
3173        for ColumnOptionDef { option, .. } in &column.options {
3174            match option {
3175                ColumnOption::Null
3176                | ColumnOption::NotNull
3177                | ColumnOption::Unique { .. }
3178                | ColumnOption::Check(_)
3179                | ColumnOption::ForeignKey { .. } => {}
3180                ColumnOption::Default(_) => {
3181                    return Err(Error::InvalidArgumentError(format!(
3182                        "DEFAULT values are not supported for column '{}'",
3183                        column.name
3184                    )));
3185                }
3186                other => {
3187                    return Err(Error::InvalidArgumentError(format!(
3188                        "unsupported column option {:?} on '{}'",
3189                        other, column.name
3190                    )));
3191                }
3192            }
3193        }
3194    }
3195    Ok(())
3196}
3197
3198fn validate_create_table_as(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
3199    if !stmt.columns.is_empty() {
3200        return Err(Error::InvalidArgumentError(
3201            "CREATE TABLE AS SELECT does not support column definitions yet".into(),
3202        ));
3203    }
3204    Ok(())
3205}
3206
3207fn validate_simple_query(query: &Query) -> SqlResult<()> {
3208    if query.with.is_some() {
3209        return Err(Error::InvalidArgumentError(
3210            "WITH clauses are not supported".into(),
3211        ));
3212    }
3213    if let Some(limit_clause) = &query.limit_clause {
3214        match limit_clause {
3215            LimitClause::LimitOffset {
3216                offset: Some(_), ..
3217            }
3218            | LimitClause::OffsetCommaLimit { .. } => {
3219                return Err(Error::InvalidArgumentError(
3220                    "OFFSET clauses are not supported".into(),
3221                ));
3222            }
3223            LimitClause::LimitOffset { limit_by, .. } if !limit_by.is_empty() => {
3224                return Err(Error::InvalidArgumentError(
3225                    "LIMIT BY clauses are not supported".into(),
3226                ));
3227            }
3228            _ => {}
3229        }
3230    }
3231    if query.fetch.is_some() {
3232        return Err(Error::InvalidArgumentError(
3233            "FETCH clauses are not supported".into(),
3234        ));
3235    }
3236    Ok(())
3237}
3238
3239fn resolve_column_name(expr: &SqlExpr) -> SqlResult<String> {
3240    match expr {
3241        SqlExpr::Identifier(ident) => Ok(ident.value.clone()),
3242        SqlExpr::CompoundIdentifier(parts) => {
3243            if let Some(last) = parts.last() {
3244                Ok(last.value.clone())
3245            } else {
3246                Err(Error::InvalidArgumentError(
3247                    "empty column identifier".into(),
3248                ))
3249            }
3250        }
3251        _ => Err(Error::InvalidArgumentError(
3252            "aggregate arguments must be plain column identifiers".into(),
3253        )),
3254    }
3255}
3256
3257fn validate_projection_alias_qualifiers(
3258    projection_items: &[SelectItem],
3259    alias: &str,
3260) -> SqlResult<()> {
3261    let alias_lower = alias.to_ascii_lowercase();
3262    for item in projection_items {
3263        match item {
3264            SelectItem::UnnamedExpr(expr) | SelectItem::ExprWithAlias { expr, .. } => {
3265                if let SqlExpr::CompoundIdentifier(parts) = expr
3266                    && parts.len() >= 2
3267                    && let Some(first) = parts.first()
3268                    && !first.value.eq_ignore_ascii_case(&alias_lower)
3269                {
3270                    return Err(Error::InvalidArgumentError(format!(
3271                        "Binder Error: table '{}' not found",
3272                        first.value
3273                    )));
3274                }
3275            }
3276            _ => {}
3277        }
3278    }
3279    Ok(())
3280}
3281
3282/// Try to parse a function as an aggregate call for use in scalar expressions
3283/// Check if a scalar expression contains any aggregate functions
3284#[allow(dead_code)] // Utility function for future use
3285fn expr_contains_aggregate(expr: &llkv_expr::expr::ScalarExpr<String>) -> bool {
3286    match expr {
3287        llkv_expr::expr::ScalarExpr::Aggregate(_) => true,
3288        llkv_expr::expr::ScalarExpr::Binary { left, right, .. } => {
3289            expr_contains_aggregate(left) || expr_contains_aggregate(right)
3290        }
3291        llkv_expr::expr::ScalarExpr::GetField { base, .. } => expr_contains_aggregate(base),
3292        llkv_expr::expr::ScalarExpr::Column(_) | llkv_expr::expr::ScalarExpr::Literal(_) => false,
3293    }
3294}
3295
3296fn try_parse_aggregate_function(
3297    func: &sqlparser::ast::Function,
3298) -> SqlResult<Option<llkv_expr::expr::AggregateCall<String>>> {
3299    use sqlparser::ast::{FunctionArg, FunctionArgExpr, FunctionArguments, ObjectNamePart};
3300
3301    if func.uses_odbc_syntax {
3302        return Ok(None);
3303    }
3304    if !matches!(func.parameters, FunctionArguments::None) {
3305        return Ok(None);
3306    }
3307    if func.filter.is_some()
3308        || func.null_treatment.is_some()
3309        || func.over.is_some()
3310        || !func.within_group.is_empty()
3311    {
3312        return Ok(None);
3313    }
3314
3315    let func_name = if func.name.0.len() == 1 {
3316        match &func.name.0[0] {
3317            ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
3318            _ => return Ok(None),
3319        }
3320    } else {
3321        return Ok(None);
3322    };
3323
3324    let args_slice: &[FunctionArg] = match &func.args {
3325        FunctionArguments::List(list) => {
3326            if list.duplicate_treatment.is_some() || !list.clauses.is_empty() {
3327                return Ok(None);
3328            }
3329            &list.args
3330        }
3331        FunctionArguments::None => &[],
3332        FunctionArguments::Subquery(_) => return Ok(None),
3333    };
3334
3335    let agg_call = match func_name.as_str() {
3336        "count" => {
3337            if args_slice.len() != 1 {
3338                return Err(Error::InvalidArgumentError(
3339                    "COUNT accepts exactly one argument".into(),
3340                ));
3341            }
3342            match &args_slice[0] {
3343                FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
3344                    llkv_expr::expr::AggregateCall::CountStar
3345                }
3346                FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
3347                    let column = resolve_column_name(arg_expr)?;
3348                    llkv_expr::expr::AggregateCall::Count(column)
3349                }
3350                _ => {
3351                    return Err(Error::InvalidArgumentError(
3352                        "unsupported COUNT argument".into(),
3353                    ));
3354                }
3355            }
3356        }
3357        "sum" => {
3358            if args_slice.len() != 1 {
3359                return Err(Error::InvalidArgumentError(
3360                    "SUM accepts exactly one argument".into(),
3361                ));
3362            }
3363            let arg_expr = match &args_slice[0] {
3364                FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
3365                _ => {
3366                    return Err(Error::InvalidArgumentError(
3367                        "SUM requires a column argument".into(),
3368                    ));
3369                }
3370            };
3371
3372            // Check for COUNT(CASE ...) pattern
3373            if let Some(column) = parse_count_nulls_case(arg_expr)? {
3374                llkv_expr::expr::AggregateCall::CountNulls(column)
3375            } else {
3376                let column = resolve_column_name(arg_expr)?;
3377                llkv_expr::expr::AggregateCall::Sum(column)
3378            }
3379        }
3380        "min" => {
3381            if args_slice.len() != 1 {
3382                return Err(Error::InvalidArgumentError(
3383                    "MIN accepts exactly one argument".into(),
3384                ));
3385            }
3386            let arg_expr = match &args_slice[0] {
3387                FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
3388                _ => {
3389                    return Err(Error::InvalidArgumentError(
3390                        "MIN requires a column argument".into(),
3391                    ));
3392                }
3393            };
3394            let column = resolve_column_name(arg_expr)?;
3395            llkv_expr::expr::AggregateCall::Min(column)
3396        }
3397        "max" => {
3398            if args_slice.len() != 1 {
3399                return Err(Error::InvalidArgumentError(
3400                    "MAX accepts exactly one argument".into(),
3401                ));
3402            }
3403            let arg_expr = match &args_slice[0] {
3404                FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
3405                _ => {
3406                    return Err(Error::InvalidArgumentError(
3407                        "MAX requires a column argument".into(),
3408                    ));
3409                }
3410            };
3411            let column = resolve_column_name(arg_expr)?;
3412            llkv_expr::expr::AggregateCall::Max(column)
3413        }
3414        _ => return Ok(None),
3415    };
3416
3417    Ok(Some(agg_call))
3418}
3419
3420fn parse_count_nulls_case(expr: &SqlExpr) -> SqlResult<Option<String>> {
3421    let SqlExpr::Case {
3422        operand,
3423        conditions,
3424        else_result,
3425        ..
3426    } = expr
3427    else {
3428        return Ok(None);
3429    };
3430
3431    if operand.is_some() || conditions.len() != 1 {
3432        return Ok(None);
3433    }
3434
3435    let case_when = &conditions[0];
3436    if !is_integer_literal(&case_when.result, 1) {
3437        return Ok(None);
3438    }
3439
3440    let else_expr = match else_result {
3441        Some(expr) => expr.as_ref(),
3442        None => return Ok(None),
3443    };
3444    if !is_integer_literal(else_expr, 0) {
3445        return Ok(None);
3446    }
3447
3448    let inner = match &case_when.condition {
3449        SqlExpr::IsNull(inner) => inner.as_ref(),
3450        _ => return Ok(None),
3451    };
3452
3453    resolve_column_name(inner).map(Some)
3454}
3455
3456fn is_integer_literal(expr: &SqlExpr, expected: i64) -> bool {
3457    match expr {
3458        SqlExpr::Value(ValueWithSpan {
3459            value: Value::Number(text, _),
3460            ..
3461        }) => text.parse::<i64>() == Ok(expected),
3462        _ => false,
3463    }
3464}
3465
3466fn translate_condition_with_context(
3467    resolver: &IdentifierResolver<'_>,
3468    context: IdentifierContext,
3469    expr: &SqlExpr,
3470) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
3471    match expr {
3472        SqlExpr::IsNull(inner) => {
3473            let scalar = translate_scalar_with_context(resolver, context, inner)?;
3474            match scalar {
3475                llkv_expr::expr::ScalarExpr::Column(column) => {
3476                    Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
3477                        field_id: column,
3478                        op: llkv_expr::expr::Operator::IsNull,
3479                    }))
3480                }
3481                _ => Err(Error::InvalidArgumentError(
3482                    "IS NULL predicates currently support column references only".into(),
3483                )),
3484            }
3485        }
3486        SqlExpr::IsNotNull(inner) => {
3487            let scalar = translate_scalar_with_context(resolver, context, inner)?;
3488            match scalar {
3489                llkv_expr::expr::ScalarExpr::Column(column) => {
3490                    Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
3491                        field_id: column,
3492                        op: llkv_expr::expr::Operator::IsNotNull,
3493                    }))
3494                }
3495                _ => Err(Error::InvalidArgumentError(
3496                    "IS NOT NULL predicates currently support column references only".into(),
3497                )),
3498            }
3499        }
3500        SqlExpr::BinaryOp { left, op, right } => match op {
3501            BinaryOperator::And => Ok(llkv_expr::expr::Expr::And(vec![
3502                translate_condition_with_context(resolver, context, left)?,
3503                translate_condition_with_context(resolver, context, right)?,
3504            ])),
3505            BinaryOperator::Or => Ok(llkv_expr::expr::Expr::Or(vec![
3506                translate_condition_with_context(resolver, context, left)?,
3507                translate_condition_with_context(resolver, context, right)?,
3508            ])),
3509            BinaryOperator::Eq
3510            | BinaryOperator::NotEq
3511            | BinaryOperator::Lt
3512            | BinaryOperator::LtEq
3513            | BinaryOperator::Gt
3514            | BinaryOperator::GtEq => {
3515                translate_comparison_with_context(resolver, context, left, op.clone(), right)
3516            }
3517            other => Err(Error::InvalidArgumentError(format!(
3518                "unsupported binary operator in WHERE clause: {other:?}"
3519            ))),
3520        },
3521        SqlExpr::UnaryOp {
3522            op: UnaryOperator::Not,
3523            expr,
3524        } => Ok(llkv_expr::expr::Expr::not(
3525            translate_condition_with_context(resolver, context, expr)?,
3526        )),
3527        SqlExpr::Nested(inner) => translate_condition_with_context(resolver, context, inner),
3528        other => Err(Error::InvalidArgumentError(format!(
3529            "unsupported WHERE clause: {other:?}"
3530        ))),
3531    }
3532}
3533
3534fn translate_comparison_with_context(
3535    resolver: &IdentifierResolver<'_>,
3536    context: IdentifierContext,
3537    left: &SqlExpr,
3538    op: BinaryOperator,
3539    right: &SqlExpr,
3540) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
3541    let left_scalar = translate_scalar_with_context(resolver, context, left)?;
3542    let right_scalar = translate_scalar_with_context(resolver, context, right)?;
3543    let compare_op = match op {
3544        BinaryOperator::Eq => llkv_expr::expr::CompareOp::Eq,
3545        BinaryOperator::NotEq => llkv_expr::expr::CompareOp::NotEq,
3546        BinaryOperator::Lt => llkv_expr::expr::CompareOp::Lt,
3547        BinaryOperator::LtEq => llkv_expr::expr::CompareOp::LtEq,
3548        BinaryOperator::Gt => llkv_expr::expr::CompareOp::Gt,
3549        BinaryOperator::GtEq => llkv_expr::expr::CompareOp::GtEq,
3550        other => {
3551            return Err(Error::InvalidArgumentError(format!(
3552                "unsupported comparison operator: {other:?}"
3553            )));
3554        }
3555    };
3556
3557    if let (
3558        llkv_expr::expr::ScalarExpr::Column(column),
3559        llkv_expr::expr::ScalarExpr::Literal(literal),
3560    ) = (&left_scalar, &right_scalar)
3561        && let Some(op) = compare_op_to_filter_operator(compare_op, literal)
3562    {
3563        return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
3564            field_id: column.clone(),
3565            op,
3566        }));
3567    }
3568
3569    if let (
3570        llkv_expr::expr::ScalarExpr::Literal(literal),
3571        llkv_expr::expr::ScalarExpr::Column(column),
3572    ) = (&left_scalar, &right_scalar)
3573        && let Some(flipped) = flip_compare_op(compare_op)
3574        && let Some(op) = compare_op_to_filter_operator(flipped, literal)
3575    {
3576        return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
3577            field_id: column.clone(),
3578            op,
3579        }));
3580    }
3581
3582    Ok(llkv_expr::expr::Expr::Compare {
3583        left: left_scalar,
3584        op: compare_op,
3585        right: right_scalar,
3586    })
3587}
3588
3589fn compare_op_to_filter_operator(
3590    op: llkv_expr::expr::CompareOp,
3591    literal: &Literal,
3592) -> Option<llkv_expr::expr::Operator<'static>> {
3593    let lit = literal.clone();
3594    match op {
3595        llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::Operator::Equals(lit)),
3596        llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::Operator::LessThan(lit)),
3597        llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::Operator::LessThanOrEquals(lit)),
3598        llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::Operator::GreaterThan(lit)),
3599        llkv_expr::expr::CompareOp::GtEq => {
3600            Some(llkv_expr::expr::Operator::GreaterThanOrEquals(lit))
3601        }
3602        llkv_expr::expr::CompareOp::NotEq => None,
3603    }
3604}
3605
3606fn flip_compare_op(op: llkv_expr::expr::CompareOp) -> Option<llkv_expr::expr::CompareOp> {
3607    match op {
3608        llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::CompareOp::Eq),
3609        llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::CompareOp::Gt),
3610        llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::CompareOp::GtEq),
3611        llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::CompareOp::Lt),
3612        llkv_expr::expr::CompareOp::GtEq => Some(llkv_expr::expr::CompareOp::LtEq),
3613        llkv_expr::expr::CompareOp::NotEq => None,
3614    }
3615}
3616/// Translate scalar expression with knowledge of the FROM table context.
3617/// This allows us to properly distinguish schema.table.column from column.field.field.
3618fn translate_scalar_with_context(
3619    resolver: &IdentifierResolver<'_>,
3620    context: IdentifierContext,
3621    expr: &SqlExpr,
3622) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
3623    match expr {
3624        SqlExpr::Identifier(ident) => {
3625            let parts = vec![ident.value.clone()];
3626            let resolution = resolver.resolve(&parts, context)?;
3627            Ok(resolution.into_scalar_expr())
3628        }
3629        SqlExpr::CompoundIdentifier(idents) => {
3630            if idents.is_empty() {
3631                return Err(Error::InvalidArgumentError(
3632                    "invalid compound identifier".into(),
3633                ));
3634            }
3635
3636            let parts: Vec<String> = idents.iter().map(|ident| ident.value.clone()).collect();
3637            let resolution = resolver.resolve(&parts, context)?;
3638            Ok(resolution.into_scalar_expr())
3639        }
3640        _ => translate_scalar(expr),
3641    }
3642}
3643
3644fn translate_scalar(expr: &SqlExpr) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
3645    match expr {
3646        SqlExpr::Identifier(ident) => Ok(llkv_expr::expr::ScalarExpr::column(ident.value.clone())),
3647        SqlExpr::CompoundIdentifier(idents) => {
3648            if idents.is_empty() {
3649                return Err(Error::InvalidArgumentError(
3650                    "invalid compound identifier".into(),
3651                ));
3652            }
3653
3654            // Without table context, treat first part as column, rest as fields
3655            let column_name = idents[0].value.clone();
3656            let mut result = llkv_expr::expr::ScalarExpr::column(column_name);
3657
3658            for part in &idents[1..] {
3659                let field_name = part.value.clone();
3660                result = llkv_expr::expr::ScalarExpr::get_field(result, field_name);
3661            }
3662
3663            Ok(result)
3664        }
3665        SqlExpr::Value(value) => literal_from_value(value),
3666        SqlExpr::BinaryOp { left, op, right } => {
3667            let left_expr = translate_scalar(left)?;
3668            let right_expr = translate_scalar(right)?;
3669            let op = match op {
3670                BinaryOperator::Plus => llkv_expr::expr::BinaryOp::Add,
3671                BinaryOperator::Minus => llkv_expr::expr::BinaryOp::Subtract,
3672                BinaryOperator::Multiply => llkv_expr::expr::BinaryOp::Multiply,
3673                BinaryOperator::Divide => llkv_expr::expr::BinaryOp::Divide,
3674                BinaryOperator::Modulo => llkv_expr::expr::BinaryOp::Modulo,
3675                other => {
3676                    return Err(Error::InvalidArgumentError(format!(
3677                        "unsupported scalar binary operator: {other:?}"
3678                    )));
3679                }
3680            };
3681            Ok(llkv_expr::expr::ScalarExpr::binary(
3682                left_expr, op, right_expr,
3683            ))
3684        }
3685        SqlExpr::UnaryOp {
3686            op: UnaryOperator::Minus,
3687            expr,
3688        } => match translate_scalar(expr)? {
3689            llkv_expr::expr::ScalarExpr::Literal(lit) => match lit {
3690                Literal::Integer(v) => {
3691                    Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(-v)))
3692                }
3693                Literal::Float(v) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(-v))),
3694                Literal::Boolean(_) => Err(Error::InvalidArgumentError(
3695                    "cannot negate boolean literal".into(),
3696                )),
3697                Literal::String(_) => Err(Error::InvalidArgumentError(
3698                    "cannot negate string literal".into(),
3699                )),
3700                Literal::Struct(_) => Err(Error::InvalidArgumentError(
3701                    "cannot negate struct literal".into(),
3702                )),
3703                Literal::Null => Err(Error::InvalidArgumentError(
3704                    "cannot negate null literal".into(),
3705                )),
3706            },
3707            _ => Err(Error::InvalidArgumentError(
3708                "cannot negate non-literal expression".into(),
3709            )),
3710        },
3711        SqlExpr::UnaryOp {
3712            op: UnaryOperator::Plus,
3713            expr,
3714        } => translate_scalar(expr),
3715        SqlExpr::Nested(inner) => translate_scalar(inner),
3716        SqlExpr::Function(func) => {
3717            // Try to parse as an aggregate function
3718            if let Some(agg_call) = try_parse_aggregate_function(func)? {
3719                Ok(llkv_expr::expr::ScalarExpr::aggregate(agg_call))
3720            } else {
3721                Err(Error::InvalidArgumentError(format!(
3722                    "unsupported function in scalar expression: {:?}",
3723                    func.name
3724                )))
3725            }
3726        }
3727        SqlExpr::Dictionary(fields) => {
3728            // Dictionary literal like {'a': 1, 'b': 2} represents a STRUCT literal
3729            let mut struct_fields = Vec::new();
3730            for entry in fields {
3731                let key = entry.key.value.clone(); // Use .value to get the actual identifier value
3732                // Parse the value to a literal
3733                let value_expr = translate_scalar(&entry.value)?;
3734                // Extract the literal from the expression
3735                match value_expr {
3736                    llkv_expr::expr::ScalarExpr::Literal(lit) => {
3737                        struct_fields.push((key, Box::new(lit)));
3738                    }
3739                    _ => {
3740                        return Err(Error::InvalidArgumentError(
3741                            "Dictionary values must be literals".to_string(),
3742                        ));
3743                    }
3744                }
3745            }
3746            Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Struct(
3747                struct_fields,
3748            )))
3749        }
3750        other => Err(Error::InvalidArgumentError(format!(
3751            "unsupported scalar expression: {other:?}"
3752        ))),
3753    }
3754}
3755
3756fn literal_from_value(value: &ValueWithSpan) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
3757    match &value.value {
3758        Value::Number(text, _) => {
3759            if text.contains(['.', 'e', 'E']) {
3760                let parsed = text.parse::<f64>().map_err(|err| {
3761                    Error::InvalidArgumentError(format!("invalid float literal: {err}"))
3762                })?;
3763                Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(parsed)))
3764            } else {
3765                let parsed = text.parse::<i128>().map_err(|err| {
3766                    Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
3767                })?;
3768                Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(
3769                    parsed,
3770                )))
3771            }
3772        }
3773        Value::Boolean(value) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Boolean(
3774            *value,
3775        ))),
3776        Value::Null => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Null)),
3777        other => {
3778            if let Some(text) = other.clone().into_string() {
3779                Ok(llkv_expr::expr::ScalarExpr::literal(Literal::String(text)))
3780            } else {
3781                Err(Error::InvalidArgumentError(format!(
3782                    "unsupported literal: {other:?}"
3783                )))
3784            }
3785        }
3786    }
3787}
3788
3789fn resolve_assignment_column_name(target: &AssignmentTarget) -> SqlResult<String> {
3790    match target {
3791        AssignmentTarget::ColumnName(name) => {
3792            if name.0.len() != 1 {
3793                return Err(Error::InvalidArgumentError(
3794                    "qualified column names in UPDATE assignments are not supported yet".into(),
3795                ));
3796            }
3797            match &name.0[0] {
3798                ObjectNamePart::Identifier(ident) => Ok(ident.value.clone()),
3799                other => Err(Error::InvalidArgumentError(format!(
3800                    "unsupported column reference in UPDATE assignment: {other:?}"
3801                ))),
3802            }
3803        }
3804        AssignmentTarget::Tuple(_) => Err(Error::InvalidArgumentError(
3805            "tuple assignments are not supported yet".into(),
3806        )),
3807    }
3808}
3809
3810fn arrow_type_from_sql(data_type: &SqlDataType) -> SqlResult<arrow::datatypes::DataType> {
3811    use arrow::datatypes::DataType;
3812    match data_type {
3813        SqlDataType::Int(_)
3814        | SqlDataType::Integer(_)
3815        | SqlDataType::BigInt(_)
3816        | SqlDataType::SmallInt(_)
3817        | SqlDataType::TinyInt(_) => Ok(DataType::Int64),
3818        SqlDataType::Float(_)
3819        | SqlDataType::Real
3820        | SqlDataType::Double(_)
3821        | SqlDataType::DoublePrecision => Ok(DataType::Float64),
3822        SqlDataType::Text
3823        | SqlDataType::String(_)
3824        | SqlDataType::Varchar(_)
3825        | SqlDataType::Char(_)
3826        | SqlDataType::Uuid => Ok(DataType::Utf8),
3827        SqlDataType::Date => Ok(DataType::Date32),
3828        SqlDataType::Decimal(_) | SqlDataType::Numeric(_) => Ok(DataType::Float64),
3829        SqlDataType::Boolean => Ok(DataType::Boolean),
3830        SqlDataType::Custom(name, args) => {
3831            if name.0.len() == 1
3832                && let ObjectNamePart::Identifier(ident) = &name.0[0]
3833                && ident.value.eq_ignore_ascii_case("row")
3834            {
3835                return row_type_to_arrow(data_type, args);
3836            }
3837            Err(Error::InvalidArgumentError(format!(
3838                "unsupported SQL data type: {data_type:?}"
3839            )))
3840        }
3841        other => Err(Error::InvalidArgumentError(format!(
3842            "unsupported SQL data type: {other:?}"
3843        ))),
3844    }
3845}
3846
3847fn row_type_to_arrow(
3848    data_type: &SqlDataType,
3849    tokens: &[String],
3850) -> SqlResult<arrow::datatypes::DataType> {
3851    use arrow::datatypes::{DataType, Field, FieldRef, Fields};
3852
3853    let row_str = data_type.to_string();
3854    if tokens.is_empty() {
3855        return Err(Error::InvalidArgumentError(
3856            "ROW type must define at least one field".into(),
3857        ));
3858    }
3859
3860    let dialect = GenericDialect {};
3861    let field_definitions = resolve_row_field_types(tokens, &dialect).map_err(|err| {
3862        Error::InvalidArgumentError(format!("unable to parse ROW type '{row_str}': {err}"))
3863    })?;
3864
3865    let mut fields: Vec<FieldRef> = Vec::with_capacity(field_definitions.len());
3866    for (field_name, field_type) in field_definitions {
3867        let arrow_field_type = arrow_type_from_sql(&field_type)?;
3868        fields.push(Arc::new(Field::new(field_name, arrow_field_type, true)));
3869    }
3870
3871    let struct_fields: Fields = fields.into();
3872    Ok(DataType::Struct(struct_fields))
3873}
3874
/// Splits the raw argument tokens of a `ROW(...)` type into `(field name, SQL type)` pairs.
///
/// An optional surrounding `(` ... `)` token pair is stripped, and `","` tokens are treated
/// as field separators. Each field name goes through `normalize_row_field_name`.
///
/// A field type may span several tokens (for example `DOUBLE PRECISION`). For each field,
/// the inner loop therefore tries successively longer token runs with `parse_sql_data_type`
/// and keeps the *longest* run that parsed. It stops early at a `","` once some run has
/// succeeded. Every candidate is a full SQL parse. When the tokens contain no `","`
/// separators, each field tries every remaining suffix, so parser invocations grow roughly
/// quadratically with the token count.
///
/// # Errors
/// Returns `Error::InvalidArgumentError` in these cases:
/// - `tokens` is empty.
/// - The parentheses are unbalanced.
/// - No field definitions remain.
/// - A field name has no type after it.
/// - No candidate token run parses as a type.
fn resolve_row_field_types(
    tokens: &[String],
    dialect: &GenericDialect,
) -> SqlResult<Vec<(String, SqlDataType)>> {
    if tokens.is_empty() {
        return Err(Error::InvalidArgumentError(
            "ROW type must define at least one field".into(),
        ));
    }

    // Strip one optional enclosing "(" ... ")" pair; a lone trailing ")" is rejected.
    let mut start = 0;
    let mut end = tokens.len();
    if tokens[start] == "(" {
        // NOTE(review): `end == 0` cannot hold here because `tokens` is non-empty (checked above).
        if end == 0 || tokens[end - 1] != ")" {
            return Err(Error::InvalidArgumentError(
                "ROW type is missing closing ')'".into(),
            ));
        }
        start += 1;
        end -= 1;
    } else if tokens[end - 1] == ")" {
        return Err(Error::InvalidArgumentError(
            "ROW type contains unmatched ')'".into(),
        ));
    }

    let slice = &tokens[start..end];
    if slice.is_empty() {
        return Err(Error::InvalidArgumentError(
            "ROW type did not provide any field definitions".into(),
        ));
    }

    let mut fields = Vec::new();
    let mut index = 0;

    while index < slice.len() {
        // Skip stray separators between fields.
        if slice[index] == "," {
            index += 1;
            continue;
        }

        // The first token of each definition is the field name.
        let field_name = normalize_row_field_name(&slice[index])?;
        index += 1;

        if index >= slice.len() {
            return Err(Error::InvalidArgumentError(format!(
                "ROW field '{field_name}' is missing a type specification"
            )));
        }

        // Greedy search: remember the end index of the longest token run
        // `slice[index..type_end]` that parses as a data type.
        let mut last_success: Option<(usize, SqlDataType)> = None;
        let mut type_end = index;

        while type_end <= slice.len() {
            let candidate = slice[index..type_end].join(" ");
            if candidate.trim().is_empty() {
                type_end += 1;
                continue;
            }

            if let Ok(parsed_type) = parse_sql_data_type(&candidate, dialect) {
                last_success = Some((type_end, parsed_type));
            }

            if type_end == slice.len() {
                break;
            }

            // A separator after a successful parse ends this field's type.
            if slice[type_end] == "," && last_success.is_some() {
                break;
            }

            type_end += 1;
        }

        let Some((next_index, data_type)) = last_success else {
            return Err(Error::InvalidArgumentError(format!(
                "failed to parse ROW field type for '{field_name}'"
            )));
        };

        fields.push((field_name, data_type));
        // Resume right after the longest successfully parsed type run.
        index = next_index;

        if index < slice.len() && slice[index] == "," {
            index += 1;
        }
    }

    if fields.is_empty() {
        return Err(Error::InvalidArgumentError(
            "ROW type did not provide any field definitions".into(),
        ));
    }

    Ok(fields)
}
3973
3974fn normalize_row_field_name(raw: &str) -> SqlResult<String> {
3975    let trimmed = raw.trim();
3976    if trimmed.is_empty() {
3977        return Err(Error::InvalidArgumentError(
3978            "ROW field name must not be empty".into(),
3979        ));
3980    }
3981
3982    if let Some(stripped) = trimmed.strip_prefix('"') {
3983        let without_end = stripped.strip_suffix('"').ok_or_else(|| {
3984            Error::InvalidArgumentError(format!("unterminated quoted ROW field name: {trimmed}"))
3985        })?;
3986        let name = without_end.replace("\"\"", "\"");
3987        return Ok(name);
3988    }
3989
3990    Ok(trimmed.to_string())
3991}
3992
3993fn parse_sql_data_type(type_str: &str, dialect: &GenericDialect) -> SqlResult<SqlDataType> {
3994    let trimmed = type_str.trim();
3995    let sql = format!("CREATE TABLE __row(__field {trimmed});");
3996    let statements = Parser::parse_sql(dialect, &sql).map_err(|err| {
3997        Error::InvalidArgumentError(format!("failed to parse ROW field type '{trimmed}': {err}"))
3998    })?;
3999
4000    let stmt = statements.into_iter().next().ok_or_else(|| {
4001        Error::InvalidArgumentError(format!(
4002            "ROW field type '{trimmed}' did not produce a statement"
4003        ))
4004    })?;
4005
4006    match stmt {
4007        Statement::CreateTable(table) => table
4008            .columns
4009            .first()
4010            .map(|col| col.data_type.clone())
4011            .ok_or_else(|| {
4012                Error::InvalidArgumentError(format!(
4013                    "ROW field type '{trimmed}' missing column definition"
4014                ))
4015            }),
4016        other => Err(Error::InvalidArgumentError(format!(
4017            "unexpected statement while parsing ROW field type: {other:?}"
4018        ))),
4019    }
4020}
4021
4022fn extract_constant_select_rows(select: &Select) -> SqlResult<Option<Vec<Vec<PlanValue>>>> {
4023    if !select.from.is_empty() {
4024        return Ok(None);
4025    }
4026
4027    if select.selection.is_some()
4028        || select.having.is_some()
4029        || !select.named_window.is_empty()
4030        || select.qualify.is_some()
4031        || select.distinct.is_some()
4032        || select.top.is_some()
4033        || select.into.is_some()
4034        || select.prewhere.is_some()
4035        || !select.lateral_views.is_empty()
4036        || select.value_table_mode.is_some()
4037        || !group_by_is_empty(&select.group_by)
4038    {
4039        return Err(Error::InvalidArgumentError(
4040            "constant SELECT statements do not support advanced clauses".into(),
4041        ));
4042    }
4043
4044    if select.projection.is_empty() {
4045        return Err(Error::InvalidArgumentError(
4046            "constant SELECT requires at least one projection".into(),
4047        ));
4048    }
4049
4050    let mut row: Vec<PlanValue> = Vec::with_capacity(select.projection.len());
4051    for item in &select.projection {
4052        let expr = match item {
4053            SelectItem::UnnamedExpr(expr) => expr,
4054            SelectItem::ExprWithAlias { expr, .. } => expr,
4055            other => {
4056                return Err(Error::InvalidArgumentError(format!(
4057                    "unsupported projection in constant SELECT: {other:?}"
4058                )));
4059            }
4060        };
4061
4062        let value = SqlValue::try_from_expr(expr)?;
4063        row.push(PlanValue::from(value));
4064    }
4065
4066    Ok(Some(vec![row]))
4067}
4068
4069fn extract_single_table(from: &[TableWithJoins]) -> SqlResult<(String, String)> {
4070    if from.len() != 1 {
4071        return Err(Error::InvalidArgumentError(
4072            "queries over multiple tables are not supported yet".into(),
4073        ));
4074    }
4075    let item = &from[0];
4076    if !item.joins.is_empty() {
4077        return Err(Error::InvalidArgumentError(
4078            "JOIN clauses are not supported yet".into(),
4079        ));
4080    }
4081    match &item.relation {
4082        TableFactor::Table { name, .. } => canonical_object_name(name),
4083        _ => Err(Error::InvalidArgumentError(
4084            "queries require a plain table name".into(),
4085        )),
4086    }
4087}
4088
4089/// Extract multiple tables from FROM clause for cross products
4090fn extract_tables(from: &[TableWithJoins]) -> SqlResult<Vec<llkv_plan::TableRef>> {
4091    let mut tables = Vec::new();
4092
4093    for item in from {
4094        // Check if this table has joins - we don't support JOIN yet
4095        if !item.joins.is_empty() {
4096            return Err(Error::InvalidArgumentError(
4097                "JOIN clauses are not supported yet".into(),
4098            ));
4099        }
4100
4101        // Extract table name
4102        match &item.relation {
4103            TableFactor::Table { name, .. } => {
4104                let (schema_opt, table) = parse_schema_qualified_name(name)?;
4105                let schema = schema_opt.unwrap_or_default();
4106                tables.push(llkv_plan::TableRef::new(schema, table));
4107            }
4108            _ => {
4109                return Err(Error::InvalidArgumentError(
4110                    "queries require a plain table name".into(),
4111                ));
4112            }
4113        }
4114    }
4115
4116    Ok(tables)
4117}
4118
4119fn group_by_is_empty(expr: &GroupByExpr) -> bool {
4120    matches!(
4121        expr,
4122        GroupByExpr::Expressions(exprs, modifiers)
4123            if exprs.is_empty() && modifiers.is_empty()
4124    )
4125}
4126
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int64Array, StringArray};
    use arrow::record_batch::RecordBatch;
    use llkv_storage::pager::MemPager;

    /// Flattens the first column of every batch (which must be a `StringArray`) into owned
    /// optional strings, with NULL cells mapped to `None`.
    fn extract_string_options(batches: &[RecordBatch]) -> Vec<Option<String>> {
        batches
            .iter()
            .flat_map(|batch| {
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .expect("string column")
                    .iter()
                    .map(|cell| cell.map(str::to_string))
            })
            .collect()
    }

    /// Flattens the first column of every batch (which must be an `Int64Array`) into
    /// optional integers, with NULL cells mapped to `None`.
    fn extract_i64_options(batches: &[RecordBatch]) -> Vec<Option<i64>> {
        batches
            .iter()
            .flat_map(|batch| {
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .expect("int column")
                    .iter()
            })
            .collect()
    }

    #[test]
    fn create_insert_select_roundtrip() {
        let engine = SqlEngine::new(Arc::new(MemPager::default()));

        let created = engine
            .execute("CREATE TABLE people (id INT NOT NULL, name TEXT NOT NULL)")
            .expect("create table");
        assert!(matches!(
            created[0],
            RuntimeStatementResult::CreateTable { .. }
        ));

        let inserted = engine
            .execute("INSERT INTO people (id, name) VALUES (1, 'alice'), (2, 'bob')")
            .expect("insert rows");
        assert!(matches!(
            inserted[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 2,
                ..
            }
        ));

        let mut selected = engine
            .execute("SELECT name FROM people WHERE id = 2")
            .expect("select rows");
        let RuntimeStatementResult::Select { execution, .. } = selected.remove(0) else {
            panic!("expected select result");
        };
        let batches = execution.collect().expect("collect batches");

        assert_eq!(batches.len(), 1);
        let names = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string column");
        assert_eq!(names.len(), 1);
        assert_eq!(names.value(0), "bob");
    }

    #[test]
    fn insert_select_constant_including_null() {
        let engine = SqlEngine::new(Arc::new(MemPager::default()));

        engine
            .execute("CREATE TABLE integers(i INTEGER)")
            .expect("create table");

        for (sql, context) in [
            ("INSERT INTO integers SELECT 42", "insert literal"),
            (
                "INSERT INTO integers SELECT CAST(NULL AS VARCHAR)",
                "insert null literal",
            ),
        ] {
            let inserted = engine.execute(sql).expect(context);
            assert!(matches!(
                inserted[0],
                RuntimeStatementResult::Insert {
                    rows_inserted: 1,
                    ..
                }
            ));
        }

        let mut selected = engine
            .execute("SELECT * FROM integers")
            .expect("select rows");
        let RuntimeStatementResult::Select { execution, .. } = selected.remove(0) else {
            panic!("expected select result");
        };
        let batches = execution.collect().expect("collect batches");

        assert_eq!(extract_i64_options(&batches), vec![Some(42), None]);
    }

    #[test]
    fn update_with_where_clause_filters_rows() {
        let engine = SqlEngine::new(Arc::new(MemPager::default()));

        for (sql, context) in [
            ("SET default_null_order='nulls_first'", "set default null order"),
            ("CREATE TABLE strings(a VARCHAR)", "create table"),
            (
                "INSERT INTO strings VALUES ('3'), ('4'), (NULL)",
                "insert seed rows",
            ),
        ] {
            engine.execute(sql).expect(context);
        }

        let updated = engine
            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
            .expect("update rows");
        assert!(matches!(
            updated[0],
            RuntimeStatementResult::Update {
                rows_updated: 1,
                ..
            }
        ));

        let mut selected = engine
            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
            .expect("select rows");
        let RuntimeStatementResult::Select { execution, .. } = selected.remove(0) else {
            panic!("expected select result");
        };
        let mut values = extract_string_options(&execution.collect().expect("collect batches"));

        // Re-sort locally (NULLs first, then by numeric value) so the assertion only checks
        // which rows exist, not the order the engine produced them in.
        values.sort_by_key(|value| {
            value
                .as_ref()
                .map(|text| text.parse::<i64>().unwrap_or_default())
        });

        assert_eq!(
            values,
            vec![None, Some("4".to_string()), Some("13".to_string())]
        );
    }

    #[test]
    fn order_by_honors_configured_default_null_order() {
        let engine = SqlEngine::new(Arc::new(MemPager::default()));

        for (sql, context) in [
            ("CREATE TABLE strings(a VARCHAR)", "create table"),
            (
                "INSERT INTO strings VALUES ('3'), ('4'), (NULL)",
                "insert values",
            ),
            ("UPDATE strings SET a = 13 WHERE a = '3'", "update value"),
        ] {
            engine.execute(sql).expect(context);
        }

        // Runs the ordered query and flattens its single string column.
        let select_ordered = || {
            let mut selected = engine
                .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
                .expect("select rows");
            let RuntimeStatementResult::Select { execution, .. } = selected.remove(0) else {
                panic!("expected select result");
            };
            extract_string_options(&execution.collect().expect("collect batches"))
        };

        assert_eq!(
            select_ordered(),
            vec![Some("4".to_string()), Some("13".to_string()), None]
        );
        assert!(!engine.default_nulls_first_for_tests());

        engine
            .execute("SET default_null_order='nulls_first'")
            .expect("set default null order");
        assert!(engine.default_nulls_first_for_tests());

        assert_eq!(
            select_ordered(),
            vec![None, Some("4".to_string()), Some("13".to_string())]
        );
    }

    #[test]
    fn arrow_type_from_row_returns_struct_fields() {
        use arrow::datatypes::DataType;

        let statements = Parser::parse_sql(
            &GenericDialect {},
            "CREATE TABLE row_types(payload ROW(a INTEGER, b VARCHAR));",
        )
        .expect("parse ROW type definition");

        let data_type = match &statements[0] {
            Statement::CreateTable(create) => create.columns[0].data_type.clone(),
            other => panic!("unexpected statement: {other:?}"),
        };

        let fields = match arrow_type_from_sql(&data_type).expect("convert ROW type") {
            DataType::Struct(fields) => fields,
            other => panic!("expected struct type, got {other:?}"),
        };

        assert_eq!(fields.len(), 2, "unexpected field count");
        assert_eq!(fields[0].name(), "a");
        assert_eq!(fields[1].name(), "b");
        assert_eq!(fields[0].data_type(), &DataType::Int64);
        assert_eq!(fields[1].data_type(), &DataType::Utf8);
    }
}