llkv_sql/
sql_engine.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
4
5use crate::SqlResult;
6use crate::SqlValue;
7
8use llkv_expr::literal::Literal;
9use llkv_result::Error;
10use llkv_runtime::{
11    AggregateExpr, AssignmentValue, ColumnAssignment, ColumnSpec, CreateTablePlan,
12    CreateTableSource, DeletePlan, InsertPlan, InsertSource, OrderByPlan, OrderSortType,
13    OrderTarget, PlanStatement, PlanValue, RuntimeContext, RuntimeEngine, RuntimeSession,
14    RuntimeStatementResult, SelectPlan, SelectProjection, UpdatePlan, extract_rows_from_range,
15};
16use llkv_storage::pager::Pager;
17use simd_r_drive_entry_handle::EntryHandle;
18use sqlparser::ast::{
19    Assignment, AssignmentTarget, BeginTransactionKind, BinaryOperator, ColumnOption,
20    ColumnOptionDef, DataType as SqlDataType, Delete, ExceptionWhen, Expr as SqlExpr, FromTable,
21    FunctionArg, FunctionArgExpr, FunctionArguments, GroupByExpr, Ident, LimitClause, ObjectName,
22    ObjectNamePart, ObjectType, OrderBy, OrderByExpr, OrderByKind, Query, Select, SelectItem,
23    SelectItemQualifiedWildcardKind, Set, SetExpr, Statement, TableFactor, TableObject,
24    TableWithJoins, TransactionMode, TransactionModifier, UnaryOperator, UpdateTableFromKind,
25    Value, ValueWithSpan,
26};
27use sqlparser::dialect::GenericDialect;
28use sqlparser::parser::Parser;
29
/// SQL front-end that parses SQL text and forwards the resulting statements to a
/// [`RuntimeEngine`].
///
/// `P` is the pager type handed to the runtime; its blobs must be [`EntryHandle`]s.
pub struct SqlEngine<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Runtime engine that executes the plans built from parsed SQL.
    engine: RuntimeEngine<P>,
    /// Engine-wide "NULLS FIRST" default, kept in an atomic so it can be read through
    /// `&self`.
    // NOTE(review): presumably consulted when an ORDER BY omits NULLS FIRST/LAST; the
    // code that reads it for planning is outside this section — confirm.
    default_nulls_first: AtomicBool,
}
37
/// Message carried by the `Error::TransactionContextError` that is returned when a
/// statement targets a table marked as dropped while the session has no active
/// transaction.
const DROPPED_TABLE_TRANSACTION_ERR: &str = "another transaction has dropped this table";
39
40impl<P> Clone for SqlEngine<P>
41where
42    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
43{
44    fn clone(&self) -> Self {
45        tracing::warn!(
46            "[SQL_ENGINE] SqlEngine::clone() called - will create new Engine with new session!"
47        );
48        // Create a new session from the same context
49        Self {
50            engine: self.engine.clone(),
51            default_nulls_first: AtomicBool::new(
52                self.default_nulls_first.load(AtomicOrdering::Relaxed),
53            ),
54        }
55    }
56}
57
58#[allow(dead_code)]
59impl<P> SqlEngine<P>
60where
61    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
62{
63    fn map_table_error(table_name: &str, err: Error) -> Error {
64        match err {
65            Error::NotFound => Self::table_not_found_error(table_name),
66            Error::InvalidArgumentError(msg) if msg.contains("unknown table") => {
67                Self::table_not_found_error(table_name)
68            }
69            other => other,
70        }
71    }
72
73    fn table_not_found_error(table_name: &str) -> Error {
74        Error::CatalogError(format!(
75            "Catalog Error: Table '{table_name}' does not exist"
76        ))
77    }
78
79    fn is_table_missing_error(err: &Error) -> bool {
80        match err {
81            Error::NotFound => true,
82            Error::CatalogError(msg) => {
83                msg.contains("Catalog Error: Table") || msg.contains("unknown table")
84            }
85            Error::InvalidArgumentError(msg) => {
86                msg.contains("Catalog Error: Table") || msg.contains("unknown table")
87            }
88            _ => false,
89        }
90    }
91
92    // `statement_table_name` is provided by llkv-runtime; use it to avoid
93    // duplicating plan-level logic here.
94
95    fn execute_plan_statement(
96        &self,
97        statement: PlanStatement,
98    ) -> SqlResult<RuntimeStatementResult<P>> {
99        let table = llkv_runtime::statement_table_name(&statement).map(str::to_string);
100        self.engine.execute_statement(statement).map_err(|err| {
101            if let Some(table_name) = table {
102                Self::map_table_error(&table_name, err)
103            } else {
104                err
105            }
106        })
107    }
108
109    pub fn new(pager: Arc<P>) -> Self {
110        let engine = RuntimeEngine::new(pager);
111        Self {
112            engine,
113            default_nulls_first: AtomicBool::new(false),
114        }
115    }
116
117    pub(crate) fn context_arc(&self) -> Arc<RuntimeContext<P>> {
118        self.engine.context()
119    }
120
121    pub fn with_context(context: Arc<RuntimeContext<P>>, default_nulls_first: bool) -> Self {
122        Self {
123            engine: RuntimeEngine::from_context(context),
124            default_nulls_first: AtomicBool::new(default_nulls_first),
125        }
126    }
127
128    #[cfg(test)]
129    fn default_nulls_first_for_tests(&self) -> bool {
130        self.default_nulls_first.load(AtomicOrdering::Relaxed)
131    }
132
133    fn has_active_transaction(&self) -> bool {
134        self.engine.session().has_active_transaction()
135    }
136
137    /// Get a reference to the underlying session (for advanced use like error handling in test harnesses).
138    pub fn session(&self) -> &RuntimeSession<P> {
139        self.engine.session()
140    }
141
142    pub fn execute(&self, sql: &str) -> SqlResult<Vec<RuntimeStatementResult<P>>> {
143        tracing::trace!("DEBUG SQL execute: {}", sql);
144        let dialect = GenericDialect {};
145        let statements = Parser::parse_sql(&dialect, sql)
146            .map_err(|err| Error::InvalidArgumentError(format!("failed to parse SQL: {err}")))?;
147        tracing::trace!("DEBUG SQL execute: parsed {} statements", statements.len());
148
149        let mut results = Vec::with_capacity(statements.len());
150        for (i, statement) in statements.iter().enumerate() {
151            tracing::trace!("DEBUG SQL execute: processing statement {}", i);
152            results.push(self.execute_statement(statement.clone())?);
153            tracing::trace!("DEBUG SQL execute: statement {} completed", i);
154        }
155        tracing::trace!("DEBUG SQL execute completed successfully");
156        Ok(results)
157    }
158
159    fn execute_statement(&self, statement: Statement) -> SqlResult<RuntimeStatementResult<P>> {
160        tracing::trace!(
161            "DEBUG SQL execute_statement: {:?}",
162            match &statement {
163                Statement::Insert(insert) =>
164                    format!("Insert(table={:?})", Self::table_name_from_insert(insert)),
165                Statement::Query(_) => "Query".to_string(),
166                Statement::StartTransaction { .. } => "StartTransaction".to_string(),
167                Statement::Commit { .. } => "Commit".to_string(),
168                Statement::Rollback { .. } => "Rollback".to_string(),
169                Statement::CreateTable(_) => "CreateTable".to_string(),
170                Statement::Update { .. } => "Update".to_string(),
171                Statement::Delete(_) => "Delete".to_string(),
172                other => format!("Other({:?})", other),
173            }
174        );
175        match statement {
176            Statement::StartTransaction {
177                modes,
178                begin,
179                transaction,
180                modifier,
181                statements,
182                exception,
183                has_end_keyword,
184            } => self.handle_start_transaction(
185                modes,
186                begin,
187                transaction,
188                modifier,
189                statements,
190                exception,
191                has_end_keyword,
192            ),
193            Statement::Commit {
194                chain,
195                end,
196                modifier,
197            } => self.handle_commit(chain, end, modifier),
198            Statement::Rollback { chain, savepoint } => self.handle_rollback(chain, savepoint),
199            other => self.execute_statement_non_transactional(other),
200        }
201    }
202
203    fn execute_statement_non_transactional(
204        &self,
205        statement: Statement,
206    ) -> SqlResult<RuntimeStatementResult<P>> {
207        tracing::trace!("DEBUG SQL execute_statement_non_transactional called");
208        match statement {
209            Statement::CreateTable(stmt) => {
210                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateTable");
211                self.handle_create_table(stmt)
212            }
213            Statement::Insert(stmt) => {
214                let table_name =
215                    Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
216                tracing::trace!(
217                    "DEBUG SQL execute_statement_non_transactional: Insert(table={})",
218                    table_name
219                );
220                self.handle_insert(stmt)
221            }
222            Statement::Query(query) => {
223                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Query");
224                self.handle_query(*query)
225            }
226            Statement::Update {
227                table,
228                assignments,
229                from,
230                selection,
231                returning,
232                ..
233            } => {
234                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Update");
235                self.handle_update(table, assignments, from, selection, returning)
236            }
237            Statement::Delete(delete) => {
238                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Delete");
239                self.handle_delete(delete)
240            }
241            Statement::Drop {
242                object_type,
243                if_exists,
244                names,
245                cascade,
246                restrict,
247                purge,
248                temporary,
249                ..
250            } => {
251                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Drop");
252                self.handle_drop(
253                    object_type,
254                    if_exists,
255                    names,
256                    cascade,
257                    restrict,
258                    purge,
259                    temporary,
260                )
261            }
262            Statement::Set(set_stmt) => {
263                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Set");
264                self.handle_set(set_stmt)
265            }
266            Statement::Pragma { name, value, is_eq } => {
267                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Pragma");
268                self.handle_pragma(name, value, is_eq)
269            }
270            other => {
271                tracing::trace!(
272                    "DEBUG SQL execute_statement_non_transactional: Other({:?})",
273                    other
274                );
275                Err(Error::InvalidArgumentError(format!(
276                    "unsupported SQL statement: {other:?}"
277                )))
278            }
279        }
280    }
281
282    fn table_name_from_insert(insert: &sqlparser::ast::Insert) -> SqlResult<String> {
283        match &insert.table {
284            TableObject::TableName(name) => Self::object_name_to_string(name),
285            _ => Err(Error::InvalidArgumentError(
286                "INSERT requires a plain table name".into(),
287            )),
288        }
289    }
290
291    fn table_name_from_update(table: &TableWithJoins) -> SqlResult<Option<String>> {
292        if !table.joins.is_empty() {
293            return Err(Error::InvalidArgumentError(
294                "UPDATE with JOIN targets is not supported yet".into(),
295            ));
296        }
297        Self::table_with_joins_name(table)
298    }
299
300    fn table_name_from_delete(delete: &Delete) -> SqlResult<Option<String>> {
301        if !delete.tables.is_empty() {
302            return Err(Error::InvalidArgumentError(
303                "multi-table DELETE is not supported yet".into(),
304            ));
305        }
306        let from_tables = match &delete.from {
307            FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
308        };
309        if from_tables.is_empty() {
310            return Ok(None);
311        }
312        if from_tables.len() != 1 {
313            return Err(Error::InvalidArgumentError(
314                "DELETE over multiple tables is not supported yet".into(),
315            ));
316        }
317        Self::table_with_joins_name(&from_tables[0])
318    }
319
320    fn object_name_to_string(name: &ObjectName) -> SqlResult<String> {
321        let (display, _) = canonical_object_name(name)?;
322        Ok(display)
323    }
324
325    #[allow(dead_code)]
326    fn table_object_to_name(table: &TableObject) -> SqlResult<Option<String>> {
327        match table {
328            TableObject::TableName(name) => Ok(Some(Self::object_name_to_string(name)?)),
329            TableObject::TableFunction(_) => Ok(None),
330        }
331    }
332
333    fn table_with_joins_name(table: &TableWithJoins) -> SqlResult<Option<String>> {
334        match &table.relation {
335            TableFactor::Table { name, .. } => Ok(Some(Self::object_name_to_string(name)?)),
336            _ => Ok(None),
337        }
338    }
339
340    fn tables_in_query(query: &Query) -> SqlResult<Vec<String>> {
341        let mut tables = Vec::new();
342        if let sqlparser::ast::SetExpr::Select(select) = query.body.as_ref() {
343            for table in &select.from {
344                if let TableFactor::Table { name, .. } = &table.relation {
345                    tables.push(Self::object_name_to_string(name)?);
346                }
347            }
348        }
349        Ok(tables)
350    }
351
352    fn is_table_marked_dropped(&self, table_name: &str) -> SqlResult<bool> {
353        let canonical = table_name.to_ascii_lowercase();
354        Ok(self.engine.context().is_table_marked_dropped(&canonical))
355    }
356
357    fn handle_create_table(
358        &self,
359        mut stmt: sqlparser::ast::CreateTable,
360    ) -> SqlResult<RuntimeStatementResult<P>> {
361        validate_create_table_common(&stmt)?;
362
363        let (display_name, canonical_name) = canonical_object_name(&stmt.name)?;
364        tracing::trace!(
365            "\n=== HANDLE_CREATE_TABLE: table='{}' columns={} ===",
366            display_name,
367            stmt.columns.len()
368        );
369        if display_name.is_empty() {
370            return Err(Error::InvalidArgumentError(
371                "table name must not be empty".into(),
372            ));
373        }
374
375        if let Some(query) = stmt.query.take() {
376            validate_create_table_as(&stmt)?;
377            if let Some(result) = self.try_handle_range_ctas(
378                &display_name,
379                &canonical_name,
380                &query,
381                stmt.if_not_exists,
382                stmt.or_replace,
383            )? {
384                return Ok(result);
385            }
386            return self.handle_create_table_as(
387                display_name,
388                canonical_name,
389                *query,
390                stmt.if_not_exists,
391                stmt.or_replace,
392            );
393        }
394
395        if stmt.columns.is_empty() {
396            return Err(Error::InvalidArgumentError(
397                "CREATE TABLE requires at least one column".into(),
398            ));
399        }
400
401        validate_create_table_definition(&stmt)?;
402
403        let mut columns: Vec<ColumnSpec> = Vec::with_capacity(stmt.columns.len());
404        let mut names: HashMap<String, ()> = HashMap::new();
405        for column_def in stmt.columns {
406            let is_nullable = column_def
407                .options
408                .iter()
409                .all(|opt| !matches!(opt.option, ColumnOption::NotNull));
410
411            let is_primary_key = column_def.options.iter().any(|opt| {
412                matches!(
413                    opt.option,
414                    ColumnOption::Unique {
415                        is_primary: true,
416                        characteristics: _
417                    }
418                )
419            });
420
421            tracing::trace!(
422                "DEBUG CREATE TABLE column '{}' is_primary_key={}",
423                column_def.name.value,
424                is_primary_key
425            );
426
427            let mut column = ColumnSpec::new(
428                column_def.name.value.clone(),
429                arrow_type_from_sql(&column_def.data_type)?,
430                is_nullable,
431            );
432            tracing::trace!(
433                "DEBUG ColumnSpec after new(): primary_key={}",
434                column.primary_key
435            );
436
437            column = column.with_primary_key(is_primary_key);
438            tracing::trace!(
439                "DEBUG ColumnSpec after with_primary_key({}): primary_key={}",
440                is_primary_key,
441                column.primary_key
442            );
443
444            let normalized = column.name.to_ascii_lowercase();
445            if names.insert(normalized, ()).is_some() {
446                return Err(Error::InvalidArgumentError(format!(
447                    "duplicate column name '{}' in table '{}'",
448                    column.name, display_name
449                )));
450            }
451            columns.push(column);
452        }
453
454        let plan = CreateTablePlan {
455            name: display_name,
456            if_not_exists: stmt.if_not_exists,
457            or_replace: stmt.or_replace,
458            columns,
459            source: None,
460        };
461        self.execute_plan_statement(PlanStatement::CreateTable(plan))
462    }
463
464    fn try_handle_range_ctas(
465        &self,
466        display_name: &str,
467        _canonical_name: &str,
468        query: &Query,
469        if_not_exists: bool,
470        or_replace: bool,
471    ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
472        let select = match query.body.as_ref() {
473            SetExpr::Select(select) => select,
474            _ => return Ok(None),
475        };
476        if select.from.len() != 1 {
477            return Ok(None);
478        }
479        let table_with_joins = &select.from[0];
480        if !table_with_joins.joins.is_empty() {
481            return Ok(None);
482        }
483        let (range_size, range_alias) = match &table_with_joins.relation {
484            TableFactor::Table {
485                name,
486                args: Some(args),
487                alias,
488                ..
489            } => {
490                let func_name = name.to_string().to_ascii_lowercase();
491                if func_name != "range" {
492                    return Ok(None);
493                }
494                if args.args.len() != 1 {
495                    return Err(Error::InvalidArgumentError(
496                        "range table function expects a single argument".into(),
497                    ));
498                }
499                let size_expr = &args.args[0];
500                let range_size = match size_expr {
501                    FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
502                        match &value.value {
503                            Value::Number(raw, _) => raw.parse::<i64>().map_err(|e| {
504                                Error::InvalidArgumentError(format!(
505                                    "invalid range size literal {}: {}",
506                                    raw, e
507                                ))
508                            })?,
509                            other => {
510                                return Err(Error::InvalidArgumentError(format!(
511                                    "unsupported range size value: {:?}",
512                                    other
513                                )));
514                            }
515                        }
516                    }
517                    _ => {
518                        return Err(Error::InvalidArgumentError(
519                            "unsupported range argument".into(),
520                        ));
521                    }
522                };
523                (range_size, alias.as_ref().map(|a| a.name.value.clone()))
524            }
525            _ => return Ok(None),
526        };
527
528        if range_size < 0 {
529            return Err(Error::InvalidArgumentError(
530                "range size must be non-negative".into(),
531            ));
532        }
533
534        if select.projection.is_empty() {
535            return Err(Error::InvalidArgumentError(
536                "CREATE TABLE AS SELECT requires at least one projected column".into(),
537            ));
538        }
539
540        let mut column_specs = Vec::with_capacity(select.projection.len());
541        let mut column_names = Vec::with_capacity(select.projection.len());
542        let mut row_template = Vec::with_capacity(select.projection.len());
543        for item in &select.projection {
544            match item {
545                SelectItem::ExprWithAlias { expr, alias } => {
546                    let (value, data_type) = match expr {
547                        SqlExpr::Value(value_with_span) => match &value_with_span.value {
548                            Value::Number(raw, _) => {
549                                let parsed = raw.parse::<i64>().map_err(|e| {
550                                    Error::InvalidArgumentError(format!(
551                                        "invalid numeric literal {}: {}",
552                                        raw, e
553                                    ))
554                                })?;
555                                (
556                                    PlanValue::Integer(parsed),
557                                    arrow::datatypes::DataType::Int64,
558                                )
559                            }
560                            Value::SingleQuotedString(s) => (
561                                PlanValue::String(s.clone()),
562                                arrow::datatypes::DataType::Utf8,
563                            ),
564                            other => {
565                                return Err(Error::InvalidArgumentError(format!(
566                                    "unsupported SELECT expression in range CTAS: {:?}",
567                                    other
568                                )));
569                            }
570                        },
571                        SqlExpr::Identifier(ident) => {
572                            let ident_lower = ident.value.to_ascii_lowercase();
573                            if range_alias
574                                .as_ref()
575                                .map(|a| a.eq_ignore_ascii_case(&ident_lower))
576                                .unwrap_or(false)
577                                || ident_lower == "range"
578                            {
579                                return Err(Error::InvalidArgumentError(
580                                    "range() table function columns are not supported yet".into(),
581                                ));
582                            }
583                            return Err(Error::InvalidArgumentError(format!(
584                                "unsupported identifier '{}' in range CTAS projection",
585                                ident.value
586                            )));
587                        }
588                        other => {
589                            return Err(Error::InvalidArgumentError(format!(
590                                "unsupported SELECT expression in range CTAS: {:?}",
591                                other
592                            )));
593                        }
594                    };
595                    let column_name = alias.value.clone();
596                    column_specs.push(ColumnSpec::new(column_name.clone(), data_type, true));
597                    column_names.push(column_name);
598                    row_template.push(value);
599                }
600                other => {
601                    return Err(Error::InvalidArgumentError(format!(
602                        "unsupported projection {:?} in range CTAS",
603                        other
604                    )));
605                }
606            }
607        }
608
609        let plan = CreateTablePlan {
610            name: display_name.to_string(),
611            if_not_exists,
612            or_replace,
613            columns: column_specs,
614            source: None,
615        };
616        let create_result = self.execute_plan_statement(PlanStatement::CreateTable(plan))?;
617
618        let row_count = range_size
619            .try_into()
620            .map_err(|_| Error::InvalidArgumentError("range size exceeds usize".into()))?;
621        if row_count > 0 {
622            let rows = vec![row_template; row_count];
623            let insert_plan = InsertPlan {
624                table: display_name.to_string(),
625                columns: column_names,
626                source: InsertSource::Rows(rows),
627            };
628            self.execute_plan_statement(PlanStatement::Insert(insert_plan))?;
629        }
630
631        Ok(Some(create_result))
632    }
633
634    fn handle_create_table_as(
635        &self,
636        display_name: String,
637        _canonical_name: String,
638        query: Query,
639        if_not_exists: bool,
640        or_replace: bool,
641    ) -> SqlResult<RuntimeStatementResult<P>> {
642        let select_plan = self.build_select_plan(query)?;
643
644        if select_plan.projections.is_empty() && select_plan.aggregates.is_empty() {
645            return Err(Error::InvalidArgumentError(
646                "CREATE TABLE AS SELECT requires at least one projected column".into(),
647            ));
648        }
649
650        let plan = CreateTablePlan {
651            name: display_name,
652            if_not_exists,
653            or_replace,
654            columns: Vec::new(),
655            source: Some(CreateTableSource::Select {
656                plan: Box::new(select_plan),
657            }),
658        };
659        self.execute_plan_statement(PlanStatement::CreateTable(plan))
660    }
661
662    fn handle_insert(&self, stmt: sqlparser::ast::Insert) -> SqlResult<RuntimeStatementResult<P>> {
663        let table_name_debug =
664            Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
665        tracing::trace!(
666            "DEBUG SQL handle_insert called for table={}",
667            table_name_debug
668        );
669        if !self.engine.session().has_active_transaction()
670            && self.is_table_marked_dropped(&table_name_debug)?
671        {
672            return Err(Error::TransactionContextError(
673                DROPPED_TABLE_TRANSACTION_ERR.into(),
674            ));
675        }
676        if stmt.replace_into || stmt.ignore || stmt.or.is_some() {
677            return Err(Error::InvalidArgumentError(
678                "non-standard INSERT forms are not supported".into(),
679            ));
680        }
681        if stmt.overwrite {
682            return Err(Error::InvalidArgumentError(
683                "INSERT OVERWRITE is not supported".into(),
684            ));
685        }
686        if !stmt.assignments.is_empty() {
687            return Err(Error::InvalidArgumentError(
688                "INSERT ... SET is not supported".into(),
689            ));
690        }
691        if stmt.partitioned.is_some() || !stmt.after_columns.is_empty() {
692            return Err(Error::InvalidArgumentError(
693                "partitioned INSERT is not supported".into(),
694            ));
695        }
696        if stmt.returning.is_some() {
697            return Err(Error::InvalidArgumentError(
698                "INSERT ... RETURNING is not supported".into(),
699            ));
700        }
701        if stmt.format_clause.is_some() || stmt.settings.is_some() {
702            return Err(Error::InvalidArgumentError(
703                "INSERT with FORMAT or SETTINGS is not supported".into(),
704            ));
705        }
706
707        let (display_name, _canonical_name) = match &stmt.table {
708            TableObject::TableName(name) => canonical_object_name(name)?,
709            _ => {
710                return Err(Error::InvalidArgumentError(
711                    "INSERT requires a plain table name".into(),
712                ));
713            }
714        };
715
716        let columns: Vec<String> = stmt
717            .columns
718            .iter()
719            .map(|ident| ident.value.clone())
720            .collect();
721        let source_expr = stmt
722            .source
723            .as_ref()
724            .ok_or_else(|| Error::InvalidArgumentError("INSERT requires a VALUES clause".into()))?;
725        validate_simple_query(source_expr)?;
726
727        let insert_source = match source_expr.body.as_ref() {
728            SetExpr::Values(values) => {
729                if values.rows.is_empty() {
730                    return Err(Error::InvalidArgumentError(
731                        "INSERT VALUES list must contain at least one row".into(),
732                    ));
733                }
734                let mut rows: Vec<Vec<SqlValue>> = Vec::with_capacity(values.rows.len());
735                for row in &values.rows {
736                    let mut converted = Vec::with_capacity(row.len());
737                    for expr in row {
738                        converted.push(SqlValue::try_from_expr(expr)?);
739                    }
740                    rows.push(converted);
741                }
742                InsertSource::Rows(
743                    rows.into_iter()
744                        .map(|row| row.into_iter().map(PlanValue::from).collect())
745                        .collect(),
746                )
747            }
748            SetExpr::Select(select) => {
749                if let Some(rows) = extract_constant_select_rows(select.as_ref())? {
750                    InsertSource::Rows(rows)
751                } else if let Some(range_rows) = extract_rows_from_range(select.as_ref())? {
752                    InsertSource::Rows(range_rows.into_rows())
753                } else {
754                    let select_plan = self.build_select_plan((**source_expr).clone())?;
755                    InsertSource::Select {
756                        plan: Box::new(select_plan),
757                    }
758                }
759            }
760            _ => {
761                return Err(Error::InvalidArgumentError(
762                    "unsupported INSERT source".into(),
763                ));
764            }
765        };
766
767        let plan = InsertPlan {
768            table: display_name.clone(),
769            columns,
770            source: insert_source,
771        };
772        tracing::trace!(
773            "DEBUG SQL handle_insert: about to execute insert for table={}",
774            display_name
775        );
776        self.execute_plan_statement(PlanStatement::Insert(plan))
777    }
778
779    fn handle_update(
780        &self,
781        table: TableWithJoins,
782        assignments: Vec<Assignment>,
783        from: Option<UpdateTableFromKind>,
784        selection: Option<SqlExpr>,
785        returning: Option<Vec<SelectItem>>,
786    ) -> SqlResult<RuntimeStatementResult<P>> {
787        if from.is_some() {
788            return Err(Error::InvalidArgumentError(
789                "UPDATE ... FROM is not supported yet".into(),
790            ));
791        }
792        if returning.is_some() {
793            return Err(Error::InvalidArgumentError(
794                "UPDATE ... RETURNING is not supported".into(),
795            ));
796        }
797        if assignments.is_empty() {
798            return Err(Error::InvalidArgumentError(
799                "UPDATE requires at least one assignment".into(),
800            ));
801        }
802
803        let (display_name, canonical_name) = extract_single_table(std::slice::from_ref(&table))?;
804
805        if !self.engine.session().has_active_transaction()
806            && self
807                .engine
808                .context()
809                .is_table_marked_dropped(&canonical_name)
810        {
811            return Err(Error::TransactionContextError(
812                DROPPED_TABLE_TRANSACTION_ERR.into(),
813            ));
814        }
815
816        let mut column_assignments = Vec::with_capacity(assignments.len());
817        let mut seen: HashMap<String, ()> = HashMap::new();
818        for assignment in assignments {
819            let column_name = resolve_assignment_column_name(&assignment.target)?;
820            let normalized = column_name.to_ascii_lowercase();
821            if seen.insert(normalized, ()).is_some() {
822                return Err(Error::InvalidArgumentError(format!(
823                    "duplicate column '{}' in UPDATE assignments",
824                    column_name
825                )));
826            }
827            let value = match SqlValue::try_from_expr(&assignment.value) {
828                Ok(literal) => AssignmentValue::Literal(PlanValue::from(literal)),
829                Err(Error::InvalidArgumentError(msg))
830                    if msg.contains("unsupported literal expression") =>
831                {
832                    let translated = translate_scalar(&assignment.value)?;
833                    AssignmentValue::Expression(translated)
834                }
835                Err(err) => return Err(err),
836            };
837            column_assignments.push(ColumnAssignment {
838                column: column_name,
839                value,
840            });
841        }
842
843        let filter = match selection {
844            Some(expr) => Some(translate_condition(&expr)?),
845            None => None,
846        };
847
848        let plan = UpdatePlan {
849            table: display_name.clone(),
850            assignments: column_assignments,
851            filter,
852        };
853        self.execute_plan_statement(PlanStatement::Update(plan))
854    }
855
856    #[allow(clippy::collapsible_if)]
857    fn handle_delete(&self, delete: Delete) -> SqlResult<RuntimeStatementResult<P>> {
858        let Delete {
859            tables,
860            from,
861            using,
862            selection,
863            returning,
864            order_by,
865            limit,
866        } = delete;
867
868        if !tables.is_empty() {
869            return Err(Error::InvalidArgumentError(
870                "multi-table DELETE is not supported yet".into(),
871            ));
872        }
873        if let Some(using_tables) = using {
874            if !using_tables.is_empty() {
875                return Err(Error::InvalidArgumentError(
876                    "DELETE ... USING is not supported yet".into(),
877                ));
878            }
879        }
880        if returning.is_some() {
881            return Err(Error::InvalidArgumentError(
882                "DELETE ... RETURNING is not supported".into(),
883            ));
884        }
885        if !order_by.is_empty() {
886            return Err(Error::InvalidArgumentError(
887                "DELETE ... ORDER BY is not supported yet".into(),
888            ));
889        }
890        if limit.is_some() {
891            return Err(Error::InvalidArgumentError(
892                "DELETE ... LIMIT is not supported yet".into(),
893            ));
894        }
895
896        let from_tables = match from {
897            FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
898        };
899        let (display_name, canonical_name) = extract_single_table(&from_tables)?;
900
901        if !self.engine.session().has_active_transaction()
902            && self
903                .engine
904                .context()
905                .is_table_marked_dropped(&canonical_name)
906        {
907            return Err(Error::TransactionContextError(
908                DROPPED_TABLE_TRANSACTION_ERR.into(),
909            ));
910        }
911
912        let filter = selection
913            .map(|expr| translate_condition(&expr))
914            .transpose()?;
915
916        let plan = DeletePlan {
917            table: display_name.clone(),
918            filter,
919        };
920        self.execute_plan_statement(PlanStatement::Delete(plan))
921    }
922
923    #[allow(clippy::too_many_arguments)] // TODO: Consider refactor
924    fn handle_drop(
925        &self,
926        object_type: ObjectType,
927        if_exists: bool,
928        names: Vec<ObjectName>,
929        cascade: bool,
930        restrict: bool,
931        purge: bool,
932        temporary: bool,
933    ) -> SqlResult<RuntimeStatementResult<P>> {
934        if cascade || restrict || purge || temporary {
935            return Err(Error::InvalidArgumentError(
936                "DROP TABLE cascade/restrict/purge/temporary options are not supported".into(),
937            ));
938        }
939
940        if object_type != ObjectType::Table {
941            return Err(Error::InvalidArgumentError(
942                "only DROP TABLE is supported".into(),
943            ));
944        }
945
946        let ctx = self.engine.context();
947        for name in names {
948            let table_name = Self::object_name_to_string(&name)?;
949            ctx.drop_table_immediate(&table_name, if_exists)
950                .map_err(|err| Self::map_table_error(&table_name, err))?;
951        }
952
953        Ok(RuntimeStatementResult::NoOp)
954    }
955
956    fn handle_query(&self, query: Query) -> SqlResult<RuntimeStatementResult<P>> {
957        let select_plan = self.build_select_plan(query)?;
958        self.execute_plan_statement(PlanStatement::Select(select_plan))
959    }
960
961    fn build_select_plan(&self, query: Query) -> SqlResult<SelectPlan> {
962        if self.engine.session().has_active_transaction() && self.engine.session().is_aborted() {
963            return Err(Error::TransactionContextError(
964                "TransactionContext Error: transaction is aborted".into(),
965            ));
966        }
967
968        validate_simple_query(&query)?;
969        let mut select_plan = match query.body.as_ref() {
970            SetExpr::Select(select) => self.translate_select(select.as_ref())?,
971            other => {
972                return Err(Error::InvalidArgumentError(format!(
973                    "unsupported query expression: {other:?}"
974                )));
975            }
976        };
977        if let Some(order_by) = &query.order_by {
978            if !select_plan.aggregates.is_empty() {
979                return Err(Error::InvalidArgumentError(
980                    "ORDER BY is not supported for aggregate queries".into(),
981                ));
982            }
983            let order_plan = self.translate_order_by(order_by)?;
984            select_plan = select_plan.with_order_by(Some(order_plan));
985        }
986        Ok(select_plan)
987    }
988
989    fn translate_select(&self, select: &Select) -> SqlResult<SelectPlan> {
990        if select.distinct.is_some() {
991            return Err(Error::InvalidArgumentError(
992                "SELECT DISTINCT is not supported".into(),
993            ));
994        }
995        if select.top.is_some() {
996            return Err(Error::InvalidArgumentError(
997                "SELECT TOP is not supported".into(),
998            ));
999        }
1000        if select.exclude.is_some() {
1001            return Err(Error::InvalidArgumentError(
1002                "SELECT EXCLUDE is not supported".into(),
1003            ));
1004        }
1005        if select.into.is_some() {
1006            return Err(Error::InvalidArgumentError(
1007                "SELECT INTO is not supported".into(),
1008            ));
1009        }
1010        if !select.lateral_views.is_empty() {
1011            return Err(Error::InvalidArgumentError(
1012                "LATERAL VIEW is not supported".into(),
1013            ));
1014        }
1015        if select.prewhere.is_some() {
1016            return Err(Error::InvalidArgumentError(
1017                "PREWHERE is not supported".into(),
1018            ));
1019        }
1020        if !group_by_is_empty(&select.group_by) || select.value_table_mode.is_some() {
1021            return Err(Error::InvalidArgumentError(
1022                "GROUP BY and SELECT AS VALUE/STRUCT are not supported".into(),
1023            ));
1024        }
1025        if !select.cluster_by.is_empty()
1026            || !select.distribute_by.is_empty()
1027            || !select.sort_by.is_empty()
1028        {
1029            return Err(Error::InvalidArgumentError(
1030                "CLUSTER/DISTRIBUTE/SORT BY clauses are not supported".into(),
1031            ));
1032        }
1033        if select.having.is_some()
1034            || !select.named_window.is_empty()
1035            || select.qualify.is_some()
1036            || select.connect_by.is_some()
1037        {
1038            return Err(Error::InvalidArgumentError(
1039                "advanced SELECT clauses are not supported".into(),
1040            ));
1041        }
1042
1043        let (display_name, _canonical_name) = extract_single_table(&select.from)?;
1044        let mut plan = SelectPlan::new(display_name);
1045
1046        if let Some(aggregates) = self.detect_simple_aggregates(&select.projection)? {
1047            plan = plan.with_aggregates(aggregates);
1048        } else {
1049            let projections = self.build_projection_list(&select.projection)?;
1050            plan = plan.with_projections(projections);
1051        }
1052
1053        let filter_expr = match &select.selection {
1054            Some(expr) => Some(translate_condition(expr)?),
1055            None => None,
1056        };
1057        plan = plan.with_filter(filter_expr);
1058        Ok(plan)
1059    }
1060
1061    fn translate_order_by(&self, order_by: &OrderBy) -> SqlResult<OrderByPlan> {
1062        let exprs = match &order_by.kind {
1063            OrderByKind::Expressions(exprs) => exprs,
1064            _ => {
1065                return Err(Error::InvalidArgumentError(
1066                    "unsupported ORDER BY clause".into(),
1067                ));
1068            }
1069        };
1070
1071        if exprs.len() != 1 {
1072            return Err(Error::InvalidArgumentError(
1073                "ORDER BY currently supports a single expression".into(),
1074            ));
1075        }
1076
1077        let order_expr: &OrderByExpr = &exprs[0];
1078        let ascending = order_expr.options.asc.unwrap_or(true);
1079        let base_nulls_first = self.default_nulls_first.load(AtomicOrdering::Relaxed);
1080        let default_nulls_first_for_direction = if ascending {
1081            base_nulls_first
1082        } else {
1083            !base_nulls_first
1084        };
1085        let nulls_first = order_expr
1086            .options
1087            .nulls_first
1088            .unwrap_or(default_nulls_first_for_direction);
1089
1090        let (target, sort_type) = match &order_expr.expr {
1091            SqlExpr::Identifier(_) | SqlExpr::CompoundIdentifier(_) => (
1092                OrderTarget::Column(resolve_column_name(&order_expr.expr)?),
1093                OrderSortType::Native,
1094            ),
1095            SqlExpr::Cast {
1096                expr,
1097                data_type:
1098                    SqlDataType::Int(_)
1099                    | SqlDataType::Integer(_)
1100                    | SqlDataType::BigInt(_)
1101                    | SqlDataType::SmallInt(_)
1102                    | SqlDataType::TinyInt(_),
1103                ..
1104            } => (
1105                OrderTarget::Column(resolve_column_name(expr)?),
1106                OrderSortType::CastTextToInteger,
1107            ),
1108            SqlExpr::Cast { data_type, .. } => {
1109                return Err(Error::InvalidArgumentError(format!(
1110                    "ORDER BY CAST target type {:?} is not supported",
1111                    data_type
1112                )));
1113            }
1114            SqlExpr::Value(value_with_span) => match &value_with_span.value {
1115                Value::Number(raw, _) => {
1116                    let position: usize = raw.parse().map_err(|_| {
1117                        Error::InvalidArgumentError(format!(
1118                            "ORDER BY position '{}' is not a valid positive integer",
1119                            raw
1120                        ))
1121                    })?;
1122                    if position == 0 {
1123                        return Err(Error::InvalidArgumentError(
1124                            "ORDER BY position must be at least 1".into(),
1125                        ));
1126                    }
1127                    (OrderTarget::Index(position - 1), OrderSortType::Native)
1128                }
1129                other => {
1130                    return Err(Error::InvalidArgumentError(format!(
1131                        "unsupported ORDER BY literal expression: {other:?}"
1132                    )));
1133                }
1134            },
1135            other => {
1136                return Err(Error::InvalidArgumentError(format!(
1137                    "unsupported ORDER BY expression: {other:?}"
1138                )));
1139            }
1140        };
1141
1142        Ok(OrderByPlan {
1143            target,
1144            sort_type,
1145            ascending,
1146            nulls_first,
1147        })
1148    }
1149
1150    fn detect_simple_aggregates(
1151        &self,
1152        projection_items: &[SelectItem],
1153    ) -> SqlResult<Option<Vec<AggregateExpr>>> {
1154        if projection_items.is_empty() {
1155            return Ok(None);
1156        }
1157
1158        let mut specs: Vec<AggregateExpr> = Vec::with_capacity(projection_items.len());
1159        for (idx, item) in projection_items.iter().enumerate() {
1160            let (expr, alias_opt) = match item {
1161                SelectItem::UnnamedExpr(expr) => (expr, None),
1162                SelectItem::ExprWithAlias { expr, alias } => (expr, Some(alias.value.clone())),
1163                _ => return Ok(None),
1164            };
1165
1166            let alias = alias_opt.unwrap_or_else(|| format!("col{}", idx + 1));
1167            let SqlExpr::Function(func) = expr else {
1168                return Ok(None);
1169            };
1170
1171            if func.uses_odbc_syntax {
1172                return Err(Error::InvalidArgumentError(
1173                    "ODBC function syntax is not supported in aggregate queries".into(),
1174                ));
1175            }
1176            if !matches!(func.parameters, FunctionArguments::None) {
1177                return Err(Error::InvalidArgumentError(
1178                    "parameterized aggregate functions are not supported".into(),
1179                ));
1180            }
1181            if func.filter.is_some()
1182                || func.null_treatment.is_some()
1183                || func.over.is_some()
1184                || !func.within_group.is_empty()
1185            {
1186                return Err(Error::InvalidArgumentError(
1187                    "advanced aggregate clauses are not supported".into(),
1188                ));
1189            }
1190
1191            let args_slice: &[FunctionArg] = match &func.args {
1192                FunctionArguments::List(list) => {
1193                    if list.duplicate_treatment.is_some() {
1194                        return Err(Error::InvalidArgumentError(
1195                            "DISTINCT aggregates are not supported".into(),
1196                        ));
1197                    }
1198                    if !list.clauses.is_empty() {
1199                        return Err(Error::InvalidArgumentError(
1200                            "aggregate argument clauses are not supported".into(),
1201                        ));
1202                    }
1203                    &list.args
1204                }
1205                FunctionArguments::None => &[],
1206                FunctionArguments::Subquery(_) => {
1207                    return Err(Error::InvalidArgumentError(
1208                        "aggregate subquery arguments are not supported".into(),
1209                    ));
1210                }
1211            };
1212
1213            let func_name = if func.name.0.len() == 1 {
1214                match &func.name.0[0] {
1215                    ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
1216                    _ => {
1217                        return Err(Error::InvalidArgumentError(
1218                            "unsupported aggregate function name".into(),
1219                        ));
1220                    }
1221                }
1222            } else {
1223                return Err(Error::InvalidArgumentError(
1224                    "qualified aggregate function names are not supported".into(),
1225                ));
1226            };
1227
1228            let aggregate = match func_name.as_str() {
1229                "count" => {
1230                    if args_slice.len() != 1 {
1231                        return Err(Error::InvalidArgumentError(
1232                            "COUNT accepts exactly one argument".into(),
1233                        ));
1234                    }
1235                    match &args_slice[0] {
1236                        FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
1237                            AggregateExpr::count_star(alias)
1238                        }
1239                        FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
1240                            let column = resolve_column_name(arg_expr)?;
1241                            AggregateExpr::count_column(column, alias)
1242                        }
1243                        FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
1244                            return Err(Error::InvalidArgumentError(
1245                                "named COUNT arguments are not supported".into(),
1246                            ));
1247                        }
1248                        FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
1249                            return Err(Error::InvalidArgumentError(
1250                                "COUNT does not support qualified wildcards".into(),
1251                            ));
1252                        }
1253                    }
1254                }
1255                "sum" | "min" | "max" => {
1256                    if args_slice.len() != 1 {
1257                        return Err(Error::InvalidArgumentError(format!(
1258                            "{} accepts exactly one argument",
1259                            func_name.to_uppercase()
1260                        )));
1261                    }
1262                    let arg_expr = match &args_slice[0] {
1263                        FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => arg_expr,
1264                        FunctionArg::Unnamed(FunctionArgExpr::Wildcard)
1265                        | FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
1266                            return Err(Error::InvalidArgumentError(format!(
1267                                "{} does not support wildcard arguments",
1268                                func_name.to_uppercase()
1269                            )));
1270                        }
1271                        FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
1272                            return Err(Error::InvalidArgumentError(format!(
1273                                "{} arguments must be column references",
1274                                func_name.to_uppercase()
1275                            )));
1276                        }
1277                    };
1278
1279                    if func_name == "sum" {
1280                        if let Some(column) = parse_count_nulls_case(arg_expr)? {
1281                            AggregateExpr::count_nulls(column, alias)
1282                        } else {
1283                            let column = resolve_column_name(arg_expr)?;
1284                            AggregateExpr::sum_int64(column, alias)
1285                        }
1286                    } else {
1287                        let column = resolve_column_name(arg_expr)?;
1288                        if func_name == "min" {
1289                            AggregateExpr::min_int64(column, alias)
1290                        } else {
1291                            AggregateExpr::max_int64(column, alias)
1292                        }
1293                    }
1294                }
1295                _ => return Ok(None),
1296            };
1297
1298            specs.push(aggregate);
1299        }
1300
1301        if specs.is_empty() {
1302            return Ok(None);
1303        }
1304        Ok(Some(specs))
1305    }
1306
1307    fn build_projection_list(
1308        &self,
1309        projection_items: &[SelectItem],
1310    ) -> SqlResult<Vec<SelectProjection>> {
1311        if projection_items.is_empty() {
1312            return Err(Error::InvalidArgumentError(
1313                "SELECT projection must include at least one column".into(),
1314            ));
1315        }
1316        let mut projections = Vec::with_capacity(projection_items.len());
1317        for (idx, item) in projection_items.iter().enumerate() {
1318            match item {
1319                SelectItem::Wildcard(_) => {
1320                    projections.push(SelectProjection::AllColumns);
1321                }
1322                SelectItem::QualifiedWildcard(kind, _) => match kind {
1323                    SelectItemQualifiedWildcardKind::ObjectName(name) => {
1324                        projections.push(SelectProjection::Column {
1325                            name: name.to_string(),
1326                            alias: None,
1327                        });
1328                    }
1329                    SelectItemQualifiedWildcardKind::Expr(_) => {
1330                        return Err(Error::InvalidArgumentError(
1331                            "expression-qualified wildcards are not supported".into(),
1332                        ));
1333                    }
1334                },
1335                SelectItem::UnnamedExpr(expr) => {
1336                    let scalar = translate_scalar(expr)?;
1337                    let alias = format!("col{}", idx + 1);
1338                    projections.push(SelectProjection::Computed {
1339                        expr: scalar,
1340                        alias,
1341                    });
1342                }
1343                SelectItem::ExprWithAlias { expr, alias } => {
1344                    let scalar = translate_scalar(expr)?;
1345                    projections.push(SelectProjection::Computed {
1346                        expr: scalar,
1347                        alias: alias.value.clone(),
1348                    });
1349                }
1350            }
1351        }
1352        Ok(projections)
1353    }
1354
1355    #[allow(clippy::too_many_arguments)] // TODO: Refactor using struct for arg
1356    fn handle_start_transaction(
1357        &self,
1358        modes: Vec<TransactionMode>,
1359        begin: bool,
1360        transaction: Option<BeginTransactionKind>,
1361        modifier: Option<TransactionModifier>,
1362        statements: Vec<Statement>,
1363        exception: Option<Vec<ExceptionWhen>>,
1364        has_end_keyword: bool,
1365    ) -> SqlResult<RuntimeStatementResult<P>> {
1366        if !modes.is_empty() {
1367            return Err(Error::InvalidArgumentError(
1368                "transaction modes are not supported".into(),
1369            ));
1370        }
1371        if modifier.is_some() {
1372            return Err(Error::InvalidArgumentError(
1373                "transaction modifiers are not supported".into(),
1374            ));
1375        }
1376        if !statements.is_empty() || exception.is_some() || has_end_keyword {
1377            return Err(Error::InvalidArgumentError(
1378                "BEGIN blocks with inline statements or exceptions are not supported".into(),
1379            ));
1380        }
1381        if let Some(kind) = transaction {
1382            match kind {
1383                BeginTransactionKind::Transaction | BeginTransactionKind::Work => {}
1384            }
1385        }
1386        if !begin {
1387            // Currently treat START TRANSACTION same as BEGIN
1388            tracing::warn!("Currently treat `START TRANSACTION` same as `BEGIN`")
1389        }
1390
1391        self.execute_plan_statement(PlanStatement::BeginTransaction)
1392    }
1393
1394    fn handle_commit(
1395        &self,
1396        chain: bool,
1397        end: bool,
1398        modifier: Option<TransactionModifier>,
1399    ) -> SqlResult<RuntimeStatementResult<P>> {
1400        if chain {
1401            return Err(Error::InvalidArgumentError(
1402                "COMMIT AND [NO] CHAIN is not supported".into(),
1403            ));
1404        }
1405        if end {
1406            return Err(Error::InvalidArgumentError(
1407                "END blocks are not supported".into(),
1408            ));
1409        }
1410        if modifier.is_some() {
1411            return Err(Error::InvalidArgumentError(
1412                "transaction modifiers are not supported".into(),
1413            ));
1414        }
1415
1416        self.execute_plan_statement(PlanStatement::CommitTransaction)
1417    }
1418
1419    fn handle_rollback(
1420        &self,
1421        chain: bool,
1422        savepoint: Option<Ident>,
1423    ) -> SqlResult<RuntimeStatementResult<P>> {
1424        if chain {
1425            return Err(Error::InvalidArgumentError(
1426                "ROLLBACK AND [NO] CHAIN is not supported".into(),
1427            ));
1428        }
1429        if savepoint.is_some() {
1430            return Err(Error::InvalidArgumentError(
1431                "ROLLBACK TO SAVEPOINT is not supported".into(),
1432            ));
1433        }
1434
1435        self.execute_plan_statement(PlanStatement::RollbackTransaction)
1436    }
1437
1438    fn handle_set(&self, set_stmt: Set) -> SqlResult<RuntimeStatementResult<P>> {
1439        match set_stmt {
1440            Set::SingleAssignment {
1441                scope,
1442                hivevar,
1443                variable,
1444                values,
1445            } => {
1446                if scope.is_some() || hivevar {
1447                    return Err(Error::InvalidArgumentError(
1448                        "SET modifiers are not supported".into(),
1449                    ));
1450                }
1451
1452                let variable_name_raw = variable.to_string();
1453                let variable_name = variable_name_raw.to_ascii_lowercase();
1454
1455                match variable_name.as_str() {
1456                    "default_null_order" => {
1457                        if values.len() != 1 {
1458                            return Err(Error::InvalidArgumentError(
1459                                "SET default_null_order expects exactly one value".into(),
1460                            ));
1461                        }
1462
1463                        let value_expr = &values[0];
1464                        let normalized = match value_expr {
1465                            SqlExpr::Value(value_with_span) => value_with_span
1466                                .value
1467                                .clone()
1468                                .into_string()
1469                                .map(|s| s.to_ascii_lowercase()),
1470                            SqlExpr::Identifier(ident) => Some(ident.value.to_ascii_lowercase()),
1471                            _ => None,
1472                        };
1473
1474                        if !matches!(normalized.as_deref(), Some("nulls_first" | "nulls_last")) {
1475                            return Err(Error::InvalidArgumentError(format!(
1476                                "unsupported value for SET default_null_order: {value_expr:?}"
1477                            )));
1478                        }
1479
1480                        let use_nulls_first = matches!(normalized.as_deref(), Some("nulls_first"));
1481                        self.default_nulls_first
1482                            .store(use_nulls_first, AtomicOrdering::Relaxed);
1483
1484                        Ok(RuntimeStatementResult::NoOp)
1485                    }
1486                    "immediate_transaction_mode" => {
1487                        if values.len() != 1 {
1488                            return Err(Error::InvalidArgumentError(
1489                                "SET immediate_transaction_mode expects exactly one value".into(),
1490                            ));
1491                        }
1492                        let normalized = values[0].to_string().to_ascii_lowercase();
1493                        let enabled = match normalized.as_str() {
1494                            "true" | "on" | "1" => true,
1495                            "false" | "off" | "0" => false,
1496                            _ => {
1497                                return Err(Error::InvalidArgumentError(format!(
1498                                    "unsupported value for SET immediate_transaction_mode: {}",
1499                                    values[0]
1500                                )));
1501                            }
1502                        };
1503                        if !enabled {
1504                            tracing::warn!(
1505                                "SET immediate_transaction_mode=false has no effect; continuing with auto mode"
1506                            );
1507                        }
1508                        Ok(RuntimeStatementResult::NoOp)
1509                    }
1510                    _ => Err(Error::InvalidArgumentError(format!(
1511                        "unsupported SET variable: {variable_name_raw}"
1512                    ))),
1513                }
1514            }
1515            other => Err(Error::InvalidArgumentError(format!(
1516                "unsupported SQL SET statement: {other:?}",
1517            ))),
1518        }
1519    }
1520
1521    fn handle_pragma(
1522        &self,
1523        name: ObjectName,
1524        value: Option<Value>,
1525        is_eq: bool,
1526    ) -> SqlResult<RuntimeStatementResult<P>> {
1527        let (display, canonical) = canonical_object_name(&name)?;
1528        if value.is_some() || is_eq {
1529            return Err(Error::InvalidArgumentError(format!(
1530                "PRAGMA '{display}' does not accept a value"
1531            )));
1532        }
1533
1534        match canonical.as_str() {
1535            "enable_verification" | "disable_verification" => Ok(RuntimeStatementResult::NoOp),
1536            _ => Err(Error::InvalidArgumentError(format!(
1537                "unsupported PRAGMA '{}'",
1538                display
1539            ))),
1540        }
1541    }
1542}
1543
1544fn canonical_object_name(name: &ObjectName) -> SqlResult<(String, String)> {
1545    if name.0.is_empty() {
1546        return Err(Error::InvalidArgumentError(
1547            "object name must not be empty".into(),
1548        ));
1549    }
1550    let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
1551    for part in &name.0 {
1552        let ident = match part {
1553            ObjectNamePart::Identifier(ident) => ident,
1554            _ => {
1555                return Err(Error::InvalidArgumentError(
1556                    "object names using functions are not supported".into(),
1557                ));
1558            }
1559        };
1560        parts.push(ident.value.clone());
1561    }
1562    let display = parts.join(".");
1563    let canonical = display.to_ascii_lowercase();
1564    Ok((display, canonical))
1565}
1566
1567fn validate_create_table_common(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
1568    if stmt.clone.is_some() || stmt.like.is_some() {
1569        return Err(Error::InvalidArgumentError(
1570            "CREATE TABLE LIKE/CLONE is not supported".into(),
1571        ));
1572    }
1573    if stmt.or_replace && stmt.if_not_exists {
1574        return Err(Error::InvalidArgumentError(
1575            "CREATE TABLE cannot combine OR REPLACE with IF NOT EXISTS".into(),
1576        ));
1577    }
1578    if !stmt.constraints.is_empty() {
1579        return Err(Error::InvalidArgumentError(
1580            "table-level constraints are not supported".into(),
1581        ));
1582    }
1583    Ok(())
1584}
1585
1586fn validate_create_table_definition(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
1587    for column in &stmt.columns {
1588        for ColumnOptionDef { option, .. } in &column.options {
1589            match option {
1590                ColumnOption::Null | ColumnOption::NotNull | ColumnOption::Unique { .. } => {}
1591                ColumnOption::Default(_) => {
1592                    return Err(Error::InvalidArgumentError(format!(
1593                        "DEFAULT values are not supported for column '{}'",
1594                        column.name
1595                    )));
1596                }
1597                other => {
1598                    return Err(Error::InvalidArgumentError(format!(
1599                        "unsupported column option {:?} on '{}'",
1600                        other, column.name
1601                    )));
1602                }
1603            }
1604        }
1605    }
1606    Ok(())
1607}
1608
1609fn validate_create_table_as(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
1610    if !stmt.columns.is_empty() {
1611        return Err(Error::InvalidArgumentError(
1612            "CREATE TABLE AS SELECT does not support column definitions yet".into(),
1613        ));
1614    }
1615    Ok(())
1616}
1617
1618fn validate_simple_query(query: &Query) -> SqlResult<()> {
1619    if query.with.is_some() {
1620        return Err(Error::InvalidArgumentError(
1621            "WITH clauses are not supported".into(),
1622        ));
1623    }
1624    if let Some(limit_clause) = &query.limit_clause {
1625        match limit_clause {
1626            LimitClause::LimitOffset {
1627                offset: Some(_), ..
1628            }
1629            | LimitClause::OffsetCommaLimit { .. } => {
1630                return Err(Error::InvalidArgumentError(
1631                    "OFFSET clauses are not supported".into(),
1632                ));
1633            }
1634            LimitClause::LimitOffset { limit_by, .. } if !limit_by.is_empty() => {
1635                return Err(Error::InvalidArgumentError(
1636                    "LIMIT BY clauses are not supported".into(),
1637                ));
1638            }
1639            _ => {}
1640        }
1641    }
1642    if query.fetch.is_some() {
1643        return Err(Error::InvalidArgumentError(
1644            "FETCH clauses are not supported".into(),
1645        ));
1646    }
1647    Ok(())
1648}
1649
1650fn resolve_column_name(expr: &SqlExpr) -> SqlResult<String> {
1651    match expr {
1652        SqlExpr::Identifier(ident) => Ok(ident.value.clone()),
1653        SqlExpr::CompoundIdentifier(parts) => {
1654            if let Some(last) = parts.last() {
1655                Ok(last.value.clone())
1656            } else {
1657                Err(Error::InvalidArgumentError(
1658                    "empty column identifier".into(),
1659                ))
1660            }
1661        }
1662        _ => Err(Error::InvalidArgumentError(
1663            "aggregate arguments must be plain column identifiers".into(),
1664        )),
1665    }
1666}
1667
1668/// Try to parse a function as an aggregate call for use in scalar expressions
1669/// Check if a scalar expression contains any aggregate functions
1670#[allow(dead_code)] // Utility function for future use
1671fn expr_contains_aggregate(expr: &llkv_expr::expr::ScalarExpr<String>) -> bool {
1672    match expr {
1673        llkv_expr::expr::ScalarExpr::Aggregate(_) => true,
1674        llkv_expr::expr::ScalarExpr::Binary { left, right, .. } => {
1675            expr_contains_aggregate(left) || expr_contains_aggregate(right)
1676        }
1677        llkv_expr::expr::ScalarExpr::Column(_) | llkv_expr::expr::ScalarExpr::Literal(_) => false,
1678    }
1679}
1680
1681fn try_parse_aggregate_function(
1682    func: &sqlparser::ast::Function,
1683) -> SqlResult<Option<llkv_expr::expr::AggregateCall<String>>> {
1684    use sqlparser::ast::{FunctionArg, FunctionArgExpr, FunctionArguments, ObjectNamePart};
1685
1686    if func.uses_odbc_syntax {
1687        return Ok(None);
1688    }
1689    if !matches!(func.parameters, FunctionArguments::None) {
1690        return Ok(None);
1691    }
1692    if func.filter.is_some()
1693        || func.null_treatment.is_some()
1694        || func.over.is_some()
1695        || !func.within_group.is_empty()
1696    {
1697        return Ok(None);
1698    }
1699
1700    let func_name = if func.name.0.len() == 1 {
1701        match &func.name.0[0] {
1702            ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
1703            _ => return Ok(None),
1704        }
1705    } else {
1706        return Ok(None);
1707    };
1708
1709    let args_slice: &[FunctionArg] = match &func.args {
1710        FunctionArguments::List(list) => {
1711            if list.duplicate_treatment.is_some() || !list.clauses.is_empty() {
1712                return Ok(None);
1713            }
1714            &list.args
1715        }
1716        FunctionArguments::None => &[],
1717        FunctionArguments::Subquery(_) => return Ok(None),
1718    };
1719
1720    let agg_call = match func_name.as_str() {
1721        "count" => {
1722            if args_slice.len() != 1 {
1723                return Err(Error::InvalidArgumentError(
1724                    "COUNT accepts exactly one argument".into(),
1725                ));
1726            }
1727            match &args_slice[0] {
1728                FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
1729                    llkv_expr::expr::AggregateCall::CountStar
1730                }
1731                FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
1732                    let column = resolve_column_name(arg_expr)?;
1733                    llkv_expr::expr::AggregateCall::Count(column)
1734                }
1735                _ => {
1736                    return Err(Error::InvalidArgumentError(
1737                        "unsupported COUNT argument".into(),
1738                    ));
1739                }
1740            }
1741        }
1742        "sum" => {
1743            if args_slice.len() != 1 {
1744                return Err(Error::InvalidArgumentError(
1745                    "SUM accepts exactly one argument".into(),
1746                ));
1747            }
1748            let arg_expr = match &args_slice[0] {
1749                FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
1750                _ => {
1751                    return Err(Error::InvalidArgumentError(
1752                        "SUM requires a column argument".into(),
1753                    ));
1754                }
1755            };
1756
1757            // Check for COUNT(CASE ...) pattern
1758            if let Some(column) = parse_count_nulls_case(arg_expr)? {
1759                llkv_expr::expr::AggregateCall::CountNulls(column)
1760            } else {
1761                let column = resolve_column_name(arg_expr)?;
1762                llkv_expr::expr::AggregateCall::Sum(column)
1763            }
1764        }
1765        "min" => {
1766            if args_slice.len() != 1 {
1767                return Err(Error::InvalidArgumentError(
1768                    "MIN accepts exactly one argument".into(),
1769                ));
1770            }
1771            let arg_expr = match &args_slice[0] {
1772                FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
1773                _ => {
1774                    return Err(Error::InvalidArgumentError(
1775                        "MIN requires a column argument".into(),
1776                    ));
1777                }
1778            };
1779            let column = resolve_column_name(arg_expr)?;
1780            llkv_expr::expr::AggregateCall::Min(column)
1781        }
1782        "max" => {
1783            if args_slice.len() != 1 {
1784                return Err(Error::InvalidArgumentError(
1785                    "MAX accepts exactly one argument".into(),
1786                ));
1787            }
1788            let arg_expr = match &args_slice[0] {
1789                FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
1790                _ => {
1791                    return Err(Error::InvalidArgumentError(
1792                        "MAX requires a column argument".into(),
1793                    ));
1794                }
1795            };
1796            let column = resolve_column_name(arg_expr)?;
1797            llkv_expr::expr::AggregateCall::Max(column)
1798        }
1799        _ => return Ok(None),
1800    };
1801
1802    Ok(Some(agg_call))
1803}
1804
1805fn parse_count_nulls_case(expr: &SqlExpr) -> SqlResult<Option<String>> {
1806    let SqlExpr::Case {
1807        operand,
1808        conditions,
1809        else_result,
1810        ..
1811    } = expr
1812    else {
1813        return Ok(None);
1814    };
1815
1816    if operand.is_some() || conditions.len() != 1 {
1817        return Ok(None);
1818    }
1819
1820    let case_when = &conditions[0];
1821    if !is_integer_literal(&case_when.result, 1) {
1822        return Ok(None);
1823    }
1824
1825    let else_expr = match else_result {
1826        Some(expr) => expr.as_ref(),
1827        None => return Ok(None),
1828    };
1829    if !is_integer_literal(else_expr, 0) {
1830        return Ok(None);
1831    }
1832
1833    let inner = match &case_when.condition {
1834        SqlExpr::IsNull(inner) => inner.as_ref(),
1835        _ => return Ok(None),
1836    };
1837
1838    resolve_column_name(inner).map(Some)
1839}
1840
1841fn is_integer_literal(expr: &SqlExpr, expected: i64) -> bool {
1842    match expr {
1843        SqlExpr::Value(ValueWithSpan {
1844            value: Value::Number(text, _),
1845            ..
1846        }) => text.parse::<i64>() == Ok(expected),
1847        _ => false,
1848    }
1849}
1850
1851fn translate_condition(expr: &SqlExpr) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
1852    match expr {
1853        SqlExpr::BinaryOp { left, op, right } => match op {
1854            BinaryOperator::And => Ok(llkv_expr::expr::Expr::And(vec![
1855                translate_condition(left)?,
1856                translate_condition(right)?,
1857            ])),
1858            BinaryOperator::Or => Ok(llkv_expr::expr::Expr::Or(vec![
1859                translate_condition(left)?,
1860                translate_condition(right)?,
1861            ])),
1862            BinaryOperator::Eq
1863            | BinaryOperator::NotEq
1864            | BinaryOperator::Lt
1865            | BinaryOperator::LtEq
1866            | BinaryOperator::Gt
1867            | BinaryOperator::GtEq => translate_comparison(left, op.clone(), right),
1868            other => Err(Error::InvalidArgumentError(format!(
1869                "unsupported binary operator in WHERE clause: {other:?}"
1870            ))),
1871        },
1872        SqlExpr::UnaryOp {
1873            op: UnaryOperator::Not,
1874            expr,
1875        } => Ok(llkv_expr::expr::Expr::not(translate_condition(expr)?)),
1876        SqlExpr::Nested(inner) => translate_condition(inner),
1877        other => Err(Error::InvalidArgumentError(format!(
1878            "unsupported WHERE clause: {other:?}"
1879        ))),
1880    }
1881}
1882
1883fn translate_comparison(
1884    left: &SqlExpr,
1885    op: BinaryOperator,
1886    right: &SqlExpr,
1887) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
1888    let left_scalar = translate_scalar(left)?;
1889    let right_scalar = translate_scalar(right)?;
1890    let compare_op = match op {
1891        BinaryOperator::Eq => llkv_expr::expr::CompareOp::Eq,
1892        BinaryOperator::NotEq => llkv_expr::expr::CompareOp::NotEq,
1893        BinaryOperator::Lt => llkv_expr::expr::CompareOp::Lt,
1894        BinaryOperator::LtEq => llkv_expr::expr::CompareOp::LtEq,
1895        BinaryOperator::Gt => llkv_expr::expr::CompareOp::Gt,
1896        BinaryOperator::GtEq => llkv_expr::expr::CompareOp::GtEq,
1897        other => {
1898            return Err(Error::InvalidArgumentError(format!(
1899                "unsupported comparison operator: {other:?}"
1900            )));
1901        }
1902    };
1903
1904    if let (
1905        llkv_expr::expr::ScalarExpr::Column(column),
1906        llkv_expr::expr::ScalarExpr::Literal(literal),
1907    ) = (&left_scalar, &right_scalar)
1908        && let Some(op) = compare_op_to_filter_operator(compare_op, literal)
1909    {
1910        return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
1911            field_id: column.clone(),
1912            op,
1913        }));
1914    }
1915
1916    if let (
1917        llkv_expr::expr::ScalarExpr::Literal(literal),
1918        llkv_expr::expr::ScalarExpr::Column(column),
1919    ) = (&left_scalar, &right_scalar)
1920        && let Some(flipped) = flip_compare_op(compare_op)
1921        && let Some(op) = compare_op_to_filter_operator(flipped, literal)
1922    {
1923        return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
1924            field_id: column.clone(),
1925            op,
1926        }));
1927    }
1928
1929    Ok(llkv_expr::expr::Expr::Compare {
1930        left: left_scalar,
1931        op: compare_op,
1932        right: right_scalar,
1933    })
1934}
1935
1936fn compare_op_to_filter_operator(
1937    op: llkv_expr::expr::CompareOp,
1938    literal: &Literal,
1939) -> Option<llkv_expr::expr::Operator<'static>> {
1940    let lit = literal.clone();
1941    match op {
1942        llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::Operator::Equals(lit)),
1943        llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::Operator::LessThan(lit)),
1944        llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::Operator::LessThanOrEquals(lit)),
1945        llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::Operator::GreaterThan(lit)),
1946        llkv_expr::expr::CompareOp::GtEq => {
1947            Some(llkv_expr::expr::Operator::GreaterThanOrEquals(lit))
1948        }
1949        llkv_expr::expr::CompareOp::NotEq => None,
1950    }
1951}
1952
1953fn flip_compare_op(op: llkv_expr::expr::CompareOp) -> Option<llkv_expr::expr::CompareOp> {
1954    match op {
1955        llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::CompareOp::Eq),
1956        llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::CompareOp::Gt),
1957        llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::CompareOp::GtEq),
1958        llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::CompareOp::Lt),
1959        llkv_expr::expr::CompareOp::GtEq => Some(llkv_expr::expr::CompareOp::LtEq),
1960        llkv_expr::expr::CompareOp::NotEq => None,
1961    }
1962}
1963
1964fn translate_scalar(expr: &SqlExpr) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
1965    match expr {
1966        SqlExpr::Identifier(ident) => Ok(llkv_expr::expr::ScalarExpr::column(ident.value.clone())),
1967        SqlExpr::CompoundIdentifier(idents) => {
1968            if let Some(last) = idents.last() {
1969                translate_scalar(&SqlExpr::Identifier(last.clone()))
1970            } else {
1971                Err(Error::InvalidArgumentError(
1972                    "invalid compound identifier".into(),
1973                ))
1974            }
1975        }
1976        SqlExpr::Value(value) => literal_from_value(value),
1977        SqlExpr::BinaryOp { left, op, right } => {
1978            let left_expr = translate_scalar(left)?;
1979            let right_expr = translate_scalar(right)?;
1980            let op = match op {
1981                BinaryOperator::Plus => llkv_expr::expr::BinaryOp::Add,
1982                BinaryOperator::Minus => llkv_expr::expr::BinaryOp::Subtract,
1983                BinaryOperator::Multiply => llkv_expr::expr::BinaryOp::Multiply,
1984                BinaryOperator::Divide => llkv_expr::expr::BinaryOp::Divide,
1985                BinaryOperator::Modulo => llkv_expr::expr::BinaryOp::Modulo,
1986                other => {
1987                    return Err(Error::InvalidArgumentError(format!(
1988                        "unsupported scalar binary operator: {other:?}"
1989                    )));
1990                }
1991            };
1992            Ok(llkv_expr::expr::ScalarExpr::binary(
1993                left_expr, op, right_expr,
1994            ))
1995        }
1996        SqlExpr::UnaryOp {
1997            op: UnaryOperator::Minus,
1998            expr,
1999        } => match translate_scalar(expr)? {
2000            llkv_expr::expr::ScalarExpr::Literal(lit) => match lit {
2001                Literal::Integer(v) => {
2002                    Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(-v)))
2003                }
2004                Literal::Float(v) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(-v))),
2005                Literal::String(_) => Err(Error::InvalidArgumentError(
2006                    "cannot negate string literal".into(),
2007                )),
2008            },
2009            _ => Err(Error::InvalidArgumentError(
2010                "cannot negate non-literal expression".into(),
2011            )),
2012        },
2013        SqlExpr::UnaryOp {
2014            op: UnaryOperator::Plus,
2015            expr,
2016        } => translate_scalar(expr),
2017        SqlExpr::Nested(inner) => translate_scalar(inner),
2018        SqlExpr::Function(func) => {
2019            // Try to parse as an aggregate function
2020            if let Some(agg_call) = try_parse_aggregate_function(func)? {
2021                Ok(llkv_expr::expr::ScalarExpr::aggregate(agg_call))
2022            } else {
2023                Err(Error::InvalidArgumentError(format!(
2024                    "unsupported function in scalar expression: {:?}",
2025                    func.name
2026                )))
2027            }
2028        }
2029        other => Err(Error::InvalidArgumentError(format!(
2030            "unsupported scalar expression: {other:?}"
2031        ))),
2032    }
2033}
2034
2035fn literal_from_value(value: &ValueWithSpan) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
2036    match &value.value {
2037        Value::Number(text, _) => {
2038            if text.contains(['.', 'e', 'E']) {
2039                let parsed = text.parse::<f64>().map_err(|err| {
2040                    Error::InvalidArgumentError(format!("invalid float literal: {err}"))
2041                })?;
2042                Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(parsed)))
2043            } else {
2044                let parsed = text.parse::<i128>().map_err(|err| {
2045                    Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
2046                })?;
2047                Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(
2048                    parsed,
2049                )))
2050            }
2051        }
2052        Value::Boolean(_) => Err(Error::InvalidArgumentError(
2053            "BOOLEAN literals are not supported yet".into(),
2054        )),
2055        Value::Null => Err(Error::InvalidArgumentError(
2056            "NULL literal is not supported in comparisons; use IS NULL".into(),
2057        )),
2058        other => {
2059            if let Some(text) = other.clone().into_string() {
2060                Ok(llkv_expr::expr::ScalarExpr::literal(Literal::String(text)))
2061            } else {
2062                Err(Error::InvalidArgumentError(format!(
2063                    "unsupported literal: {other:?}"
2064                )))
2065            }
2066        }
2067    }
2068}
2069
2070fn resolve_assignment_column_name(target: &AssignmentTarget) -> SqlResult<String> {
2071    match target {
2072        AssignmentTarget::ColumnName(name) => {
2073            if name.0.len() != 1 {
2074                return Err(Error::InvalidArgumentError(
2075                    "qualified column names in UPDATE assignments are not supported yet".into(),
2076                ));
2077            }
2078            match &name.0[0] {
2079                ObjectNamePart::Identifier(ident) => Ok(ident.value.clone()),
2080                other => Err(Error::InvalidArgumentError(format!(
2081                    "unsupported column reference in UPDATE assignment: {other:?}"
2082                ))),
2083            }
2084        }
2085        AssignmentTarget::Tuple(_) => Err(Error::InvalidArgumentError(
2086            "tuple assignments are not supported yet".into(),
2087        )),
2088    }
2089}
2090
2091fn arrow_type_from_sql(data_type: &SqlDataType) -> SqlResult<arrow::datatypes::DataType> {
2092    use arrow::datatypes::DataType;
2093    match data_type {
2094        SqlDataType::Int(_)
2095        | SqlDataType::Integer(_)
2096        | SqlDataType::BigInt(_)
2097        | SqlDataType::SmallInt(_)
2098        | SqlDataType::TinyInt(_) => Ok(DataType::Int64),
2099        SqlDataType::Float(_)
2100        | SqlDataType::Real
2101        | SqlDataType::Double(_)
2102        | SqlDataType::DoublePrecision => Ok(DataType::Float64),
2103        SqlDataType::Text
2104        | SqlDataType::String(_)
2105        | SqlDataType::Varchar(_)
2106        | SqlDataType::Char(_)
2107        | SqlDataType::Uuid => Ok(DataType::Utf8),
2108        SqlDataType::Date => Ok(DataType::Date32),
2109        SqlDataType::Decimal(_) | SqlDataType::Numeric(_) => Ok(DataType::Float64),
2110        SqlDataType::Boolean => Err(Error::InvalidArgumentError(
2111            "BOOLEAN columns are not supported yet".into(),
2112        )),
2113        other => Err(Error::InvalidArgumentError(format!(
2114            "unsupported SQL data type: {other:?}"
2115        ))),
2116    }
2117}
2118
2119fn extract_constant_select_rows(select: &Select) -> SqlResult<Option<Vec<Vec<PlanValue>>>> {
2120    if !select.from.is_empty() {
2121        return Ok(None);
2122    }
2123
2124    if select.selection.is_some()
2125        || select.having.is_some()
2126        || !select.named_window.is_empty()
2127        || select.qualify.is_some()
2128        || select.distinct.is_some()
2129        || select.top.is_some()
2130        || select.into.is_some()
2131        || select.prewhere.is_some()
2132        || !select.lateral_views.is_empty()
2133        || select.value_table_mode.is_some()
2134        || !group_by_is_empty(&select.group_by)
2135    {
2136        return Err(Error::InvalidArgumentError(
2137            "constant SELECT statements do not support advanced clauses".into(),
2138        ));
2139    }
2140
2141    if select.projection.is_empty() {
2142        return Err(Error::InvalidArgumentError(
2143            "constant SELECT requires at least one projection".into(),
2144        ));
2145    }
2146
2147    let mut row: Vec<PlanValue> = Vec::with_capacity(select.projection.len());
2148    for item in &select.projection {
2149        let expr = match item {
2150            SelectItem::UnnamedExpr(expr) => expr,
2151            SelectItem::ExprWithAlias { expr, .. } => expr,
2152            other => {
2153                return Err(Error::InvalidArgumentError(format!(
2154                    "unsupported projection in constant SELECT: {other:?}"
2155                )));
2156            }
2157        };
2158
2159        let value = SqlValue::try_from_expr(expr)?;
2160        row.push(PlanValue::from(value));
2161    }
2162
2163    Ok(Some(vec![row]))
2164}
2165
2166fn extract_single_table(from: &[TableWithJoins]) -> SqlResult<(String, String)> {
2167    if from.len() != 1 {
2168        return Err(Error::InvalidArgumentError(
2169            "queries over multiple tables are not supported yet".into(),
2170        ));
2171    }
2172    let item = &from[0];
2173    if !item.joins.is_empty() {
2174        return Err(Error::InvalidArgumentError(
2175            "JOIN clauses are not supported yet".into(),
2176        ));
2177    }
2178    match &item.relation {
2179        TableFactor::Table { name, .. } => canonical_object_name(name),
2180        _ => Err(Error::InvalidArgumentError(
2181            "queries require a plain table name".into(),
2182        )),
2183    }
2184}
2185
2186fn group_by_is_empty(expr: &GroupByExpr) -> bool {
2187    matches!(
2188        expr,
2189        GroupByExpr::Expressions(exprs, modifiers)
2190            if exprs.is_empty() && modifiers.is_empty()
2191    )
2192}
2193
2194#[cfg(test)]
2195mod tests {
2196    use super::*;
2197    use arrow::array::{Array, Int64Array, StringArray};
2198    use arrow::record_batch::RecordBatch;
2199    use llkv_storage::pager::MemPager;
2200
2201    fn extract_string_options(batches: &[RecordBatch]) -> Vec<Option<String>> {
2202        let mut values: Vec<Option<String>> = Vec::new();
2203        for batch in batches {
2204            let column = batch
2205                .column(0)
2206                .as_any()
2207                .downcast_ref::<StringArray>()
2208                .expect("string column");
2209            for idx in 0..column.len() {
2210                if column.is_null(idx) {
2211                    values.push(None);
2212                } else {
2213                    values.push(Some(column.value(idx).to_string()));
2214                }
2215            }
2216        }
2217        values
2218    }
2219
2220    #[test]
2221    fn create_insert_select_roundtrip() {
2222        let pager = Arc::new(MemPager::default());
2223        let engine = SqlEngine::new(pager);
2224
2225        let result = engine
2226            .execute("CREATE TABLE people (id INT NOT NULL, name TEXT NOT NULL)")
2227            .expect("create table");
2228        assert!(matches!(
2229            result[0],
2230            RuntimeStatementResult::CreateTable { .. }
2231        ));
2232
2233        let result = engine
2234            .execute("INSERT INTO people (id, name) VALUES (1, 'alice'), (2, 'bob')")
2235            .expect("insert rows");
2236        assert!(matches!(
2237            result[0],
2238            RuntimeStatementResult::Insert {
2239                rows_inserted: 2,
2240                ..
2241            }
2242        ));
2243
2244        let mut result = engine
2245            .execute("SELECT name FROM people WHERE id = 2")
2246            .expect("select rows");
2247        let select_result = result.remove(0);
2248        let batches = match select_result {
2249            RuntimeStatementResult::Select { execution, .. } => {
2250                execution.collect().expect("collect batches")
2251            }
2252            _ => panic!("expected select result"),
2253        };
2254        assert_eq!(batches.len(), 1);
2255        let column = batches[0]
2256            .column(0)
2257            .as_any()
2258            .downcast_ref::<StringArray>()
2259            .expect("string column");
2260        assert_eq!(column.len(), 1);
2261        assert_eq!(column.value(0), "bob");
2262    }
2263
2264    #[test]
2265    fn insert_select_constant_including_null() {
2266        let pager = Arc::new(MemPager::default());
2267        let engine = SqlEngine::new(pager);
2268
2269        engine
2270            .execute("CREATE TABLE integers(i INTEGER)")
2271            .expect("create table");
2272
2273        let result = engine
2274            .execute("INSERT INTO integers SELECT 42")
2275            .expect("insert literal");
2276        assert!(matches!(
2277            result[0],
2278            RuntimeStatementResult::Insert {
2279                rows_inserted: 1,
2280                ..
2281            }
2282        ));
2283
2284        let result = engine
2285            .execute("INSERT INTO integers SELECT CAST(NULL AS VARCHAR)")
2286            .expect("insert null literal");
2287        assert!(matches!(
2288            result[0],
2289            RuntimeStatementResult::Insert {
2290                rows_inserted: 1,
2291                ..
2292            }
2293        ));
2294
2295        let mut result = engine
2296            .execute("SELECT * FROM integers")
2297            .expect("select rows");
2298        let select_result = result.remove(0);
2299        let batches = match select_result {
2300            RuntimeStatementResult::Select { execution, .. } => {
2301                execution.collect().expect("collect batches")
2302            }
2303            _ => panic!("expected select result"),
2304        };
2305
2306        let mut values: Vec<Option<i64>> = Vec::new();
2307        for batch in &batches {
2308            let column = batch
2309                .column(0)
2310                .as_any()
2311                .downcast_ref::<Int64Array>()
2312                .expect("int column");
2313            for idx in 0..column.len() {
2314                if column.is_null(idx) {
2315                    values.push(None);
2316                } else {
2317                    values.push(Some(column.value(idx)));
2318                }
2319            }
2320        }
2321
2322        assert_eq!(values, vec![Some(42), None]);
2323    }
2324
2325    #[test]
2326    fn update_with_where_clause_filters_rows() {
2327        let pager = Arc::new(MemPager::default());
2328        let engine = SqlEngine::new(pager);
2329
2330        engine
2331            .execute("SET default_null_order='nulls_first'")
2332            .expect("set default null order");
2333
2334        engine
2335            .execute("CREATE TABLE strings(a VARCHAR)")
2336            .expect("create table");
2337
2338        engine
2339            .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
2340            .expect("insert seed rows");
2341
2342        let result = engine
2343            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
2344            .expect("update rows");
2345        assert!(matches!(
2346            result[0],
2347            RuntimeStatementResult::Update {
2348                rows_updated: 1,
2349                ..
2350            }
2351        ));
2352
2353        let mut result = engine
2354            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
2355            .expect("select rows");
2356        let select_result = result.remove(0);
2357        let batches = match select_result {
2358            RuntimeStatementResult::Select { execution, .. } => {
2359                execution.collect().expect("collect batches")
2360            }
2361            _ => panic!("expected select result"),
2362        };
2363
2364        let mut values: Vec<Option<String>> = Vec::new();
2365        for batch in &batches {
2366            let column = batch
2367                .column(0)
2368                .as_any()
2369                .downcast_ref::<StringArray>()
2370                .expect("string column");
2371            for idx in 0..column.len() {
2372                if column.is_null(idx) {
2373                    values.push(None);
2374                } else {
2375                    values.push(Some(column.value(idx).to_string()));
2376                }
2377            }
2378        }
2379
2380        values.sort_by(|a, b| match (a, b) {
2381            (None, None) => std::cmp::Ordering::Equal,
2382            (None, Some(_)) => std::cmp::Ordering::Less,
2383            (Some(_), None) => std::cmp::Ordering::Greater,
2384            (Some(av), Some(bv)) => {
2385                let a_val = av.parse::<i64>().unwrap_or_default();
2386                let b_val = bv.parse::<i64>().unwrap_or_default();
2387                a_val.cmp(&b_val)
2388            }
2389        });
2390
2391        assert_eq!(
2392            values,
2393            vec![None, Some("4".to_string()), Some("13".to_string())]
2394        );
2395    }
2396
2397    #[test]
2398    fn order_by_honors_configured_default_null_order() {
2399        let pager = Arc::new(MemPager::default());
2400        let engine = SqlEngine::new(pager);
2401
2402        engine
2403            .execute("CREATE TABLE strings(a VARCHAR)")
2404            .expect("create table");
2405        engine
2406            .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
2407            .expect("insert values");
2408        engine
2409            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
2410            .expect("update value");
2411
2412        let mut result = engine
2413            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
2414            .expect("select rows");
2415        let select_result = result.remove(0);
2416        let batches = match select_result {
2417            RuntimeStatementResult::Select { execution, .. } => {
2418                execution.collect().expect("collect batches")
2419            }
2420            _ => panic!("expected select result"),
2421        };
2422
2423        let values = extract_string_options(&batches);
2424        assert_eq!(
2425            values,
2426            vec![Some("4".to_string()), Some("13".to_string()), None]
2427        );
2428
2429        assert!(!engine.default_nulls_first_for_tests());
2430
2431        engine
2432            .execute("SET default_null_order='nulls_first'")
2433            .expect("set default null order");
2434
2435        assert!(engine.default_nulls_first_for_tests());
2436
2437        let mut result = engine
2438            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
2439            .expect("select rows");
2440        let select_result = result.remove(0);
2441        let batches = match select_result {
2442            RuntimeStatementResult::Select { execution, .. } => {
2443                execution.collect().expect("collect batches")
2444            }
2445            _ => panic!("expected select result"),
2446        };
2447
2448        let values = extract_string_options(&batches);
2449        assert_eq!(
2450            values,
2451            vec![None, Some("4".to_string()), Some("13".to_string())]
2452        );
2453    }
2454}