1use std::collections::{HashMap, HashSet};
2use std::sync::{
3 Arc, OnceLock,
4 atomic::{AtomicBool, Ordering as AtomicOrdering},
5};
6
7use crate::SqlResult;
8use crate::SqlValue;
9use arrow::record_batch::RecordBatch;
10
11use llkv_executor::SelectExecution;
12use llkv_expr::literal::Literal;
13use llkv_plan::validation::{
14 ensure_known_columns_case_insensitive, ensure_non_empty, ensure_unique_case_insensitive,
15};
16use llkv_result::Error;
17use llkv_runtime::storage_namespace::TEMPORARY_NAMESPACE_ID;
18use llkv_runtime::{
19 AggregateExpr, AssignmentValue, ColumnAssignment, ColumnSpec, CreateIndexPlan, CreateTablePlan,
20 CreateTableSource, DeletePlan, ForeignKeyAction, ForeignKeySpec, IndexColumnPlan, InsertPlan,
21 InsertSource, OrderByPlan, OrderSortType, OrderTarget, PlanStatement, PlanValue,
22 RuntimeContext, RuntimeEngine, RuntimeSession, RuntimeStatementResult, SelectPlan,
23 SelectProjection, UpdatePlan, extract_rows_from_range,
24};
25use llkv_storage::pager::Pager;
26use llkv_table::catalog::{IdentifierContext, IdentifierResolver};
27use regex::Regex;
28use simd_r_drive_entry_handle::EntryHandle;
29use sqlparser::ast::{
30 Assignment, AssignmentTarget, BeginTransactionKind, BinaryOperator, ColumnOption,
31 ColumnOptionDef, ConstraintCharacteristics, DataType as SqlDataType, Delete, ExceptionWhen,
32 Expr as SqlExpr, FromTable, FunctionArg, FunctionArgExpr, FunctionArguments, GroupByExpr,
33 Ident, LimitClause, NullsDistinctOption, ObjectName, ObjectNamePart, ObjectType, OrderBy,
34 OrderByKind, Query, ReferentialAction, SchemaName, Select, SelectItem,
35 SelectItemQualifiedWildcardKind, Set, SetExpr, SqlOption, Statement, TableConstraint,
36 TableFactor, TableObject, TableWithJoins, TransactionMode, TransactionModifier, UnaryOperator,
37 UpdateTableFromKind, Value, ValueWithSpan,
38};
39use sqlparser::dialect::GenericDialect;
40use sqlparser::parser::Parser;
41
42pub struct SqlEngine<P>
88where
89 P: Pager<Blob = EntryHandle> + Send + Sync,
90{
91 engine: RuntimeEngine<P>,
92 default_nulls_first: AtomicBool,
93}
94
95const DROPPED_TABLE_TRANSACTION_ERR: &str = "another transaction has dropped this table";
96
97impl<P> Clone for SqlEngine<P>
98where
99 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
100{
101 fn clone(&self) -> Self {
102 tracing::warn!(
103 "[SQL_ENGINE] SqlEngine::clone() called - will create new Engine with new session!"
104 );
105 Self {
107 engine: self.engine.clone(),
108 default_nulls_first: AtomicBool::new(
109 self.default_nulls_first.load(AtomicOrdering::Relaxed),
110 ),
111 }
112 }
113}
114
115#[allow(dead_code)]
116impl<P> SqlEngine<P>
117where
118 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
119{
120 fn map_table_error(table_name: &str, err: Error) -> Error {
121 match err {
122 Error::NotFound => Self::table_not_found_error(table_name),
123 Error::InvalidArgumentError(msg) if msg.contains("unknown table") => {
124 Self::table_not_found_error(table_name)
125 }
126 other => other,
127 }
128 }
129
130 fn table_not_found_error(table_name: &str) -> Error {
131 Error::CatalogError(format!(
132 "Catalog Error: Table '{table_name}' does not exist"
133 ))
134 }
135
136 fn is_table_missing_error(err: &Error) -> bool {
137 match err {
138 Error::NotFound => true,
139 Error::CatalogError(msg) => {
140 msg.contains("Catalog Error: Table") || msg.contains("unknown table")
141 }
142 Error::InvalidArgumentError(msg) => {
143 msg.contains("Catalog Error: Table") || msg.contains("unknown table")
144 }
145 _ => false,
146 }
147 }
148
149 fn execute_plan_statement(
150 &self,
151 statement: PlanStatement,
152 ) -> SqlResult<RuntimeStatementResult<P>> {
153 let table = llkv_runtime::statement_table_name(&statement).map(str::to_string);
154 self.engine.execute_statement(statement).map_err(|err| {
155 if let Some(table_name) = table {
156 Self::map_table_error(&table_name, err)
157 } else {
158 err
159 }
160 })
161 }
162
163 pub fn new(pager: Arc<P>) -> Self {
164 let engine = RuntimeEngine::new(pager);
165 Self {
166 engine,
167 default_nulls_first: AtomicBool::new(false),
168 }
169 }
170
171 fn preprocess_exclude_syntax(sql: &str) -> String {
174 static EXCLUDE_REGEX: OnceLock<Regex> = OnceLock::new();
175
176 let re = EXCLUDE_REGEX.get_or_init(|| {
179 Regex::new(
180 r"(?i)EXCLUDE\s*\(\s*([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)+)\s*\)",
181 )
182 .expect("valid EXCLUDE qualifier regex")
183 });
184
185 re.replace_all(sql, |caps: ®ex::Captures| {
186 let qualified_name = &caps[1];
187 format!("EXCLUDE (\"{}\")", qualified_name)
188 })
189 .to_string()
190 }
191
192 pub(crate) fn context_arc(&self) -> Arc<RuntimeContext<P>> {
193 self.engine.context()
194 }
195
196 pub fn with_context(context: Arc<RuntimeContext<P>>, default_nulls_first: bool) -> Self {
197 Self {
198 engine: RuntimeEngine::from_context(context),
199 default_nulls_first: AtomicBool::new(default_nulls_first),
200 }
201 }
202
#[cfg(test)]
/// Test-only accessor for the current NULL-ordering default.
fn default_nulls_first_for_tests(&self) -> bool {
    let flag = &self.default_nulls_first;
    flag.load(AtomicOrdering::Relaxed)
}
207
208 fn has_active_transaction(&self) -> bool {
209 self.engine.session().has_active_transaction()
210 }
211
212 pub fn session(&self) -> &RuntimeSession<P> {
214 self.engine.session()
215 }
216
217 pub fn execute(&self, sql: &str) -> SqlResult<Vec<RuntimeStatementResult<P>>> {
230 tracing::trace!("DEBUG SQL execute: {}", sql);
231
232 let processed_sql = Self::preprocess_exclude_syntax(sql);
235
236 let dialect = GenericDialect {};
237 let statements = Parser::parse_sql(&dialect, &processed_sql)
238 .map_err(|err| Error::InvalidArgumentError(format!("failed to parse SQL: {err}")))?;
239 tracing::trace!("DEBUG SQL execute: parsed {} statements", statements.len());
240
241 let mut results = Vec::with_capacity(statements.len());
242 for (i, statement) in statements.iter().enumerate() {
243 tracing::trace!("DEBUG SQL execute: processing statement {}", i);
244 results.push(self.execute_statement(statement.clone())?);
245 tracing::trace!("DEBUG SQL execute: statement {} completed", i);
246 }
247 tracing::trace!("DEBUG SQL execute completed successfully");
248 Ok(results)
249 }
250
251 pub fn sql(&self, sql: &str) -> SqlResult<Vec<RecordBatch>> {
288 let mut results = self.execute(sql)?;
289 if results.len() != 1 {
290 return Err(Error::InvalidArgumentError(
291 "SqlEngine::sql expects exactly one SQL statement".into(),
292 ));
293 }
294
295 match results.pop().expect("checked length above") {
296 RuntimeStatementResult::Select { execution, .. } => execution.collect(),
297 other => Err(Error::InvalidArgumentError(format!(
298 "SqlEngine::sql requires a SELECT statement, got {other:?}",
299 ))),
300 }
301 }
302
303 fn execute_statement(&self, statement: Statement) -> SqlResult<RuntimeStatementResult<P>> {
304 tracing::trace!(
305 "DEBUG SQL execute_statement: {:?}",
306 match &statement {
307 Statement::Insert(insert) =>
308 format!("Insert(table={:?})", Self::table_name_from_insert(insert)),
309 Statement::Query(_) => "Query".to_string(),
310 Statement::StartTransaction { .. } => "StartTransaction".to_string(),
311 Statement::Commit { .. } => "Commit".to_string(),
312 Statement::Rollback { .. } => "Rollback".to_string(),
313 Statement::CreateTable(_) => "CreateTable".to_string(),
314 Statement::Update { .. } => "Update".to_string(),
315 Statement::Delete(_) => "Delete".to_string(),
316 other => format!("Other({:?})", other),
317 }
318 );
319 match statement {
320 Statement::StartTransaction {
321 modes,
322 begin,
323 transaction,
324 modifier,
325 statements,
326 exception,
327 has_end_keyword,
328 } => self.handle_start_transaction(
329 modes,
330 begin,
331 transaction,
332 modifier,
333 statements,
334 exception,
335 has_end_keyword,
336 ),
337 Statement::Commit {
338 chain,
339 end,
340 modifier,
341 } => self.handle_commit(chain, end, modifier),
342 Statement::Rollback { chain, savepoint } => self.handle_rollback(chain, savepoint),
343 other => self.execute_statement_non_transactional(other),
344 }
345 }
346
347 fn execute_statement_non_transactional(
348 &self,
349 statement: Statement,
350 ) -> SqlResult<RuntimeStatementResult<P>> {
351 tracing::trace!("DEBUG SQL execute_statement_non_transactional called");
352 match statement {
353 Statement::CreateTable(stmt) => {
354 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateTable");
355 self.handle_create_table(stmt)
356 }
357 Statement::CreateIndex(stmt) => {
358 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateIndex");
359 self.handle_create_index(stmt)
360 }
361 Statement::CreateSchema {
362 schema_name,
363 if_not_exists,
364 with,
365 options,
366 default_collate_spec,
367 clone,
368 } => {
369 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateSchema");
370 self.handle_create_schema(
371 schema_name,
372 if_not_exists,
373 with,
374 options,
375 default_collate_spec,
376 clone,
377 )
378 }
379 Statement::Insert(stmt) => {
380 let table_name =
381 Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
382 tracing::trace!(
383 "DEBUG SQL execute_statement_non_transactional: Insert(table={})",
384 table_name
385 );
386 self.handle_insert(stmt)
387 }
388 Statement::Query(query) => {
389 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Query");
390 self.handle_query(*query)
391 }
392 Statement::Update {
393 table,
394 assignments,
395 from,
396 selection,
397 returning,
398 ..
399 } => {
400 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Update");
401 self.handle_update(table, assignments, from, selection, returning)
402 }
403 Statement::Delete(delete) => {
404 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Delete");
405 self.handle_delete(delete)
406 }
407 Statement::Drop {
408 object_type,
409 if_exists,
410 names,
411 cascade,
412 restrict,
413 purge,
414 temporary,
415 ..
416 } => {
417 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Drop");
418 self.handle_drop(
419 object_type,
420 if_exists,
421 names,
422 cascade,
423 restrict,
424 purge,
425 temporary,
426 )
427 }
428 Statement::Set(set_stmt) => {
429 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Set");
430 self.handle_set(set_stmt)
431 }
432 Statement::Pragma { name, value, is_eq } => {
433 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Pragma");
434 self.handle_pragma(name, value, is_eq)
435 }
436 other => {
437 tracing::trace!(
438 "DEBUG SQL execute_statement_non_transactional: Other({:?})",
439 other
440 );
441 Err(Error::InvalidArgumentError(format!(
442 "unsupported SQL statement: {other:?}"
443 )))
444 }
445 }
446 }
447
448 fn table_name_from_insert(insert: &sqlparser::ast::Insert) -> SqlResult<String> {
449 match &insert.table {
450 TableObject::TableName(name) => Self::object_name_to_string(name),
451 _ => Err(Error::InvalidArgumentError(
452 "INSERT requires a plain table name".into(),
453 )),
454 }
455 }
456
457 fn table_name_from_update(table: &TableWithJoins) -> SqlResult<Option<String>> {
458 if !table.joins.is_empty() {
459 return Err(Error::InvalidArgumentError(
460 "UPDATE with JOIN targets is not supported yet".into(),
461 ));
462 }
463 Self::table_with_joins_name(table)
464 }
465
466 fn table_name_from_delete(delete: &Delete) -> SqlResult<Option<String>> {
467 if !delete.tables.is_empty() {
468 return Err(Error::InvalidArgumentError(
469 "multi-table DELETE is not supported yet".into(),
470 ));
471 }
472 let from_tables = match &delete.from {
473 FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
474 };
475 if from_tables.is_empty() {
476 return Ok(None);
477 }
478 if from_tables.len() != 1 {
479 return Err(Error::InvalidArgumentError(
480 "DELETE over multiple tables is not supported yet".into(),
481 ));
482 }
483 Self::table_with_joins_name(&from_tables[0])
484 }
485
486 fn object_name_to_string(name: &ObjectName) -> SqlResult<String> {
487 let (display, _) = canonical_object_name(name)?;
488 Ok(display)
489 }
490
491 #[allow(dead_code)]
492 fn table_object_to_name(table: &TableObject) -> SqlResult<Option<String>> {
493 match table {
494 TableObject::TableName(name) => Ok(Some(Self::object_name_to_string(name)?)),
495 TableObject::TableFunction(_) => Ok(None),
496 }
497 }
498
499 fn table_with_joins_name(table: &TableWithJoins) -> SqlResult<Option<String>> {
500 match &table.relation {
501 TableFactor::Table { name, .. } => Ok(Some(Self::object_name_to_string(name)?)),
502 _ => Ok(None),
503 }
504 }
505
506 fn tables_in_query(query: &Query) -> SqlResult<Vec<String>> {
507 let mut tables = Vec::new();
508 if let sqlparser::ast::SetExpr::Select(select) = query.body.as_ref() {
509 for table in &select.from {
510 if let TableFactor::Table { name, .. } = &table.relation {
511 tables.push(Self::object_name_to_string(name)?);
512 }
513 }
514 }
515 Ok(tables)
516 }
517
518 fn collect_known_columns(
519 &self,
520 display_name: &str,
521 canonical_name: &str,
522 ) -> SqlResult<HashSet<String>> {
523 let context = self.engine.context();
524
525 if context.is_table_marked_dropped(canonical_name) {
526 return Err(Self::table_not_found_error(display_name));
527 }
528
529 match context.table_column_specs(display_name) {
530 Ok(specs) => Ok(specs
531 .into_iter()
532 .map(|spec| spec.name.to_ascii_lowercase())
533 .collect()),
534 Err(err) => {
535 if !Self::is_table_missing_error(&err) {
536 return Err(Self::map_table_error(display_name, err));
537 }
538
539 Ok(HashSet::new())
540 }
541 }
542 }
543
544 fn is_table_marked_dropped(&self, table_name: &str) -> SqlResult<bool> {
545 let canonical = table_name.to_ascii_lowercase();
546 Ok(self.engine.context().is_table_marked_dropped(&canonical))
547 }
548
549 fn handle_create_table(
550 &self,
551 mut stmt: sqlparser::ast::CreateTable,
552 ) -> SqlResult<RuntimeStatementResult<P>> {
553 validate_create_table_common(&stmt)?;
554
555 let (mut schema_name, table_name) = parse_schema_qualified_name(&stmt.name)?;
556
557 let namespace = if stmt.temporary {
558 if schema_name.is_some() {
559 return Err(Error::InvalidArgumentError(
560 "temporary tables cannot specify an explicit schema".into(),
561 ));
562 }
563 schema_name = None;
564 Some(TEMPORARY_NAMESPACE_ID.to_string())
565 } else {
566 None
567 };
568
569 if let Some(ref schema) = schema_name {
571 let catalog = self.engine.context().table_catalog();
572 if !catalog.schema_exists(schema) {
573 return Err(Error::CatalogError(format!(
574 "Schema '{}' does not exist",
575 schema
576 )));
577 }
578 }
579
580 let display_name = match &schema_name {
582 Some(schema) => format!("{}.{}", schema, table_name),
583 None => table_name.clone(),
584 };
585 let canonical_name = display_name.to_ascii_lowercase();
586 tracing::trace!(
587 "\n=== HANDLE_CREATE_TABLE: table='{}' columns={} ===",
588 display_name,
589 stmt.columns.len()
590 );
591 if display_name.is_empty() {
592 return Err(Error::InvalidArgumentError(
593 "table name must not be empty".into(),
594 ));
595 }
596
597 if let Some(query) = stmt.query.take() {
598 validate_create_table_as(&stmt)?;
599 if let Some(result) = self.try_handle_range_ctas(
600 &display_name,
601 &canonical_name,
602 &query,
603 stmt.if_not_exists,
604 stmt.or_replace,
605 namespace.clone(),
606 )? {
607 return Ok(result);
608 }
609 return self.handle_create_table_as(
610 display_name,
611 canonical_name,
612 *query,
613 stmt.if_not_exists,
614 stmt.or_replace,
615 namespace.clone(),
616 );
617 }
618
619 if stmt.columns.is_empty() {
620 return Err(Error::InvalidArgumentError(
621 "CREATE TABLE requires at least one column".into(),
622 ));
623 }
624
625 validate_create_table_definition(&stmt)?;
626
627 let column_defs_ast = std::mem::take(&mut stmt.columns);
628 let constraints = std::mem::take(&mut stmt.constraints);
629
630 let column_names: Vec<String> = column_defs_ast
631 .iter()
632 .map(|column_def| column_def.name.value.clone())
633 .collect();
634 ensure_unique_case_insensitive(column_names.iter().map(|name| name.as_str()), |dup| {
635 format!(
636 "duplicate column name '{}' in table '{}'",
637 dup, display_name
638 )
639 })?;
640 let column_names_lower: HashSet<String> = column_names
641 .iter()
642 .map(|name| name.to_ascii_lowercase())
643 .collect();
644
645 let mut columns: Vec<ColumnSpec> = Vec::with_capacity(column_defs_ast.len());
646 let mut primary_key_columns: HashSet<String> = HashSet::new();
647 let mut foreign_keys: Vec<ForeignKeySpec> = Vec::new();
648
649 for column_def in column_defs_ast {
651 let is_nullable = column_def
652 .options
653 .iter()
654 .all(|opt| !matches!(opt.option, ColumnOption::NotNull));
655
656 let is_primary_key = column_def.options.iter().any(|opt| {
657 matches!(
658 opt.option,
659 ColumnOption::Unique {
660 is_primary: true,
661 characteristics: _
662 }
663 )
664 });
665
666 let has_unique_constraint = column_def
667 .options
668 .iter()
669 .any(|opt| matches!(opt.option, ColumnOption::Unique { .. }));
670
671 let check_expr = column_def.options.iter().find_map(|opt| {
673 if let ColumnOption::Check(expr) = &opt.option {
674 Some(expr)
675 } else {
676 None
677 }
678 });
679
680 if let Some(check_expr) = check_expr {
682 let all_col_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
683 validate_check_constraint(check_expr, &display_name, &all_col_refs)?;
684 }
685
686 let check_expr_str = check_expr.map(|e| e.to_string());
687
688 for opt in &column_def.options {
690 if let ColumnOption::ForeignKey {
691 foreign_table,
692 referred_columns,
693 on_delete,
694 on_update,
695 characteristics,
696 } = &opt.option
697 {
698 let spec = self.build_foreign_key_spec(
699 &display_name,
700 &canonical_name,
701 vec![column_def.name.value.clone()],
702 foreign_table,
703 referred_columns,
704 *on_delete,
705 *on_update,
706 characteristics,
707 &column_names_lower,
708 None,
709 )?;
710 foreign_keys.push(spec);
711 }
712 }
713
714 tracing::trace!(
715 "DEBUG CREATE TABLE column '{}' is_primary_key={} has_unique={} check_expr={:?}",
716 column_def.name.value,
717 is_primary_key,
718 has_unique_constraint,
719 check_expr_str
720 );
721
722 let mut column = ColumnSpec::new(
723 column_def.name.value.clone(),
724 arrow_type_from_sql(&column_def.data_type)?,
725 is_nullable,
726 );
727 tracing::trace!(
728 "DEBUG ColumnSpec after new(): primary_key={} unique={}",
729 column.primary_key,
730 column.unique
731 );
732
733 column = column
734 .with_primary_key(is_primary_key)
735 .with_unique(has_unique_constraint)
736 .with_check(check_expr_str);
737
738 if is_primary_key {
739 column.nullable = false;
740 primary_key_columns.insert(column.name.to_ascii_lowercase());
741 }
742 tracing::trace!(
743 "DEBUG ColumnSpec after with_primary_key({})/with_unique({}): primary_key={} unique={} check_expr={:?}",
744 is_primary_key,
745 has_unique_constraint,
746 column.primary_key,
747 column.unique,
748 column.check_expr
749 );
750
751 columns.push(column);
752 }
753
754 if !constraints.is_empty() {
756 let mut column_lookup: HashMap<String, usize> = HashMap::with_capacity(columns.len());
757 for (idx, column) in columns.iter().enumerate() {
758 column_lookup.insert(column.name.to_ascii_lowercase(), idx);
759 }
760
761 for constraint in constraints {
762 match constraint {
763 TableConstraint::PrimaryKey {
764 columns: constraint_columns,
765 ..
766 } => {
767 if !primary_key_columns.is_empty() {
768 return Err(Error::InvalidArgumentError(
769 "multiple PRIMARY KEY constraints are not supported".into(),
770 ));
771 }
772
773 ensure_non_empty(&constraint_columns, || {
774 "PRIMARY KEY requires at least one column".into()
775 })?;
776
777 let mut pk_column_names: Vec<String> =
778 Vec::with_capacity(constraint_columns.len());
779
780 for index_col in &constraint_columns {
781 let column_ident = extract_index_column_name(
782 index_col,
783 "PRIMARY KEY",
784 false, false, )?;
787 pk_column_names.push(column_ident);
788 }
789
790 ensure_unique_case_insensitive(
791 pk_column_names.iter().map(|name| name.as_str()),
792 |dup| format!("duplicate column '{}' in PRIMARY KEY constraint", dup),
793 )?;
794
795 ensure_known_columns_case_insensitive(
796 pk_column_names.iter().map(|name| name.as_str()),
797 &column_names_lower,
798 |unknown| {
799 format!("unknown column '{}' in PRIMARY KEY constraint", unknown)
800 },
801 )?;
802
803 for column_ident in pk_column_names {
804 let normalized = column_ident.to_ascii_lowercase();
805 let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
806 Error::InvalidArgumentError(format!(
807 "unknown column '{}' in PRIMARY KEY constraint",
808 column_ident
809 ))
810 })?;
811
812 let column = columns.get_mut(idx).expect("column index valid");
813 column.primary_key = true;
814 column.unique = true;
815 column.nullable = false;
816
817 primary_key_columns.insert(normalized);
818 }
819 }
820 TableConstraint::Unique {
821 columns: constraint_columns,
822 index_type,
823 index_options,
824 characteristics,
825 nulls_distinct,
826 ..
827 } => {
828 if !matches!(nulls_distinct, NullsDistinctOption::None) {
829 return Err(Error::InvalidArgumentError(
830 "UNIQUE constraints with NULLS DISTINCT/NOT DISTINCT are not supported yet".into(),
831 ));
832 }
833
834 if index_type.is_some() {
835 return Err(Error::InvalidArgumentError(
836 "UNIQUE constraints with index types are not supported yet".into(),
837 ));
838 }
839
840 if !index_options.is_empty() {
841 return Err(Error::InvalidArgumentError(
842 "UNIQUE constraints with index options are not supported yet"
843 .into(),
844 ));
845 }
846
847 if characteristics.is_some() {
848 return Err(Error::InvalidArgumentError(
849 "UNIQUE constraint characteristics are not supported yet".into(),
850 ));
851 }
852
853 ensure_non_empty(&constraint_columns, || {
854 "UNIQUE constraint requires at least one column".into()
855 })?;
856
857 let mut unique_column_names: Vec<String> =
858 Vec::with_capacity(constraint_columns.len());
859
860 for index_column in &constraint_columns {
861 let column_ident = extract_index_column_name(
862 index_column,
863 "UNIQUE constraint",
864 false, false, )?;
867 unique_column_names.push(column_ident);
868 }
869
870 if unique_column_names.len() > 1 {
871 return Err(Error::InvalidArgumentError(
872 "multi-column UNIQUE constraints are not supported yet".into(),
873 ));
874 }
875
876 ensure_unique_case_insensitive(
877 unique_column_names.iter().map(|name| name.as_str()),
878 |dup| format!("duplicate column '{}' in UNIQUE constraint", dup),
879 )?;
880
881 ensure_known_columns_case_insensitive(
882 unique_column_names.iter().map(|name| name.as_str()),
883 &column_names_lower,
884 |unknown| format!("unknown column '{}' in UNIQUE constraint", unknown),
885 )?;
886
887 let column_ident = unique_column_names
888 .into_iter()
889 .next()
890 .expect("unique constraint checked for emptiness");
891 let normalized = column_ident.to_ascii_lowercase();
892 let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
893 Error::InvalidArgumentError(format!(
894 "unknown column '{}' in UNIQUE constraint",
895 column_ident
896 ))
897 })?;
898
899 let column = columns
900 .get_mut(idx)
901 .expect("column index from lookup must be valid");
902 column.unique = true;
903 }
904 TableConstraint::ForeignKey {
905 name,
906 index_name,
907 columns: fk_columns,
908 foreign_table,
909 referred_columns,
910 on_delete,
911 on_update,
912 characteristics,
913 ..
914 } => {
915 if index_name.is_some() {
916 return Err(Error::InvalidArgumentError(
917 "FOREIGN KEY index clauses are not supported yet".into(),
918 ));
919 }
920
921 let referencing_columns: Vec<String> =
922 fk_columns.into_iter().map(|ident| ident.value).collect();
923 let spec = self.build_foreign_key_spec(
924 &display_name,
925 &canonical_name,
926 referencing_columns,
927 &foreign_table,
928 &referred_columns,
929 on_delete,
930 on_update,
931 &characteristics,
932 &column_names_lower,
933 name.map(|ident| ident.value),
934 )?;
935
936 foreign_keys.push(spec);
937 }
938 unsupported => {
939 return Err(Error::InvalidArgumentError(format!(
940 "table-level constraint {:?} is not supported",
941 unsupported
942 )));
943 }
944 }
945 }
946 }
947
948 let plan = CreateTablePlan {
949 name: display_name,
950 if_not_exists: stmt.if_not_exists,
951 or_replace: stmt.or_replace,
952 columns,
953 source: None,
954 namespace,
955 foreign_keys,
956 };
957 self.execute_plan_statement(PlanStatement::CreateTable(plan))
958 }
959
960 fn handle_create_index(
961 &self,
962 stmt: sqlparser::ast::CreateIndex,
963 ) -> SqlResult<RuntimeStatementResult<P>> {
964 let sqlparser::ast::CreateIndex {
965 name,
966 table_name,
967 using,
968 columns,
969 unique,
970 concurrently,
971 if_not_exists,
972 include,
973 nulls_distinct,
974 with,
975 predicate,
976 index_options,
977 alter_options,
978 ..
979 } = stmt;
980
981 if concurrently {
982 return Err(Error::InvalidArgumentError(
983 "CREATE INDEX CONCURRENTLY is not supported".into(),
984 ));
985 }
986 if using.is_some() {
987 return Err(Error::InvalidArgumentError(
988 "CREATE INDEX USING clauses are not supported".into(),
989 ));
990 }
991 if !include.is_empty() {
992 return Err(Error::InvalidArgumentError(
993 "CREATE INDEX INCLUDE columns are not supported".into(),
994 ));
995 }
996 if nulls_distinct.is_some() {
997 return Err(Error::InvalidArgumentError(
998 "CREATE INDEX NULLS DISTINCT is not supported".into(),
999 ));
1000 }
1001 if !with.is_empty() {
1002 return Err(Error::InvalidArgumentError(
1003 "CREATE INDEX WITH options are not supported".into(),
1004 ));
1005 }
1006 if predicate.is_some() {
1007 return Err(Error::InvalidArgumentError(
1008 "partial CREATE INDEX is not supported".into(),
1009 ));
1010 }
1011 if !index_options.is_empty() {
1012 return Err(Error::InvalidArgumentError(
1013 "CREATE INDEX options are not supported".into(),
1014 ));
1015 }
1016 if !alter_options.is_empty() {
1017 return Err(Error::InvalidArgumentError(
1018 "CREATE INDEX ALTER options are not supported".into(),
1019 ));
1020 }
1021 if columns.is_empty() {
1022 return Err(Error::InvalidArgumentError(
1023 "CREATE INDEX requires at least one column".into(),
1024 ));
1025 }
1026
1027 let (schema_name, base_table_name) = parse_schema_qualified_name(&table_name)?;
1028 if let Some(ref schema) = schema_name {
1029 let catalog = self.engine.context().table_catalog();
1030 if !catalog.schema_exists(schema) {
1031 return Err(Error::CatalogError(format!(
1032 "Schema '{}' does not exist",
1033 schema
1034 )));
1035 }
1036 }
1037
1038 let display_table_name = schema_name
1039 .as_ref()
1040 .map(|schema| format!("{}.{}", schema, base_table_name))
1041 .unwrap_or_else(|| base_table_name.clone());
1042 let canonical_table_name = display_table_name.to_ascii_lowercase();
1043
1044 let known_columns =
1045 self.collect_known_columns(&display_table_name, &canonical_table_name)?;
1046 let enforce_known_columns = !known_columns.is_empty();
1047
1048 let index_name = match name {
1049 Some(name_obj) => Some(Self::object_name_to_string(&name_obj)?),
1050 None => None,
1051 };
1052
1053 let mut index_columns: Vec<IndexColumnPlan> = Vec::with_capacity(columns.len());
1054 let mut seen_column_names: HashSet<String> = HashSet::new();
1055 for item in columns {
1056 if item.column.with_fill.is_some() {
1058 return Err(Error::InvalidArgumentError(
1059 "CREATE INDEX column WITH FILL is not supported".into(),
1060 ));
1061 }
1062
1063 let column_name = extract_index_column_name(
1064 &item,
1065 "CREATE INDEX",
1066 true, true, )?;
1069
1070 let order_expr = &item.column;
1072 let ascending = order_expr.options.asc.unwrap_or(true);
1073 let nulls_first = order_expr.options.nulls_first.unwrap_or(false);
1074
1075 let normalized = column_name.to_ascii_lowercase();
1076 if !seen_column_names.insert(normalized.clone()) {
1077 return Err(Error::InvalidArgumentError(format!(
1078 "duplicate column '{}' in CREATE INDEX",
1079 column_name
1080 )));
1081 }
1082
1083 if enforce_known_columns && !known_columns.contains(&normalized) {
1084 return Err(Error::InvalidArgumentError(format!(
1085 "column '{}' does not exist in table '{}'",
1086 column_name, display_table_name
1087 )));
1088 }
1089
1090 let column_plan = IndexColumnPlan::new(column_name).with_sort(ascending, nulls_first);
1091 index_columns.push(column_plan);
1092 }
1093
1094 if index_columns.len() > 1 && !unique {
1095 return Err(Error::InvalidArgumentError(
1096 "multi-column CREATE INDEX currently supports UNIQUE indexes only".into(),
1097 ));
1098 }
1099
1100 let plan = CreateIndexPlan::new(display_table_name)
1101 .with_name(index_name)
1102 .with_unique(unique)
1103 .with_if_not_exists(if_not_exists)
1104 .with_columns(index_columns);
1105
1106 self.execute_plan_statement(PlanStatement::CreateIndex(plan))
1107 }
1108
1109 fn map_referential_action(
1110 action: Option<ReferentialAction>,
1111 kind: &str,
1112 ) -> SqlResult<ForeignKeyAction> {
1113 match action {
1114 None | Some(ReferentialAction::NoAction) => Ok(ForeignKeyAction::NoAction),
1115 Some(ReferentialAction::Restrict) => Ok(ForeignKeyAction::Restrict),
1116 Some(other) => Err(Error::InvalidArgumentError(format!(
1117 "FOREIGN KEY ON {kind} {:?} is not supported yet",
1118 other
1119 ))),
1120 }
1121 }
1122
1123 #[allow(clippy::too_many_arguments)]
1124 fn build_foreign_key_spec(
1125 &self,
1126 _referencing_display: &str,
1127 referencing_canonical: &str,
1128 referencing_columns: Vec<String>,
1129 foreign_table: &ObjectName,
1130 referenced_columns: &[Ident],
1131 on_delete: Option<ReferentialAction>,
1132 on_update: Option<ReferentialAction>,
1133 characteristics: &Option<ConstraintCharacteristics>,
1134 known_columns_lower: &HashSet<String>,
1135 name: Option<String>,
1136 ) -> SqlResult<ForeignKeySpec> {
1137 if characteristics.is_some() {
1138 return Err(Error::InvalidArgumentError(
1139 "FOREIGN KEY constraint characteristics are not supported yet".into(),
1140 ));
1141 }
1142
1143 ensure_non_empty(&referencing_columns, || {
1144 "FOREIGN KEY constraint requires at least one referencing column".into()
1145 })?;
1146 ensure_unique_case_insensitive(
1147 referencing_columns.iter().map(|name| name.as_str()),
1148 |dup| format!("duplicate column '{}' in FOREIGN KEY constraint", dup),
1149 )?;
1150 ensure_known_columns_case_insensitive(
1151 referencing_columns.iter().map(|name| name.as_str()),
1152 known_columns_lower,
1153 |unknown| format!("unknown column '{}' in FOREIGN KEY constraint", unknown),
1154 )?;
1155
1156 let referenced_columns_vec: Vec<String> = referenced_columns
1157 .iter()
1158 .map(|ident| ident.value.clone())
1159 .collect();
1160 ensure_unique_case_insensitive(
1161 referenced_columns_vec.iter().map(|name| name.as_str()),
1162 |dup| {
1163 format!(
1164 "duplicate referenced column '{}' in FOREIGN KEY constraint",
1165 dup
1166 )
1167 },
1168 )?;
1169
1170 if !referenced_columns_vec.is_empty()
1171 && referenced_columns_vec.len() != referencing_columns.len()
1172 {
1173 return Err(Error::InvalidArgumentError(
1174 "FOREIGN KEY referencing and referenced column counts must match".into(),
1175 ));
1176 }
1177
1178 let (referenced_display, referenced_canonical) = canonical_object_name(foreign_table)?;
1179
1180 if referenced_canonical == referencing_canonical {
1181 ensure_known_columns_case_insensitive(
1182 referenced_columns_vec.iter().map(|name| name.as_str()),
1183 known_columns_lower,
1184 |unknown| {
1185 format!(
1186 "unknown referenced column '{}' in FOREIGN KEY constraint",
1187 unknown
1188 )
1189 },
1190 )?;
1191 } else {
1192 let known_columns =
1193 self.collect_known_columns(&referenced_display, &referenced_canonical)?;
1194 if !known_columns.is_empty() {
1195 ensure_known_columns_case_insensitive(
1196 referenced_columns_vec.iter().map(|name| name.as_str()),
1197 &known_columns,
1198 |unknown| {
1199 format!(
1200 "unknown referenced column '{}' in FOREIGN KEY constraint",
1201 unknown
1202 )
1203 },
1204 )?;
1205 }
1206 }
1207
1208 let on_delete_action = Self::map_referential_action(on_delete, "DELETE")?;
1209 let on_update_action = Self::map_referential_action(on_update, "UPDATE")?;
1210
1211 Ok(ForeignKeySpec {
1212 name,
1213 columns: referencing_columns,
1214 referenced_table: referenced_display,
1215 referenced_columns: referenced_columns_vec,
1216 on_delete: on_delete_action,
1217 on_update: on_update_action,
1218 })
1219 }
1220
1221 fn handle_create_schema(
1222 &self,
1223 schema_name: SchemaName,
1224 _if_not_exists: bool,
1225 with: Option<Vec<SqlOption>>,
1226 options: Option<Vec<SqlOption>>,
1227 default_collate_spec: Option<SqlExpr>,
1228 clone: Option<ObjectName>,
1229 ) -> SqlResult<RuntimeStatementResult<P>> {
1230 if clone.is_some() {
1231 return Err(Error::InvalidArgumentError(
1232 "CREATE SCHEMA ... CLONE is not supported".into(),
1233 ));
1234 }
1235 if with.as_ref().is_some_and(|opts| !opts.is_empty()) {
1236 return Err(Error::InvalidArgumentError(
1237 "CREATE SCHEMA ... WITH options are not supported".into(),
1238 ));
1239 }
1240 if options.as_ref().is_some_and(|opts| !opts.is_empty()) {
1241 return Err(Error::InvalidArgumentError(
1242 "CREATE SCHEMA options are not supported".into(),
1243 ));
1244 }
1245 if default_collate_spec.is_some() {
1246 return Err(Error::InvalidArgumentError(
1247 "CREATE SCHEMA DEFAULT COLLATE is not supported".into(),
1248 ));
1249 }
1250
1251 let schema_name = match schema_name {
1252 SchemaName::Simple(name) => name,
1253 _ => {
1254 return Err(Error::InvalidArgumentError(
1255 "CREATE SCHEMA authorization is not supported".into(),
1256 ));
1257 }
1258 };
1259
1260 let (display_name, canonical) = canonical_object_name(&schema_name)?;
1261 if display_name.is_empty() {
1262 return Err(Error::InvalidArgumentError(
1263 "schema name must not be empty".into(),
1264 ));
1265 }
1266
1267 let catalog = self.engine.context().table_catalog();
1269
1270 if _if_not_exists && catalog.schema_exists(&canonical) {
1271 return Ok(RuntimeStatementResult::NoOp);
1272 }
1273
1274 catalog.register_schema(&canonical).map_err(|err| {
1275 Error::CatalogError(format!(
1276 "Failed to create schema '{}': {}",
1277 display_name, err
1278 ))
1279 })?;
1280
1281 Ok(RuntimeStatementResult::NoOp)
1282 }
1283
1284 fn try_handle_range_ctas(
1285 &self,
1286 display_name: &str,
1287 _canonical_name: &str,
1288 query: &Query,
1289 if_not_exists: bool,
1290 or_replace: bool,
1291 namespace: Option<String>,
1292 ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
1293 let select = match query.body.as_ref() {
1294 SetExpr::Select(select) => select,
1295 _ => return Ok(None),
1296 };
1297 if select.from.len() != 1 {
1298 return Ok(None);
1299 }
1300 let table_with_joins = &select.from[0];
1301 if !table_with_joins.joins.is_empty() {
1302 return Ok(None);
1303 }
1304 let (range_size, range_alias) = match &table_with_joins.relation {
1305 TableFactor::Table {
1306 name,
1307 args: Some(args),
1308 alias,
1309 ..
1310 } => {
1311 let func_name = name.to_string().to_ascii_lowercase();
1312 if func_name != "range" {
1313 return Ok(None);
1314 }
1315 if args.args.len() != 1 {
1316 return Err(Error::InvalidArgumentError(
1317 "range table function expects a single argument".into(),
1318 ));
1319 }
1320 let size_expr = &args.args[0];
1321 let range_size = match size_expr {
1322 FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
1323 match &value.value {
1324 Value::Number(raw, _) => raw.parse::<i64>().map_err(|e| {
1325 Error::InvalidArgumentError(format!(
1326 "invalid range size literal {}: {}",
1327 raw, e
1328 ))
1329 })?,
1330 other => {
1331 return Err(Error::InvalidArgumentError(format!(
1332 "unsupported range size value: {:?}",
1333 other
1334 )));
1335 }
1336 }
1337 }
1338 _ => {
1339 return Err(Error::InvalidArgumentError(
1340 "unsupported range argument".into(),
1341 ));
1342 }
1343 };
1344 (range_size, alias.as_ref().map(|a| a.name.value.clone()))
1345 }
1346 _ => return Ok(None),
1347 };
1348
1349 if range_size < 0 {
1350 return Err(Error::InvalidArgumentError(
1351 "range size must be non-negative".into(),
1352 ));
1353 }
1354
1355 if select.projection.is_empty() {
1356 return Err(Error::InvalidArgumentError(
1357 "CREATE TABLE AS SELECT requires at least one projected column".into(),
1358 ));
1359 }
1360
1361 let mut column_specs = Vec::with_capacity(select.projection.len());
1362 let mut column_names = Vec::with_capacity(select.projection.len());
1363 let mut row_template = Vec::with_capacity(select.projection.len());
1364 for item in &select.projection {
1365 match item {
1366 SelectItem::ExprWithAlias { expr, alias } => {
1367 let (value, data_type) = match expr {
1368 SqlExpr::Value(value_with_span) => match &value_with_span.value {
1369 Value::Number(raw, _) => {
1370 let parsed = raw.parse::<i64>().map_err(|e| {
1371 Error::InvalidArgumentError(format!(
1372 "invalid numeric literal {}: {}",
1373 raw, e
1374 ))
1375 })?;
1376 (
1377 PlanValue::Integer(parsed),
1378 arrow::datatypes::DataType::Int64,
1379 )
1380 }
1381 Value::SingleQuotedString(s) => (
1382 PlanValue::String(s.clone()),
1383 arrow::datatypes::DataType::Utf8,
1384 ),
1385 other => {
1386 return Err(Error::InvalidArgumentError(format!(
1387 "unsupported SELECT expression in range CTAS: {:?}",
1388 other
1389 )));
1390 }
1391 },
1392 SqlExpr::Identifier(ident) => {
1393 let ident_lower = ident.value.to_ascii_lowercase();
1394 if range_alias
1395 .as_ref()
1396 .map(|a| a.eq_ignore_ascii_case(&ident_lower))
1397 .unwrap_or(false)
1398 || ident_lower == "range"
1399 {
1400 return Err(Error::InvalidArgumentError(
1401 "range() table function columns are not supported yet".into(),
1402 ));
1403 }
1404 return Err(Error::InvalidArgumentError(format!(
1405 "unsupported identifier '{}' in range CTAS projection",
1406 ident.value
1407 )));
1408 }
1409 other => {
1410 return Err(Error::InvalidArgumentError(format!(
1411 "unsupported SELECT expression in range CTAS: {:?}",
1412 other
1413 )));
1414 }
1415 };
1416 let column_name = alias.value.clone();
1417 column_specs.push(ColumnSpec::new(column_name.clone(), data_type, true));
1418 column_names.push(column_name);
1419 row_template.push(value);
1420 }
1421 other => {
1422 return Err(Error::InvalidArgumentError(format!(
1423 "unsupported projection {:?} in range CTAS",
1424 other
1425 )));
1426 }
1427 }
1428 }
1429
1430 let plan = CreateTablePlan {
1431 name: display_name.to_string(),
1432 if_not_exists,
1433 or_replace,
1434 columns: column_specs,
1435 source: None,
1436 namespace,
1437 foreign_keys: Vec::new(),
1438 };
1439 let create_result = self.execute_plan_statement(PlanStatement::CreateTable(plan))?;
1440
1441 let row_count = range_size
1442 .try_into()
1443 .map_err(|_| Error::InvalidArgumentError("range size exceeds usize".into()))?;
1444 if row_count > 0 {
1445 let rows = vec![row_template; row_count];
1446 let insert_plan = InsertPlan {
1447 table: display_name.to_string(),
1448 columns: column_names,
1449 source: InsertSource::Rows(rows),
1450 };
1451 self.execute_plan_statement(PlanStatement::Insert(insert_plan))?;
1452 }
1453
1454 Ok(Some(create_result))
1455 }
1456
1457 fn try_handle_pragma_table_info(
1461 &self,
1462 query: &Query,
1463 ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
1464 let select = match query.body.as_ref() {
1465 SetExpr::Select(select) => select,
1466 _ => return Ok(None),
1467 };
1468
1469 if select.from.len() != 1 {
1470 return Ok(None);
1471 }
1472
1473 let table_with_joins = &select.from[0];
1474 if !table_with_joins.joins.is_empty() {
1475 return Ok(None);
1476 }
1477
1478 let table_name = match &table_with_joins.relation {
1480 TableFactor::Table {
1481 name,
1482 args: Some(args),
1483 ..
1484 } => {
1485 let func_name = name.to_string().to_ascii_lowercase();
1486 if func_name != "pragma_table_info" {
1487 return Ok(None);
1488 }
1489
1490 if args.args.len() != 1 {
1492 return Err(Error::InvalidArgumentError(
1493 "pragma_table_info expects exactly one argument".into(),
1494 ));
1495 }
1496
1497 match &args.args[0] {
1498 FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
1499 match &value.value {
1500 Value::SingleQuotedString(s) => s.clone(),
1501 Value::DoubleQuotedString(s) => s.clone(),
1502 _ => {
1503 return Err(Error::InvalidArgumentError(
1504 "pragma_table_info argument must be a string".into(),
1505 ));
1506 }
1507 }
1508 }
1509 _ => {
1510 return Err(Error::InvalidArgumentError(
1511 "pragma_table_info argument must be a string literal".into(),
1512 ));
1513 }
1514 }
1515 }
1516 _ => return Ok(None),
1517 };
1518
1519 let context = self.engine.context();
1521 let columns = context.table_column_specs(&table_name)?;
1522
1523 use arrow::array::{BooleanArray, Int32Array, StringArray};
1525 use arrow::datatypes::{DataType, Field, Schema};
1526
1527 let mut cid_values = Vec::new();
1528 let mut name_values = Vec::new();
1529 let mut type_values = Vec::new();
1530 let mut notnull_values = Vec::new();
1531 let mut dflt_value_values: Vec<Option<String>> = Vec::new();
1532 let mut pk_values = Vec::new();
1533
1534 for (idx, col) in columns.iter().enumerate() {
1535 cid_values.push(idx as i32);
1536 name_values.push(col.name.clone());
1537 type_values.push(format!("{:?}", col.data_type)); notnull_values.push(!col.nullable);
1539 dflt_value_values.push(None); pk_values.push(col.primary_key);
1541 }
1542
1543 let schema = Arc::new(Schema::new(vec![
1544 Field::new("cid", DataType::Int32, false),
1545 Field::new("name", DataType::Utf8, false),
1546 Field::new("type", DataType::Utf8, false),
1547 Field::new("notnull", DataType::Boolean, false),
1548 Field::new("dflt_value", DataType::Utf8, true),
1549 Field::new("pk", DataType::Boolean, false),
1550 ]));
1551
1552 use arrow::array::ArrayRef;
1553 let mut batch = RecordBatch::try_new(
1554 Arc::clone(&schema),
1555 vec![
1556 Arc::new(Int32Array::from(cid_values)) as ArrayRef,
1557 Arc::new(StringArray::from(name_values)) as ArrayRef,
1558 Arc::new(StringArray::from(type_values)) as ArrayRef,
1559 Arc::new(BooleanArray::from(notnull_values)) as ArrayRef,
1560 Arc::new(StringArray::from(dflt_value_values)) as ArrayRef,
1561 Arc::new(BooleanArray::from(pk_values)) as ArrayRef,
1562 ],
1563 )
1564 .map_err(|e| Error::Internal(format!("failed to create pragma_table_info batch: {}", e)))?;
1565
1566 let projection_indices: Vec<usize> = select
1568 .projection
1569 .iter()
1570 .filter_map(|item| {
1571 match item {
1572 SelectItem::UnnamedExpr(SqlExpr::Identifier(ident)) => {
1573 schema.index_of(&ident.value).ok()
1574 }
1575 SelectItem::ExprWithAlias { expr, .. } => {
1576 if let SqlExpr::Identifier(ident) = expr {
1577 schema.index_of(&ident.value).ok()
1578 } else {
1579 None
1580 }
1581 }
1582 SelectItem::Wildcard(_) => None, _ => None,
1584 }
1585 })
1586 .collect();
1587
1588 let projected_schema;
1590 if !projection_indices.is_empty() {
1591 let projected_fields: Vec<Field> = projection_indices
1592 .iter()
1593 .map(|&idx| schema.field(idx).clone())
1594 .collect();
1595 projected_schema = Arc::new(Schema::new(projected_fields));
1596
1597 let projected_columns: Vec<ArrayRef> = projection_indices
1598 .iter()
1599 .map(|&idx| Arc::clone(batch.column(idx)))
1600 .collect();
1601
1602 batch = RecordBatch::try_new(Arc::clone(&projected_schema), projected_columns)
1603 .map_err(|e| Error::Internal(format!("failed to project columns: {}", e)))?;
1604 } else {
1605 projected_schema = schema;
1607 }
1608
1609 if let Some(order_by) = &query.order_by {
1611 use arrow::compute::SortColumn;
1612 use arrow::compute::lexsort_to_indices;
1613 use sqlparser::ast::OrderByKind;
1614
1615 let exprs = match &order_by.kind {
1616 OrderByKind::Expressions(exprs) => exprs,
1617 _ => {
1618 return Err(Error::InvalidArgumentError(
1619 "unsupported ORDER BY clause".into(),
1620 ));
1621 }
1622 };
1623
1624 let mut sort_columns = Vec::new();
1625 for order_expr in exprs {
1626 if let SqlExpr::Identifier(ident) = &order_expr.expr
1627 && let Ok(col_idx) = projected_schema.index_of(&ident.value)
1628 {
1629 let options = arrow::compute::SortOptions {
1630 descending: !order_expr.options.asc.unwrap_or(true),
1631 nulls_first: order_expr.options.nulls_first.unwrap_or(false),
1632 };
1633 sort_columns.push(SortColumn {
1634 values: Arc::clone(batch.column(col_idx)),
1635 options: Some(options),
1636 });
1637 }
1638 }
1639
1640 if !sort_columns.is_empty() {
1641 let indices = lexsort_to_indices(&sort_columns, None)
1642 .map_err(|e| Error::Internal(format!("failed to sort: {}", e)))?;
1643
1644 use arrow::compute::take;
1645 let sorted_columns: Result<Vec<ArrayRef>, _> = batch
1646 .columns()
1647 .iter()
1648 .map(|col| take(col.as_ref(), &indices, None))
1649 .collect();
1650
1651 batch = RecordBatch::try_new(
1652 Arc::clone(&projected_schema),
1653 sorted_columns
1654 .map_err(|e| Error::Internal(format!("failed to apply sort: {}", e)))?,
1655 )
1656 .map_err(|e| Error::Internal(format!("failed to create sorted batch: {}", e)))?;
1657 }
1658 }
1659
1660 let execution = SelectExecution::new_single_batch(
1661 table_name.clone(),
1662 Arc::clone(&projected_schema),
1663 batch,
1664 );
1665
1666 Ok(Some(RuntimeStatementResult::Select {
1667 table_name,
1668 schema: projected_schema,
1669 execution,
1670 }))
1671 }
1672
1673 fn handle_create_table_as(
1674 &self,
1675 display_name: String,
1676 _canonical_name: String,
1677 query: Query,
1678 if_not_exists: bool,
1679 or_replace: bool,
1680 namespace: Option<String>,
1681 ) -> SqlResult<RuntimeStatementResult<P>> {
1682 let select_plan = self.build_select_plan(query)?;
1683
1684 if select_plan.projections.is_empty() && select_plan.aggregates.is_empty() {
1685 return Err(Error::InvalidArgumentError(
1686 "CREATE TABLE AS SELECT requires at least one projected column".into(),
1687 ));
1688 }
1689
1690 let plan = CreateTablePlan {
1691 name: display_name,
1692 if_not_exists,
1693 or_replace,
1694 columns: Vec::new(),
1695 source: Some(CreateTableSource::Select {
1696 plan: Box::new(select_plan),
1697 }),
1698 namespace,
1699 foreign_keys: Vec::new(),
1700 };
1701 self.execute_plan_statement(PlanStatement::CreateTable(plan))
1702 }
1703
1704 fn handle_insert(&self, stmt: sqlparser::ast::Insert) -> SqlResult<RuntimeStatementResult<P>> {
1705 let table_name_debug =
1706 Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
1707 tracing::trace!(
1708 "DEBUG SQL handle_insert called for table={}",
1709 table_name_debug
1710 );
1711 if !self.engine.session().has_active_transaction()
1712 && self.is_table_marked_dropped(&table_name_debug)?
1713 {
1714 return Err(Error::TransactionContextError(
1715 DROPPED_TABLE_TRANSACTION_ERR.into(),
1716 ));
1717 }
1718 if stmt.replace_into || stmt.ignore || stmt.or.is_some() {
1719 return Err(Error::InvalidArgumentError(
1720 "non-standard INSERT forms are not supported".into(),
1721 ));
1722 }
1723 if stmt.overwrite {
1724 return Err(Error::InvalidArgumentError(
1725 "INSERT OVERWRITE is not supported".into(),
1726 ));
1727 }
1728 if !stmt.assignments.is_empty() {
1729 return Err(Error::InvalidArgumentError(
1730 "INSERT ... SET is not supported".into(),
1731 ));
1732 }
1733 if stmt.partitioned.is_some() || !stmt.after_columns.is_empty() {
1734 return Err(Error::InvalidArgumentError(
1735 "partitioned INSERT is not supported".into(),
1736 ));
1737 }
1738 if stmt.returning.is_some() {
1739 return Err(Error::InvalidArgumentError(
1740 "INSERT ... RETURNING is not supported".into(),
1741 ));
1742 }
1743 if stmt.format_clause.is_some() || stmt.settings.is_some() {
1744 return Err(Error::InvalidArgumentError(
1745 "INSERT with FORMAT or SETTINGS is not supported".into(),
1746 ));
1747 }
1748
1749 let (display_name, _canonical_name) = match &stmt.table {
1750 TableObject::TableName(name) => canonical_object_name(name)?,
1751 _ => {
1752 return Err(Error::InvalidArgumentError(
1753 "INSERT requires a plain table name".into(),
1754 ));
1755 }
1756 };
1757
1758 let columns: Vec<String> = stmt
1759 .columns
1760 .iter()
1761 .map(|ident| ident.value.clone())
1762 .collect();
1763 let source_expr = stmt
1764 .source
1765 .as_ref()
1766 .ok_or_else(|| Error::InvalidArgumentError("INSERT requires a VALUES clause".into()))?;
1767 validate_simple_query(source_expr)?;
1768
1769 let insert_source = match source_expr.body.as_ref() {
1770 SetExpr::Values(values) => {
1771 if values.rows.is_empty() {
1772 return Err(Error::InvalidArgumentError(
1773 "INSERT VALUES list must contain at least one row".into(),
1774 ));
1775 }
1776 let mut rows: Vec<Vec<SqlValue>> = Vec::with_capacity(values.rows.len());
1777 for row in &values.rows {
1778 let mut converted = Vec::with_capacity(row.len());
1779 for expr in row {
1780 converted.push(SqlValue::try_from_expr(expr)?);
1781 }
1782 rows.push(converted);
1783 }
1784 InsertSource::Rows(
1785 rows.into_iter()
1786 .map(|row| row.into_iter().map(PlanValue::from).collect())
1787 .collect(),
1788 )
1789 }
1790 SetExpr::Select(select) => {
1791 if let Some(rows) = extract_constant_select_rows(select.as_ref())? {
1792 InsertSource::Rows(rows)
1793 } else if let Some(range_rows) = extract_rows_from_range(select.as_ref())? {
1794 InsertSource::Rows(range_rows.into_rows())
1795 } else {
1796 let select_plan = self.build_select_plan((**source_expr).clone())?;
1797 InsertSource::Select {
1798 plan: Box::new(select_plan),
1799 }
1800 }
1801 }
1802 _ => {
1803 return Err(Error::InvalidArgumentError(
1804 "unsupported INSERT source".into(),
1805 ));
1806 }
1807 };
1808
1809 let plan = InsertPlan {
1810 table: display_name.clone(),
1811 columns,
1812 source: insert_source,
1813 };
1814 tracing::trace!(
1815 "DEBUG SQL handle_insert: about to execute insert for table={}",
1816 display_name
1817 );
1818 self.execute_plan_statement(PlanStatement::Insert(plan))
1819 }
1820
1821 fn handle_update(
1822 &self,
1823 table: TableWithJoins,
1824 assignments: Vec<Assignment>,
1825 from: Option<UpdateTableFromKind>,
1826 selection: Option<SqlExpr>,
1827 returning: Option<Vec<SelectItem>>,
1828 ) -> SqlResult<RuntimeStatementResult<P>> {
1829 if from.is_some() {
1830 return Err(Error::InvalidArgumentError(
1831 "UPDATE ... FROM is not supported yet".into(),
1832 ));
1833 }
1834 if returning.is_some() {
1835 return Err(Error::InvalidArgumentError(
1836 "UPDATE ... RETURNING is not supported".into(),
1837 ));
1838 }
1839 if assignments.is_empty() {
1840 return Err(Error::InvalidArgumentError(
1841 "UPDATE requires at least one assignment".into(),
1842 ));
1843 }
1844
1845 let (display_name, canonical_name) = extract_single_table(std::slice::from_ref(&table))?;
1846
1847 if !self.engine.session().has_active_transaction()
1848 && self
1849 .engine
1850 .context()
1851 .is_table_marked_dropped(&canonical_name)
1852 {
1853 return Err(Error::TransactionContextError(
1854 DROPPED_TABLE_TRANSACTION_ERR.into(),
1855 ));
1856 }
1857
1858 let catalog = self.engine.context().table_catalog();
1859 let resolver = catalog.identifier_resolver();
1860 let table_id = catalog.table_id(&canonical_name);
1861
1862 let mut column_assignments = Vec::with_capacity(assignments.len());
1863 let mut seen: HashMap<String, ()> = HashMap::new();
1864 for assignment in assignments {
1865 let column_name = resolve_assignment_column_name(&assignment.target)?;
1866 let normalized = column_name.to_ascii_lowercase();
1867 if seen.insert(normalized, ()).is_some() {
1868 return Err(Error::InvalidArgumentError(format!(
1869 "duplicate column '{}' in UPDATE assignments",
1870 column_name
1871 )));
1872 }
1873 let value = match SqlValue::try_from_expr(&assignment.value) {
1874 Ok(literal) => AssignmentValue::Literal(PlanValue::from(literal)),
1875 Err(Error::InvalidArgumentError(msg))
1876 if msg.contains("unsupported literal expression") =>
1877 {
1878 let translated = translate_scalar_with_context(
1879 &resolver,
1880 IdentifierContext::new(table_id),
1881 &assignment.value,
1882 )?;
1883 AssignmentValue::Expression(translated)
1884 }
1885 Err(err) => return Err(err),
1886 };
1887 column_assignments.push(ColumnAssignment {
1888 column: column_name,
1889 value,
1890 });
1891 }
1892
1893 let filter = match selection {
1894 Some(expr) => Some(translate_condition_with_context(
1895 &resolver,
1896 IdentifierContext::new(table_id),
1897 &expr,
1898 )?),
1899 None => None,
1900 };
1901
1902 let plan = UpdatePlan {
1903 table: display_name.clone(),
1904 assignments: column_assignments,
1905 filter,
1906 };
1907 self.execute_plan_statement(PlanStatement::Update(plan))
1908 }
1909
1910 #[allow(clippy::collapsible_if)]
1911 fn handle_delete(&self, delete: Delete) -> SqlResult<RuntimeStatementResult<P>> {
1912 let Delete {
1913 tables,
1914 from,
1915 using,
1916 selection,
1917 returning,
1918 order_by,
1919 limit,
1920 } = delete;
1921
1922 if !tables.is_empty() {
1923 return Err(Error::InvalidArgumentError(
1924 "multi-table DELETE is not supported yet".into(),
1925 ));
1926 }
1927 if let Some(using_tables) = using {
1928 if !using_tables.is_empty() {
1929 return Err(Error::InvalidArgumentError(
1930 "DELETE ... USING is not supported yet".into(),
1931 ));
1932 }
1933 }
1934 if returning.is_some() {
1935 return Err(Error::InvalidArgumentError(
1936 "DELETE ... RETURNING is not supported".into(),
1937 ));
1938 }
1939 if !order_by.is_empty() {
1940 return Err(Error::InvalidArgumentError(
1941 "DELETE ... ORDER BY is not supported yet".into(),
1942 ));
1943 }
1944 if limit.is_some() {
1945 return Err(Error::InvalidArgumentError(
1946 "DELETE ... LIMIT is not supported yet".into(),
1947 ));
1948 }
1949
1950 let from_tables = match from {
1951 FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
1952 };
1953 let (display_name, canonical_name) = extract_single_table(&from_tables)?;
1954
1955 if !self.engine.session().has_active_transaction()
1956 && self
1957 .engine
1958 .context()
1959 .is_table_marked_dropped(&canonical_name)
1960 {
1961 return Err(Error::TransactionContextError(
1962 DROPPED_TABLE_TRANSACTION_ERR.into(),
1963 ));
1964 }
1965
1966 let catalog = self.engine.context().table_catalog();
1967 let resolver = catalog.identifier_resolver();
1968 let table_id = catalog.table_id(&canonical_name);
1969
1970 let filter = selection
1971 .map(|expr| {
1972 translate_condition_with_context(&resolver, IdentifierContext::new(table_id), &expr)
1973 })
1974 .transpose()?;
1975
1976 let plan = DeletePlan {
1977 table: display_name.clone(),
1978 filter,
1979 };
1980 self.execute_plan_statement(PlanStatement::Delete(plan))
1981 }
1982
1983 #[allow(clippy::too_many_arguments)] fn handle_drop(
1985 &self,
1986 object_type: ObjectType,
1987 if_exists: bool,
1988 names: Vec<ObjectName>,
1989 cascade: bool,
1990 restrict: bool,
1991 purge: bool,
1992 temporary: bool,
1993 ) -> SqlResult<RuntimeStatementResult<P>> {
1994 if purge || temporary {
1995 return Err(Error::InvalidArgumentError(
1996 "DROP purge/temporary options are not supported".into(),
1997 ));
1998 }
1999
2000 match object_type {
2001 ObjectType::Table => {
2002 if cascade || restrict {
2003 return Err(Error::InvalidArgumentError(
2004 "DROP TABLE CASCADE/RESTRICT is not supported".into(),
2005 ));
2006 }
2007
2008 let session = self.engine.session();
2009 for name in names {
2010 let table_name = Self::object_name_to_string(&name)?;
2011 session
2012 .drop_table(&table_name, if_exists)
2013 .map_err(|err| Self::map_table_error(&table_name, err))?;
2014 }
2015
2016 Ok(RuntimeStatementResult::NoOp)
2017 }
2018 ObjectType::Schema => {
2019 if restrict {
2020 return Err(Error::InvalidArgumentError(
2021 "DROP SCHEMA RESTRICT is not supported".into(),
2022 ));
2023 }
2024
2025 let catalog = self.engine.context().table_catalog();
2026
2027 for name in names {
2028 let (display_name, canonical_name) = canonical_object_name(&name)?;
2029
2030 if !catalog.schema_exists(&canonical_name) {
2031 if if_exists {
2032 continue;
2033 }
2034 return Err(Error::CatalogError(format!(
2035 "Schema '{}' does not exist",
2036 display_name
2037 )));
2038 }
2039
2040 if cascade {
2041 let all_tables = catalog.table_names();
2043 let schema_prefix = format!("{}.", canonical_name);
2044
2045 let ctx = self.engine.context();
2046 for table in all_tables {
2047 if table.to_ascii_lowercase().starts_with(&schema_prefix) {
2048 ctx.drop_table_immediate(&table, false)?;
2049 }
2050 }
2051 } else {
2052 let all_tables = catalog.table_names();
2054 let schema_prefix = format!("{}.", canonical_name);
2055 let has_tables = all_tables
2056 .iter()
2057 .any(|t| t.to_ascii_lowercase().starts_with(&schema_prefix));
2058
2059 if has_tables {
2060 return Err(Error::CatalogError(format!(
2061 "Schema '{}' is not empty. Use CASCADE to drop schema and all its tables",
2062 display_name
2063 )));
2064 }
2065 }
2066
2067 if !catalog.unregister_schema(&canonical_name) && !if_exists {
2069 return Err(Error::CatalogError(format!(
2070 "Schema '{}' does not exist",
2071 display_name
2072 )));
2073 }
2074 }
2075
2076 Ok(RuntimeStatementResult::NoOp)
2077 }
2078 _ => Err(Error::InvalidArgumentError(format!(
2079 "DROP {} is not supported",
2080 object_type
2081 ))),
2082 }
2083 }
2084
2085 fn handle_query(&self, query: Query) -> SqlResult<RuntimeStatementResult<P>> {
2086 if let Some(result) = self.try_handle_pragma_table_info(&query)? {
2088 return Ok(result);
2089 }
2090
2091 let select_plan = self.build_select_plan(query)?;
2092 self.execute_plan_statement(PlanStatement::Select(select_plan))
2093 }
2094
2095 fn build_select_plan(&self, query: Query) -> SqlResult<SelectPlan> {
2096 if self.engine.session().has_active_transaction() && self.engine.session().is_aborted() {
2097 return Err(Error::TransactionContextError(
2098 "TransactionContext Error: transaction is aborted".into(),
2099 ));
2100 }
2101
2102 validate_simple_query(&query)?;
2103 let catalog = self.engine.context().table_catalog();
2104 let resolver = catalog.identifier_resolver();
2105
2106 let (mut select_plan, select_context) = match query.body.as_ref() {
2107 SetExpr::Select(select) => self.translate_select(select.as_ref(), &resolver)?,
2108 other => {
2109 return Err(Error::InvalidArgumentError(format!(
2110 "unsupported query expression: {other:?}"
2111 )));
2112 }
2113 };
2114 if let Some(order_by) = &query.order_by {
2115 if !select_plan.aggregates.is_empty() {
2116 return Err(Error::InvalidArgumentError(
2117 "ORDER BY is not supported for aggregate queries".into(),
2118 ));
2119 }
2120 let order_plan = self.translate_order_by(&resolver, select_context, order_by)?;
2121 select_plan = select_plan.with_order_by(order_plan);
2122 }
2123 Ok(select_plan)
2124 }
2125
2126 fn translate_select(
2127 &self,
2128 select: &Select,
2129 resolver: &IdentifierResolver<'_>,
2130 ) -> SqlResult<(SelectPlan, IdentifierContext)> {
2131 if select.distinct.is_some() {
2132 return Err(Error::InvalidArgumentError(
2133 "SELECT DISTINCT is not supported".into(),
2134 ));
2135 }
2136 if select.top.is_some() {
2137 return Err(Error::InvalidArgumentError(
2138 "SELECT TOP is not supported".into(),
2139 ));
2140 }
2141 if select.exclude.is_some() {
2142 return Err(Error::InvalidArgumentError(
2143 "SELECT EXCLUDE is not supported".into(),
2144 ));
2145 }
2146 if select.into.is_some() {
2147 return Err(Error::InvalidArgumentError(
2148 "SELECT INTO is not supported".into(),
2149 ));
2150 }
2151 if !select.lateral_views.is_empty() {
2152 return Err(Error::InvalidArgumentError(
2153 "LATERAL VIEW is not supported".into(),
2154 ));
2155 }
2156 if select.prewhere.is_some() {
2157 return Err(Error::InvalidArgumentError(
2158 "PREWHERE is not supported".into(),
2159 ));
2160 }
2161 if !group_by_is_empty(&select.group_by) || select.value_table_mode.is_some() {
2162 return Err(Error::InvalidArgumentError(
2163 "GROUP BY and SELECT AS VALUE/STRUCT are not supported".into(),
2164 ));
2165 }
2166 if !select.cluster_by.is_empty()
2167 || !select.distribute_by.is_empty()
2168 || !select.sort_by.is_empty()
2169 {
2170 return Err(Error::InvalidArgumentError(
2171 "CLUSTER/DISTRIBUTE/SORT BY clauses are not supported".into(),
2172 ));
2173 }
2174 if select.having.is_some()
2175 || !select.named_window.is_empty()
2176 || select.qualify.is_some()
2177 || select.connect_by.is_some()
2178 {
2179 return Err(Error::InvalidArgumentError(
2180 "advanced SELECT clauses are not supported".into(),
2181 ));
2182 }
2183
2184 let table_alias = select
2185 .from
2186 .first()
2187 .and_then(|table_with_joins| match &table_with_joins.relation {
2188 TableFactor::Table { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
2189 _ => None,
2190 });
2191
2192 if let Some(alias) = table_alias.as_ref() {
2193 validate_projection_alias_qualifiers(&select.projection, alias)?;
2194 }
2195 let catalog = self.engine.context().table_catalog();
2197 let (mut plan, id_context) = if select.from.is_empty() {
2198 let mut p = SelectPlan::new("");
2200 let projections = self.build_projection_list(
2201 resolver,
2202 IdentifierContext::new(None),
2203 &select.projection,
2204 )?;
2205 p = p.with_projections(projections);
2206 (p, IdentifierContext::new(None))
2207 } else if select.from.len() == 1 {
2208 let (display_name, canonical_name) = extract_single_table(&select.from)?;
2210 let table_id = catalog.table_id(&canonical_name);
2211 let mut p = SelectPlan::new(display_name.clone());
2212 if let Some(aggregates) = self.detect_simple_aggregates(&select.projection)? {
2213 p = p.with_aggregates(aggregates);
2214 } else {
2215 let projections = self.build_projection_list(
2216 resolver,
2217 IdentifierContext::new(table_id),
2218 &select.projection,
2219 )?;
2220 p = p.with_projections(projections);
2221 }
2222 (p, IdentifierContext::new(table_id))
2223 } else {
2224 let tables = extract_tables(&select.from)?;
2226 let mut p = SelectPlan::with_tables(tables);
2227 let projections = self.build_projection_list(
2230 resolver,
2231 IdentifierContext::new(None),
2232 &select.projection,
2233 )?;
2234 p = p.with_projections(projections);
2235 (p, IdentifierContext::new(None))
2236 };
2237
2238 let filter_expr = match &select.selection {
2239 Some(expr) => Some(translate_condition_with_context(
2240 resolver, id_context, expr,
2241 )?),
2242 None => None,
2243 };
2244 plan = plan.with_filter(filter_expr);
2245 Ok((plan, id_context))
2246 }
2247
2248 fn translate_order_by(
2249 &self,
2250 resolver: &IdentifierResolver<'_>,
2251 id_context: IdentifierContext,
2252 order_by: &OrderBy,
2253 ) -> SqlResult<Vec<OrderByPlan>> {
2254 let exprs = match &order_by.kind {
2255 OrderByKind::Expressions(exprs) => exprs,
2256 _ => {
2257 return Err(Error::InvalidArgumentError(
2258 "unsupported ORDER BY clause".into(),
2259 ));
2260 }
2261 };
2262
2263 let base_nulls_first = self.default_nulls_first.load(AtomicOrdering::Relaxed);
2264
2265 let resolve_simple_column = |expr: &SqlExpr| -> SqlResult<String> {
2266 let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
2267 match scalar {
2268 llkv_expr::expr::ScalarExpr::Column(column) => Ok(column),
2269 other => Err(Error::InvalidArgumentError(format!(
2270 "ORDER BY expression must reference a simple column, found {other:?}"
2271 ))),
2272 }
2273 };
2274
2275 let mut plans = Vec::with_capacity(exprs.len());
2276 for order_expr in exprs {
2277 let ascending = order_expr.options.asc.unwrap_or(true);
2278 let default_nulls_first_for_direction = if ascending {
2279 base_nulls_first
2280 } else {
2281 !base_nulls_first
2282 };
2283 let nulls_first = order_expr
2284 .options
2285 .nulls_first
2286 .unwrap_or(default_nulls_first_for_direction);
2287
2288 if let SqlExpr::Identifier(ident) = &order_expr.expr
2289 && ident.value.eq_ignore_ascii_case("ALL")
2290 && ident.quote_style.is_none()
2291 {
2292 plans.push(OrderByPlan {
2293 target: OrderTarget::All,
2294 sort_type: OrderSortType::Native,
2295 ascending,
2296 nulls_first,
2297 });
2298 continue;
2299 }
2300
2301 let (target, sort_type) = match &order_expr.expr {
2302 SqlExpr::Identifier(_) | SqlExpr::CompoundIdentifier(_) => (
2303 OrderTarget::Column(resolve_simple_column(&order_expr.expr)?),
2304 OrderSortType::Native,
2305 ),
2306 SqlExpr::Cast {
2307 expr,
2308 data_type:
2309 SqlDataType::Int(_)
2310 | SqlDataType::Integer(_)
2311 | SqlDataType::BigInt(_)
2312 | SqlDataType::SmallInt(_)
2313 | SqlDataType::TinyInt(_),
2314 ..
2315 } => (
2316 OrderTarget::Column(resolve_simple_column(expr)?),
2317 OrderSortType::CastTextToInteger,
2318 ),
2319 SqlExpr::Cast { data_type, .. } => {
2320 return Err(Error::InvalidArgumentError(format!(
2321 "ORDER BY CAST target type {:?} is not supported",
2322 data_type
2323 )));
2324 }
2325 SqlExpr::Value(value_with_span) => match &value_with_span.value {
2326 Value::Number(raw, _) => {
2327 let position: usize = raw.parse().map_err(|_| {
2328 Error::InvalidArgumentError(format!(
2329 "ORDER BY position '{}' is not a valid positive integer",
2330 raw
2331 ))
2332 })?;
2333 if position == 0 {
2334 return Err(Error::InvalidArgumentError(
2335 "ORDER BY position must be at least 1".into(),
2336 ));
2337 }
2338 (OrderTarget::Index(position - 1), OrderSortType::Native)
2339 }
2340 other => {
2341 return Err(Error::InvalidArgumentError(format!(
2342 "unsupported ORDER BY literal expression: {other:?}"
2343 )));
2344 }
2345 },
2346 other => {
2347 return Err(Error::InvalidArgumentError(format!(
2348 "unsupported ORDER BY expression: {other:?}"
2349 )));
2350 }
2351 };
2352
2353 plans.push(OrderByPlan {
2354 target,
2355 sort_type,
2356 ascending,
2357 nulls_first,
2358 });
2359 }
2360
2361 Ok(plans)
2362 }
2363
2364 fn detect_simple_aggregates(
2365 &self,
2366 projection_items: &[SelectItem],
2367 ) -> SqlResult<Option<Vec<AggregateExpr>>> {
2368 if projection_items.is_empty() {
2369 return Ok(None);
2370 }
2371
2372 let mut specs: Vec<AggregateExpr> = Vec::with_capacity(projection_items.len());
2373 for (idx, item) in projection_items.iter().enumerate() {
2374 let (expr, alias_opt) = match item {
2375 SelectItem::UnnamedExpr(expr) => (expr, None),
2376 SelectItem::ExprWithAlias { expr, alias } => (expr, Some(alias.value.clone())),
2377 _ => return Ok(None),
2378 };
2379
2380 let alias = alias_opt.unwrap_or_else(|| format!("col{}", idx + 1));
2381 let SqlExpr::Function(func) = expr else {
2382 return Ok(None);
2383 };
2384
2385 if func.uses_odbc_syntax {
2386 return Err(Error::InvalidArgumentError(
2387 "ODBC function syntax is not supported in aggregate queries".into(),
2388 ));
2389 }
2390 if !matches!(func.parameters, FunctionArguments::None) {
2391 return Err(Error::InvalidArgumentError(
2392 "parameterized aggregate functions are not supported".into(),
2393 ));
2394 }
2395 if func.filter.is_some()
2396 || func.null_treatment.is_some()
2397 || func.over.is_some()
2398 || !func.within_group.is_empty()
2399 {
2400 return Err(Error::InvalidArgumentError(
2401 "advanced aggregate clauses are not supported".into(),
2402 ));
2403 }
2404
2405 let mut is_distinct = false;
2406 let args_slice: &[FunctionArg] = match &func.args {
2407 FunctionArguments::List(list) => {
2408 if let Some(dup) = &list.duplicate_treatment {
2409 use sqlparser::ast::DuplicateTreatment;
2410 match dup {
2411 DuplicateTreatment::All => {}
2412 DuplicateTreatment::Distinct => is_distinct = true,
2413 }
2414 }
2415 if !list.clauses.is_empty() {
2416 return Err(Error::InvalidArgumentError(
2417 "aggregate argument clauses are not supported".into(),
2418 ));
2419 }
2420 &list.args
2421 }
2422 FunctionArguments::None => &[],
2423 FunctionArguments::Subquery(_) => {
2424 return Err(Error::InvalidArgumentError(
2425 "aggregate subquery arguments are not supported".into(),
2426 ));
2427 }
2428 };
2429
2430 let func_name = if func.name.0.len() == 1 {
2431 match &func.name.0[0] {
2432 ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
2433 _ => {
2434 return Err(Error::InvalidArgumentError(
2435 "unsupported aggregate function name".into(),
2436 ));
2437 }
2438 }
2439 } else {
2440 return Err(Error::InvalidArgumentError(
2441 "qualified aggregate function names are not supported".into(),
2442 ));
2443 };
2444
2445 let aggregate = match func_name.as_str() {
2446 "count" => {
2447 if args_slice.len() != 1 {
2448 return Err(Error::InvalidArgumentError(
2449 "COUNT accepts exactly one argument".into(),
2450 ));
2451 }
2452 match &args_slice[0] {
2453 FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
2454 if is_distinct {
2455 return Err(Error::InvalidArgumentError(
2456 "COUNT(DISTINCT *) is not supported".into(),
2457 ));
2458 }
2459 AggregateExpr::count_star(alias)
2460 }
2461 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
2462 let column = resolve_column_name(arg_expr)?;
2463 if is_distinct {
2464 AggregateExpr::count_distinct_column(column, alias)
2465 } else {
2466 AggregateExpr::count_column(column, alias)
2467 }
2468 }
2469 FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
2470 return Err(Error::InvalidArgumentError(
2471 "named COUNT arguments are not supported".into(),
2472 ));
2473 }
2474 FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
2475 return Err(Error::InvalidArgumentError(
2476 "COUNT does not support qualified wildcards".into(),
2477 ));
2478 }
2479 }
2480 }
2481 "sum" | "min" | "max" => {
2482 if is_distinct {
2483 return Err(Error::InvalidArgumentError(
2484 "DISTINCT is not supported for this aggregate".into(),
2485 ));
2486 }
2487 if args_slice.len() != 1 {
2488 return Err(Error::InvalidArgumentError(format!(
2489 "{} accepts exactly one argument",
2490 func_name.to_uppercase()
2491 )));
2492 }
2493 let arg_expr = match &args_slice[0] {
2494 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => arg_expr,
2495 FunctionArg::Unnamed(FunctionArgExpr::Wildcard)
2496 | FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
2497 return Err(Error::InvalidArgumentError(format!(
2498 "{} does not support wildcard arguments",
2499 func_name.to_uppercase()
2500 )));
2501 }
2502 FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
2503 return Err(Error::InvalidArgumentError(format!(
2504 "{} arguments must be column references",
2505 func_name.to_uppercase()
2506 )));
2507 }
2508 };
2509
2510 if func_name == "sum" {
2511 if let Some(column) = parse_count_nulls_case(arg_expr)? {
2512 AggregateExpr::count_nulls(column, alias)
2513 } else {
2514 let column = resolve_column_name(arg_expr)?;
2515 AggregateExpr::sum_int64(column, alias)
2516 }
2517 } else {
2518 let column = resolve_column_name(arg_expr)?;
2519 if func_name == "min" {
2520 AggregateExpr::min_int64(column, alias)
2521 } else {
2522 AggregateExpr::max_int64(column, alias)
2523 }
2524 }
2525 }
2526 _ => return Ok(None),
2527 };
2528
2529 specs.push(aggregate);
2530 }
2531
2532 if specs.is_empty() {
2533 return Ok(None);
2534 }
2535 Ok(Some(specs))
2536 }
2537
2538 fn build_projection_list(
2539 &self,
2540 resolver: &IdentifierResolver<'_>,
2541 id_context: IdentifierContext,
2542 projection_items: &[SelectItem],
2543 ) -> SqlResult<Vec<SelectProjection>> {
2544 if projection_items.is_empty() {
2545 return Err(Error::InvalidArgumentError(
2546 "SELECT projection must include at least one column".into(),
2547 ));
2548 }
2549
2550 let mut projections = Vec::with_capacity(projection_items.len());
2551 for (idx, item) in projection_items.iter().enumerate() {
2552 match item {
2553 SelectItem::Wildcard(options) => {
2554 if let Some(exclude) = &options.opt_exclude {
2555 use sqlparser::ast::ExcludeSelectItem;
2556 let exclude_cols = match exclude {
2557 ExcludeSelectItem::Single(ident) => vec![ident.value.clone()],
2558 ExcludeSelectItem::Multiple(idents) => {
2559 idents.iter().map(|id| id.value.clone()).collect()
2560 }
2561 };
2562 projections.push(SelectProjection::AllColumnsExcept {
2563 exclude: exclude_cols,
2564 });
2565 } else {
2566 projections.push(SelectProjection::AllColumns);
2567 }
2568 }
2569 SelectItem::QualifiedWildcard(kind, _) => match kind {
2570 SelectItemQualifiedWildcardKind::ObjectName(name) => {
2571 projections.push(SelectProjection::Column {
2572 name: name.to_string(),
2573 alias: None,
2574 });
2575 }
2576 SelectItemQualifiedWildcardKind::Expr(_) => {
2577 return Err(Error::InvalidArgumentError(
2578 "expression-qualified wildcards are not supported".into(),
2579 ));
2580 }
2581 },
2582 SelectItem::UnnamedExpr(expr) => match expr {
2583 SqlExpr::Identifier(ident) => {
2584 let parts = vec![ident.value.clone()];
2585 let resolution = resolver.resolve(&parts, id_context)?;
2586 if resolution.is_simple() {
2587 projections.push(SelectProjection::Column {
2588 name: resolution.column().to_string(),
2589 alias: None,
2590 });
2591 } else {
2592 let alias = format!("col{}", idx + 1);
2593 projections.push(SelectProjection::Computed {
2594 expr: resolution.into_scalar_expr(),
2595 alias,
2596 });
2597 }
2598 }
2599 SqlExpr::CompoundIdentifier(parts) => {
2600 let name_parts: Vec<String> =
2601 parts.iter().map(|part| part.value.clone()).collect();
2602 let resolution = resolver.resolve(&name_parts, id_context)?;
2603 if resolution.is_simple() {
2604 projections.push(SelectProjection::Column {
2605 name: resolution.column().to_string(),
2606 alias: None,
2607 });
2608 } else {
2609 let alias = format!("col{}", idx + 1);
2610 projections.push(SelectProjection::Computed {
2611 expr: resolution.into_scalar_expr(),
2612 alias,
2613 });
2614 }
2615 }
2616 _ => {
2617 let alias = format!("col{}", idx + 1);
2618 let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
2619 projections.push(SelectProjection::Computed {
2620 expr: scalar,
2621 alias,
2622 });
2623 }
2624 },
2625 SelectItem::ExprWithAlias { expr, alias } => match expr {
2626 SqlExpr::Identifier(ident) => {
2627 let parts = vec![ident.value.clone()];
2628 let resolution = resolver.resolve(&parts, id_context)?;
2629 if resolution.is_simple() {
2630 projections.push(SelectProjection::Column {
2631 name: resolution.column().to_string(),
2632 alias: Some(alias.value.clone()),
2633 });
2634 } else {
2635 projections.push(SelectProjection::Computed {
2636 expr: resolution.into_scalar_expr(),
2637 alias: alias.value.clone(),
2638 });
2639 }
2640 }
2641 SqlExpr::CompoundIdentifier(parts) => {
2642 let name_parts: Vec<String> =
2643 parts.iter().map(|part| part.value.clone()).collect();
2644 let resolution = resolver.resolve(&name_parts, id_context)?;
2645 if resolution.is_simple() {
2646 projections.push(SelectProjection::Column {
2647 name: resolution.column().to_string(),
2648 alias: Some(alias.value.clone()),
2649 });
2650 } else {
2651 projections.push(SelectProjection::Computed {
2652 expr: resolution.into_scalar_expr(),
2653 alias: alias.value.clone(),
2654 });
2655 }
2656 }
2657 _ => {
2658 let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
2659 projections.push(SelectProjection::Computed {
2660 expr: scalar,
2661 alias: alias.value.clone(),
2662 });
2663 }
2664 },
2665 }
2666 }
2667 Ok(projections)
2668 }
2669
2670 #[allow(clippy::too_many_arguments)] fn handle_start_transaction(
2672 &self,
2673 modes: Vec<TransactionMode>,
2674 begin: bool,
2675 transaction: Option<BeginTransactionKind>,
2676 modifier: Option<TransactionModifier>,
2677 statements: Vec<Statement>,
2678 exception: Option<Vec<ExceptionWhen>>,
2679 has_end_keyword: bool,
2680 ) -> SqlResult<RuntimeStatementResult<P>> {
2681 if !modes.is_empty() {
2682 return Err(Error::InvalidArgumentError(
2683 "transaction modes are not supported".into(),
2684 ));
2685 }
2686 if modifier.is_some() {
2687 return Err(Error::InvalidArgumentError(
2688 "transaction modifiers are not supported".into(),
2689 ));
2690 }
2691 if !statements.is_empty() || exception.is_some() || has_end_keyword {
2692 return Err(Error::InvalidArgumentError(
2693 "BEGIN blocks with inline statements or exceptions are not supported".into(),
2694 ));
2695 }
2696 if let Some(kind) = transaction {
2697 match kind {
2698 BeginTransactionKind::Transaction | BeginTransactionKind::Work => {}
2699 }
2700 }
2701 if !begin {
2702 tracing::warn!("Currently treat `START TRANSACTION` same as `BEGIN`")
2704 }
2705
2706 self.execute_plan_statement(PlanStatement::BeginTransaction)
2707 }
2708
2709 fn handle_commit(
2710 &self,
2711 chain: bool,
2712 end: bool,
2713 modifier: Option<TransactionModifier>,
2714 ) -> SqlResult<RuntimeStatementResult<P>> {
2715 if chain {
2716 return Err(Error::InvalidArgumentError(
2717 "COMMIT AND [NO] CHAIN is not supported".into(),
2718 ));
2719 }
2720 if end {
2721 return Err(Error::InvalidArgumentError(
2722 "END blocks are not supported".into(),
2723 ));
2724 }
2725 if modifier.is_some() {
2726 return Err(Error::InvalidArgumentError(
2727 "transaction modifiers are not supported".into(),
2728 ));
2729 }
2730
2731 self.execute_plan_statement(PlanStatement::CommitTransaction)
2732 }
2733
2734 fn handle_rollback(
2735 &self,
2736 chain: bool,
2737 savepoint: Option<Ident>,
2738 ) -> SqlResult<RuntimeStatementResult<P>> {
2739 if chain {
2740 return Err(Error::InvalidArgumentError(
2741 "ROLLBACK AND [NO] CHAIN is not supported".into(),
2742 ));
2743 }
2744 if savepoint.is_some() {
2745 return Err(Error::InvalidArgumentError(
2746 "ROLLBACK TO SAVEPOINT is not supported".into(),
2747 ));
2748 }
2749
2750 self.execute_plan_statement(PlanStatement::RollbackTransaction)
2751 }
2752
2753 fn handle_set(&self, set_stmt: Set) -> SqlResult<RuntimeStatementResult<P>> {
2754 match set_stmt {
2755 Set::SingleAssignment {
2756 scope,
2757 hivevar,
2758 variable,
2759 values,
2760 } => {
2761 if scope.is_some() || hivevar {
2762 return Err(Error::InvalidArgumentError(
2763 "SET modifiers are not supported".into(),
2764 ));
2765 }
2766
2767 let variable_name_raw = variable.to_string();
2768 let variable_name = variable_name_raw.to_ascii_lowercase();
2769
2770 match variable_name.as_str() {
2771 "default_null_order" => {
2772 if values.len() != 1 {
2773 return Err(Error::InvalidArgumentError(
2774 "SET default_null_order expects exactly one value".into(),
2775 ));
2776 }
2777
2778 let value_expr = &values[0];
2779 let normalized = match value_expr {
2780 SqlExpr::Value(value_with_span) => value_with_span
2781 .value
2782 .clone()
2783 .into_string()
2784 .map(|s| s.to_ascii_lowercase()),
2785 SqlExpr::Identifier(ident) => Some(ident.value.to_ascii_lowercase()),
2786 _ => None,
2787 };
2788
2789 if !matches!(normalized.as_deref(), Some("nulls_first" | "nulls_last")) {
2790 return Err(Error::InvalidArgumentError(format!(
2791 "unsupported value for SET default_null_order: {value_expr:?}"
2792 )));
2793 }
2794
2795 let use_nulls_first = matches!(normalized.as_deref(), Some("nulls_first"));
2796 self.default_nulls_first
2797 .store(use_nulls_first, AtomicOrdering::Relaxed);
2798
2799 Ok(RuntimeStatementResult::NoOp)
2800 }
2801 "immediate_transaction_mode" => {
2802 if values.len() != 1 {
2803 return Err(Error::InvalidArgumentError(
2804 "SET immediate_transaction_mode expects exactly one value".into(),
2805 ));
2806 }
2807 let normalized = values[0].to_string().to_ascii_lowercase();
2808 let enabled = match normalized.as_str() {
2809 "true" | "on" | "1" => true,
2810 "false" | "off" | "0" => false,
2811 _ => {
2812 return Err(Error::InvalidArgumentError(format!(
2813 "unsupported value for SET immediate_transaction_mode: {}",
2814 values[0]
2815 )));
2816 }
2817 };
2818 if !enabled {
2819 tracing::warn!(
2820 "SET immediate_transaction_mode=false has no effect; continuing with auto mode"
2821 );
2822 }
2823 Ok(RuntimeStatementResult::NoOp)
2824 }
2825 _ => Err(Error::InvalidArgumentError(format!(
2826 "unsupported SET variable: {variable_name_raw}"
2827 ))),
2828 }
2829 }
2830 other => Err(Error::InvalidArgumentError(format!(
2831 "unsupported SQL SET statement: {other:?}",
2832 ))),
2833 }
2834 }
2835
2836 fn handle_pragma(
2837 &self,
2838 name: ObjectName,
2839 value: Option<Value>,
2840 is_eq: bool,
2841 ) -> SqlResult<RuntimeStatementResult<P>> {
2842 let (display, canonical) = canonical_object_name(&name)?;
2843 if value.is_some() || is_eq {
2844 return Err(Error::InvalidArgumentError(format!(
2845 "PRAGMA '{display}' does not accept a value"
2846 )));
2847 }
2848
2849 match canonical.as_str() {
2850 "enable_verification" | "disable_verification" => Ok(RuntimeStatementResult::NoOp),
2851 _ => Err(Error::InvalidArgumentError(format!(
2852 "unsupported PRAGMA '{}'",
2853 display
2854 ))),
2855 }
2856 }
2857}
2858
2859fn canonical_object_name(name: &ObjectName) -> SqlResult<(String, String)> {
2860 if name.0.is_empty() {
2861 return Err(Error::InvalidArgumentError(
2862 "object name must not be empty".into(),
2863 ));
2864 }
2865 let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
2866 for part in &name.0 {
2867 let ident = match part {
2868 ObjectNamePart::Identifier(ident) => ident,
2869 _ => {
2870 return Err(Error::InvalidArgumentError(
2871 "object names using functions are not supported".into(),
2872 ));
2873 }
2874 };
2875 parts.push(ident.value.clone());
2876 }
2877 let display = parts.join(".");
2878 let canonical = display.to_ascii_lowercase();
2879 Ok((display, canonical))
2880}
2881
2882fn parse_schema_qualified_name(name: &ObjectName) -> SqlResult<(Option<String>, String)> {
2891 if name.0.is_empty() {
2892 return Err(Error::InvalidArgumentError(
2893 "object name must not be empty".into(),
2894 ));
2895 }
2896
2897 let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
2898 for part in &name.0 {
2899 let ident = match part {
2900 ObjectNamePart::Identifier(ident) => ident,
2901 _ => {
2902 return Err(Error::InvalidArgumentError(
2903 "object names using functions are not supported".into(),
2904 ));
2905 }
2906 };
2907 parts.push(ident.value.clone());
2908 }
2909
2910 match parts.len() {
2911 1 => Ok((None, parts[0].clone())),
2912 2 => Ok((Some(parts[0].clone()), parts[1].clone())),
2913 _ => Err(Error::InvalidArgumentError(format!(
2914 "table name has too many parts: {}",
2915 name
2916 ))),
2917 }
2918}
2919
2920fn extract_index_column_name(
2934 index_col: &sqlparser::ast::IndexColumn,
2935 context: &str,
2936 allow_sort_options: bool,
2937 allow_compound: bool,
2938) -> SqlResult<String> {
2939 use sqlparser::ast::Expr as SqlExpr;
2940
2941 if index_col.operator_class.is_some() {
2943 return Err(Error::InvalidArgumentError(format!(
2944 "{} operator classes are not supported",
2945 context
2946 )));
2947 }
2948
2949 let order_expr = &index_col.column;
2950
2951 if allow_sort_options {
2953 let ascending = order_expr.options.asc.unwrap_or(true);
2955 let nulls_first = order_expr.options.nulls_first.unwrap_or(false);
2956
2957 if !ascending {
2958 return Err(Error::InvalidArgumentError(format!(
2959 "{} DESC ordering is not supported",
2960 context
2961 )));
2962 }
2963 if nulls_first {
2964 return Err(Error::InvalidArgumentError(format!(
2965 "{} NULLS FIRST ordering is not supported",
2966 context
2967 )));
2968 }
2969 } else {
2970 if order_expr.options.asc.is_some()
2972 || order_expr.options.nulls_first.is_some()
2973 || order_expr.with_fill.is_some()
2974 {
2975 return Err(Error::InvalidArgumentError(format!(
2976 "{} columns must be simple identifiers",
2977 context
2978 )));
2979 }
2980 }
2981
2982 let column_name = match &order_expr.expr {
2984 SqlExpr::Identifier(ident) => ident.value.clone(),
2985 SqlExpr::CompoundIdentifier(parts) => {
2986 if allow_compound {
2987 parts
2989 .last()
2990 .map(|ident| ident.value.clone())
2991 .ok_or_else(|| {
2992 Error::InvalidArgumentError(format!(
2993 "invalid column reference in {}",
2994 context
2995 ))
2996 })?
2997 } else if parts.len() == 1 {
2998 parts[0].value.clone()
3000 } else {
3001 return Err(Error::InvalidArgumentError(format!(
3002 "{} columns must be column identifiers",
3003 context
3004 )));
3005 }
3006 }
3007 other => {
3008 return Err(Error::InvalidArgumentError(format!(
3009 "{} only supports column references, found {:?}",
3010 context, other
3011 )));
3012 }
3013 };
3014
3015 Ok(column_name)
3016}
3017
3018fn validate_create_table_common(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
3019 if stmt.clone.is_some() || stmt.like.is_some() {
3020 return Err(Error::InvalidArgumentError(
3021 "CREATE TABLE LIKE/CLONE is not supported".into(),
3022 ));
3023 }
3024 if stmt.or_replace && stmt.if_not_exists {
3025 return Err(Error::InvalidArgumentError(
3026 "CREATE TABLE cannot combine OR REPLACE with IF NOT EXISTS".into(),
3027 ));
3028 }
3029 use sqlparser::ast::TableConstraint;
3030
3031 let mut seen_primary_key = false;
3032 for constraint in &stmt.constraints {
3033 match constraint {
3034 TableConstraint::PrimaryKey { .. } => {
3035 if seen_primary_key {
3036 return Err(Error::InvalidArgumentError(
3037 "multiple PRIMARY KEY constraints are not supported".into(),
3038 ));
3039 }
3040 seen_primary_key = true;
3041 }
3042 TableConstraint::Unique { .. } => {
3043 }
3045 TableConstraint::ForeignKey { .. } => {
3046 }
3048 other => {
3049 return Err(Error::InvalidArgumentError(format!(
3050 "table-level constraint {:?} is not supported",
3051 other
3052 )));
3053 }
3054 }
3055 }
3056 Ok(())
3057}
3058
3059fn validate_check_constraint(
3060 check_expr: &sqlparser::ast::Expr,
3061 table_name: &str,
3062 column_names: &[&str],
3063) -> SqlResult<()> {
3064 use sqlparser::ast::Expr as SqlExpr;
3065
3066 let column_names_lower: HashSet<String> = column_names
3067 .iter()
3068 .map(|name| name.to_ascii_lowercase())
3069 .collect();
3070
3071 let mut stack: Vec<&SqlExpr> = vec![check_expr];
3072
3073 while let Some(expr) = stack.pop() {
3074 match expr {
3075 SqlExpr::Subquery(_) => {
3076 return Err(Error::InvalidArgumentError(
3077 "Subqueries are not allowed in CHECK constraints".into(),
3078 ));
3079 }
3080 SqlExpr::Function(func) => {
3081 let func_name = func.name.to_string().to_uppercase();
3082 if matches!(func_name.as_str(), "SUM" | "AVG" | "COUNT" | "MIN" | "MAX") {
3083 return Err(Error::InvalidArgumentError(
3084 "Aggregate functions are not allowed in CHECK constraints".into(),
3085 ));
3086 }
3087
3088 if let sqlparser::ast::FunctionArguments::List(list) = &func.args {
3089 for arg in &list.args {
3090 if let sqlparser::ast::FunctionArg::Unnamed(
3091 sqlparser::ast::FunctionArgExpr::Expr(expr),
3092 ) = arg
3093 {
3094 stack.push(expr);
3095 }
3096 }
3097 }
3098 }
3099 SqlExpr::Identifier(ident) => {
3100 if !column_names_lower.contains(&ident.value.to_ascii_lowercase()) {
3101 return Err(Error::InvalidArgumentError(format!(
3102 "Column '{}' referenced in CHECK constraint does not exist",
3103 ident.value
3104 )));
3105 }
3106 }
3107 SqlExpr::CompoundIdentifier(idents) => {
3108 if idents.len() == 2 {
3109 let first = idents[0].value.as_str();
3110 let second = &idents[1].value;
3111
3112 if column_names_lower.contains(&first.to_ascii_lowercase()) {
3113 continue;
3114 }
3115
3116 if !first.eq_ignore_ascii_case(table_name) {
3117 return Err(Error::InvalidArgumentError(format!(
3118 "CHECK constraint references column from different table '{}'",
3119 first
3120 )));
3121 }
3122
3123 if !column_names_lower.contains(&second.to_ascii_lowercase()) {
3124 return Err(Error::InvalidArgumentError(format!(
3125 "Column '{}' referenced in CHECK constraint does not exist",
3126 second
3127 )));
3128 }
3129 } else if idents.len() == 3 {
3130 let first = &idents[0].value;
3131 let second = &idents[1].value;
3132 let third = &idents[2].value;
3133
3134 if first.eq_ignore_ascii_case(table_name) {
3135 if !column_names_lower.contains(&second.to_ascii_lowercase()) {
3136 return Err(Error::InvalidArgumentError(format!(
3137 "Column '{}' referenced in CHECK constraint does not exist",
3138 second
3139 )));
3140 }
3141 } else if second.eq_ignore_ascii_case(table_name) {
3142 if !column_names_lower.contains(&third.to_ascii_lowercase()) {
3143 return Err(Error::InvalidArgumentError(format!(
3144 "Column '{}' referenced in CHECK constraint does not exist",
3145 third
3146 )));
3147 }
3148 } else {
3149 return Err(Error::InvalidArgumentError(format!(
3150 "CHECK constraint references column from different table '{}'",
3151 second
3152 )));
3153 }
3154 }
3155 }
3156 SqlExpr::BinaryOp { left, right, .. } => {
3157 stack.push(left);
3158 stack.push(right);
3159 }
3160 SqlExpr::UnaryOp { expr, .. } | SqlExpr::Nested(expr) => {
3161 stack.push(expr);
3162 }
3163 SqlExpr::Value(_) | SqlExpr::TypedString { .. } => {}
3164 _ => {}
3165 }
3166 }
3167
3168 Ok(())
3169}
3170
3171fn validate_create_table_definition(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
3172 for column in &stmt.columns {
3173 for ColumnOptionDef { option, .. } in &column.options {
3174 match option {
3175 ColumnOption::Null
3176 | ColumnOption::NotNull
3177 | ColumnOption::Unique { .. }
3178 | ColumnOption::Check(_)
3179 | ColumnOption::ForeignKey { .. } => {}
3180 ColumnOption::Default(_) => {
3181 return Err(Error::InvalidArgumentError(format!(
3182 "DEFAULT values are not supported for column '{}'",
3183 column.name
3184 )));
3185 }
3186 other => {
3187 return Err(Error::InvalidArgumentError(format!(
3188 "unsupported column option {:?} on '{}'",
3189 other, column.name
3190 )));
3191 }
3192 }
3193 }
3194 }
3195 Ok(())
3196}
3197
3198fn validate_create_table_as(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
3199 if !stmt.columns.is_empty() {
3200 return Err(Error::InvalidArgumentError(
3201 "CREATE TABLE AS SELECT does not support column definitions yet".into(),
3202 ));
3203 }
3204 Ok(())
3205}
3206
3207fn validate_simple_query(query: &Query) -> SqlResult<()> {
3208 if query.with.is_some() {
3209 return Err(Error::InvalidArgumentError(
3210 "WITH clauses are not supported".into(),
3211 ));
3212 }
3213 if let Some(limit_clause) = &query.limit_clause {
3214 match limit_clause {
3215 LimitClause::LimitOffset {
3216 offset: Some(_), ..
3217 }
3218 | LimitClause::OffsetCommaLimit { .. } => {
3219 return Err(Error::InvalidArgumentError(
3220 "OFFSET clauses are not supported".into(),
3221 ));
3222 }
3223 LimitClause::LimitOffset { limit_by, .. } if !limit_by.is_empty() => {
3224 return Err(Error::InvalidArgumentError(
3225 "LIMIT BY clauses are not supported".into(),
3226 ));
3227 }
3228 _ => {}
3229 }
3230 }
3231 if query.fetch.is_some() {
3232 return Err(Error::InvalidArgumentError(
3233 "FETCH clauses are not supported".into(),
3234 ));
3235 }
3236 Ok(())
3237}
3238
3239fn resolve_column_name(expr: &SqlExpr) -> SqlResult<String> {
3240 match expr {
3241 SqlExpr::Identifier(ident) => Ok(ident.value.clone()),
3242 SqlExpr::CompoundIdentifier(parts) => {
3243 if let Some(last) = parts.last() {
3244 Ok(last.value.clone())
3245 } else {
3246 Err(Error::InvalidArgumentError(
3247 "empty column identifier".into(),
3248 ))
3249 }
3250 }
3251 _ => Err(Error::InvalidArgumentError(
3252 "aggregate arguments must be plain column identifiers".into(),
3253 )),
3254 }
3255}
3256
3257fn validate_projection_alias_qualifiers(
3258 projection_items: &[SelectItem],
3259 alias: &str,
3260) -> SqlResult<()> {
3261 let alias_lower = alias.to_ascii_lowercase();
3262 for item in projection_items {
3263 match item {
3264 SelectItem::UnnamedExpr(expr) | SelectItem::ExprWithAlias { expr, .. } => {
3265 if let SqlExpr::CompoundIdentifier(parts) = expr
3266 && parts.len() >= 2
3267 && let Some(first) = parts.first()
3268 && !first.value.eq_ignore_ascii_case(&alias_lower)
3269 {
3270 return Err(Error::InvalidArgumentError(format!(
3271 "Binder Error: table '{}' not found",
3272 first.value
3273 )));
3274 }
3275 }
3276 _ => {}
3277 }
3278 }
3279 Ok(())
3280}
3281
3282#[allow(dead_code)] fn expr_contains_aggregate(expr: &llkv_expr::expr::ScalarExpr<String>) -> bool {
3286 match expr {
3287 llkv_expr::expr::ScalarExpr::Aggregate(_) => true,
3288 llkv_expr::expr::ScalarExpr::Binary { left, right, .. } => {
3289 expr_contains_aggregate(left) || expr_contains_aggregate(right)
3290 }
3291 llkv_expr::expr::ScalarExpr::GetField { base, .. } => expr_contains_aggregate(base),
3292 llkv_expr::expr::ScalarExpr::Column(_) | llkv_expr::expr::ScalarExpr::Literal(_) => false,
3293 }
3294}
3295
3296fn try_parse_aggregate_function(
3297 func: &sqlparser::ast::Function,
3298) -> SqlResult<Option<llkv_expr::expr::AggregateCall<String>>> {
3299 use sqlparser::ast::{FunctionArg, FunctionArgExpr, FunctionArguments, ObjectNamePart};
3300
3301 if func.uses_odbc_syntax {
3302 return Ok(None);
3303 }
3304 if !matches!(func.parameters, FunctionArguments::None) {
3305 return Ok(None);
3306 }
3307 if func.filter.is_some()
3308 || func.null_treatment.is_some()
3309 || func.over.is_some()
3310 || !func.within_group.is_empty()
3311 {
3312 return Ok(None);
3313 }
3314
3315 let func_name = if func.name.0.len() == 1 {
3316 match &func.name.0[0] {
3317 ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
3318 _ => return Ok(None),
3319 }
3320 } else {
3321 return Ok(None);
3322 };
3323
3324 let args_slice: &[FunctionArg] = match &func.args {
3325 FunctionArguments::List(list) => {
3326 if list.duplicate_treatment.is_some() || !list.clauses.is_empty() {
3327 return Ok(None);
3328 }
3329 &list.args
3330 }
3331 FunctionArguments::None => &[],
3332 FunctionArguments::Subquery(_) => return Ok(None),
3333 };
3334
3335 let agg_call = match func_name.as_str() {
3336 "count" => {
3337 if args_slice.len() != 1 {
3338 return Err(Error::InvalidArgumentError(
3339 "COUNT accepts exactly one argument".into(),
3340 ));
3341 }
3342 match &args_slice[0] {
3343 FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
3344 llkv_expr::expr::AggregateCall::CountStar
3345 }
3346 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
3347 let column = resolve_column_name(arg_expr)?;
3348 llkv_expr::expr::AggregateCall::Count(column)
3349 }
3350 _ => {
3351 return Err(Error::InvalidArgumentError(
3352 "unsupported COUNT argument".into(),
3353 ));
3354 }
3355 }
3356 }
3357 "sum" => {
3358 if args_slice.len() != 1 {
3359 return Err(Error::InvalidArgumentError(
3360 "SUM accepts exactly one argument".into(),
3361 ));
3362 }
3363 let arg_expr = match &args_slice[0] {
3364 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
3365 _ => {
3366 return Err(Error::InvalidArgumentError(
3367 "SUM requires a column argument".into(),
3368 ));
3369 }
3370 };
3371
3372 if let Some(column) = parse_count_nulls_case(arg_expr)? {
3374 llkv_expr::expr::AggregateCall::CountNulls(column)
3375 } else {
3376 let column = resolve_column_name(arg_expr)?;
3377 llkv_expr::expr::AggregateCall::Sum(column)
3378 }
3379 }
3380 "min" => {
3381 if args_slice.len() != 1 {
3382 return Err(Error::InvalidArgumentError(
3383 "MIN accepts exactly one argument".into(),
3384 ));
3385 }
3386 let arg_expr = match &args_slice[0] {
3387 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
3388 _ => {
3389 return Err(Error::InvalidArgumentError(
3390 "MIN requires a column argument".into(),
3391 ));
3392 }
3393 };
3394 let column = resolve_column_name(arg_expr)?;
3395 llkv_expr::expr::AggregateCall::Min(column)
3396 }
3397 "max" => {
3398 if args_slice.len() != 1 {
3399 return Err(Error::InvalidArgumentError(
3400 "MAX accepts exactly one argument".into(),
3401 ));
3402 }
3403 let arg_expr = match &args_slice[0] {
3404 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
3405 _ => {
3406 return Err(Error::InvalidArgumentError(
3407 "MAX requires a column argument".into(),
3408 ));
3409 }
3410 };
3411 let column = resolve_column_name(arg_expr)?;
3412 llkv_expr::expr::AggregateCall::Max(column)
3413 }
3414 _ => return Ok(None),
3415 };
3416
3417 Ok(Some(agg_call))
3418}
3419
3420fn parse_count_nulls_case(expr: &SqlExpr) -> SqlResult<Option<String>> {
3421 let SqlExpr::Case {
3422 operand,
3423 conditions,
3424 else_result,
3425 ..
3426 } = expr
3427 else {
3428 return Ok(None);
3429 };
3430
3431 if operand.is_some() || conditions.len() != 1 {
3432 return Ok(None);
3433 }
3434
3435 let case_when = &conditions[0];
3436 if !is_integer_literal(&case_when.result, 1) {
3437 return Ok(None);
3438 }
3439
3440 let else_expr = match else_result {
3441 Some(expr) => expr.as_ref(),
3442 None => return Ok(None),
3443 };
3444 if !is_integer_literal(else_expr, 0) {
3445 return Ok(None);
3446 }
3447
3448 let inner = match &case_when.condition {
3449 SqlExpr::IsNull(inner) => inner.as_ref(),
3450 _ => return Ok(None),
3451 };
3452
3453 resolve_column_name(inner).map(Some)
3454}
3455
3456fn is_integer_literal(expr: &SqlExpr, expected: i64) -> bool {
3457 match expr {
3458 SqlExpr::Value(ValueWithSpan {
3459 value: Value::Number(text, _),
3460 ..
3461 }) => text.parse::<i64>() == Ok(expected),
3462 _ => false,
3463 }
3464}
3465
3466fn translate_condition_with_context(
3467 resolver: &IdentifierResolver<'_>,
3468 context: IdentifierContext,
3469 expr: &SqlExpr,
3470) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
3471 match expr {
3472 SqlExpr::IsNull(inner) => {
3473 let scalar = translate_scalar_with_context(resolver, context, inner)?;
3474 match scalar {
3475 llkv_expr::expr::ScalarExpr::Column(column) => {
3476 Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
3477 field_id: column,
3478 op: llkv_expr::expr::Operator::IsNull,
3479 }))
3480 }
3481 _ => Err(Error::InvalidArgumentError(
3482 "IS NULL predicates currently support column references only".into(),
3483 )),
3484 }
3485 }
3486 SqlExpr::IsNotNull(inner) => {
3487 let scalar = translate_scalar_with_context(resolver, context, inner)?;
3488 match scalar {
3489 llkv_expr::expr::ScalarExpr::Column(column) => {
3490 Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
3491 field_id: column,
3492 op: llkv_expr::expr::Operator::IsNotNull,
3493 }))
3494 }
3495 _ => Err(Error::InvalidArgumentError(
3496 "IS NOT NULL predicates currently support column references only".into(),
3497 )),
3498 }
3499 }
3500 SqlExpr::BinaryOp { left, op, right } => match op {
3501 BinaryOperator::And => Ok(llkv_expr::expr::Expr::And(vec![
3502 translate_condition_with_context(resolver, context, left)?,
3503 translate_condition_with_context(resolver, context, right)?,
3504 ])),
3505 BinaryOperator::Or => Ok(llkv_expr::expr::Expr::Or(vec![
3506 translate_condition_with_context(resolver, context, left)?,
3507 translate_condition_with_context(resolver, context, right)?,
3508 ])),
3509 BinaryOperator::Eq
3510 | BinaryOperator::NotEq
3511 | BinaryOperator::Lt
3512 | BinaryOperator::LtEq
3513 | BinaryOperator::Gt
3514 | BinaryOperator::GtEq => {
3515 translate_comparison_with_context(resolver, context, left, op.clone(), right)
3516 }
3517 other => Err(Error::InvalidArgumentError(format!(
3518 "unsupported binary operator in WHERE clause: {other:?}"
3519 ))),
3520 },
3521 SqlExpr::UnaryOp {
3522 op: UnaryOperator::Not,
3523 expr,
3524 } => Ok(llkv_expr::expr::Expr::not(
3525 translate_condition_with_context(resolver, context, expr)?,
3526 )),
3527 SqlExpr::Nested(inner) => translate_condition_with_context(resolver, context, inner),
3528 other => Err(Error::InvalidArgumentError(format!(
3529 "unsupported WHERE clause: {other:?}"
3530 ))),
3531 }
3532}
3533
3534fn translate_comparison_with_context(
3535 resolver: &IdentifierResolver<'_>,
3536 context: IdentifierContext,
3537 left: &SqlExpr,
3538 op: BinaryOperator,
3539 right: &SqlExpr,
3540) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
3541 let left_scalar = translate_scalar_with_context(resolver, context, left)?;
3542 let right_scalar = translate_scalar_with_context(resolver, context, right)?;
3543 let compare_op = match op {
3544 BinaryOperator::Eq => llkv_expr::expr::CompareOp::Eq,
3545 BinaryOperator::NotEq => llkv_expr::expr::CompareOp::NotEq,
3546 BinaryOperator::Lt => llkv_expr::expr::CompareOp::Lt,
3547 BinaryOperator::LtEq => llkv_expr::expr::CompareOp::LtEq,
3548 BinaryOperator::Gt => llkv_expr::expr::CompareOp::Gt,
3549 BinaryOperator::GtEq => llkv_expr::expr::CompareOp::GtEq,
3550 other => {
3551 return Err(Error::InvalidArgumentError(format!(
3552 "unsupported comparison operator: {other:?}"
3553 )));
3554 }
3555 };
3556
3557 if let (
3558 llkv_expr::expr::ScalarExpr::Column(column),
3559 llkv_expr::expr::ScalarExpr::Literal(literal),
3560 ) = (&left_scalar, &right_scalar)
3561 && let Some(op) = compare_op_to_filter_operator(compare_op, literal)
3562 {
3563 return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
3564 field_id: column.clone(),
3565 op,
3566 }));
3567 }
3568
3569 if let (
3570 llkv_expr::expr::ScalarExpr::Literal(literal),
3571 llkv_expr::expr::ScalarExpr::Column(column),
3572 ) = (&left_scalar, &right_scalar)
3573 && let Some(flipped) = flip_compare_op(compare_op)
3574 && let Some(op) = compare_op_to_filter_operator(flipped, literal)
3575 {
3576 return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
3577 field_id: column.clone(),
3578 op,
3579 }));
3580 }
3581
3582 Ok(llkv_expr::expr::Expr::Compare {
3583 left: left_scalar,
3584 op: compare_op,
3585 right: right_scalar,
3586 })
3587}
3588
3589fn compare_op_to_filter_operator(
3590 op: llkv_expr::expr::CompareOp,
3591 literal: &Literal,
3592) -> Option<llkv_expr::expr::Operator<'static>> {
3593 let lit = literal.clone();
3594 match op {
3595 llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::Operator::Equals(lit)),
3596 llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::Operator::LessThan(lit)),
3597 llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::Operator::LessThanOrEquals(lit)),
3598 llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::Operator::GreaterThan(lit)),
3599 llkv_expr::expr::CompareOp::GtEq => {
3600 Some(llkv_expr::expr::Operator::GreaterThanOrEquals(lit))
3601 }
3602 llkv_expr::expr::CompareOp::NotEq => None,
3603 }
3604}
3605
3606fn flip_compare_op(op: llkv_expr::expr::CompareOp) -> Option<llkv_expr::expr::CompareOp> {
3607 match op {
3608 llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::CompareOp::Eq),
3609 llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::CompareOp::Gt),
3610 llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::CompareOp::GtEq),
3611 llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::CompareOp::Lt),
3612 llkv_expr::expr::CompareOp::GtEq => Some(llkv_expr::expr::CompareOp::LtEq),
3613 llkv_expr::expr::CompareOp::NotEq => None,
3614 }
3615}
3616fn translate_scalar_with_context(
3619 resolver: &IdentifierResolver<'_>,
3620 context: IdentifierContext,
3621 expr: &SqlExpr,
3622) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
3623 match expr {
3624 SqlExpr::Identifier(ident) => {
3625 let parts = vec![ident.value.clone()];
3626 let resolution = resolver.resolve(&parts, context)?;
3627 Ok(resolution.into_scalar_expr())
3628 }
3629 SqlExpr::CompoundIdentifier(idents) => {
3630 if idents.is_empty() {
3631 return Err(Error::InvalidArgumentError(
3632 "invalid compound identifier".into(),
3633 ));
3634 }
3635
3636 let parts: Vec<String> = idents.iter().map(|ident| ident.value.clone()).collect();
3637 let resolution = resolver.resolve(&parts, context)?;
3638 Ok(resolution.into_scalar_expr())
3639 }
3640 _ => translate_scalar(expr),
3641 }
3642}
3643
3644fn translate_scalar(expr: &SqlExpr) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
3645 match expr {
3646 SqlExpr::Identifier(ident) => Ok(llkv_expr::expr::ScalarExpr::column(ident.value.clone())),
3647 SqlExpr::CompoundIdentifier(idents) => {
3648 if idents.is_empty() {
3649 return Err(Error::InvalidArgumentError(
3650 "invalid compound identifier".into(),
3651 ));
3652 }
3653
3654 let column_name = idents[0].value.clone();
3656 let mut result = llkv_expr::expr::ScalarExpr::column(column_name);
3657
3658 for part in &idents[1..] {
3659 let field_name = part.value.clone();
3660 result = llkv_expr::expr::ScalarExpr::get_field(result, field_name);
3661 }
3662
3663 Ok(result)
3664 }
3665 SqlExpr::Value(value) => literal_from_value(value),
3666 SqlExpr::BinaryOp { left, op, right } => {
3667 let left_expr = translate_scalar(left)?;
3668 let right_expr = translate_scalar(right)?;
3669 let op = match op {
3670 BinaryOperator::Plus => llkv_expr::expr::BinaryOp::Add,
3671 BinaryOperator::Minus => llkv_expr::expr::BinaryOp::Subtract,
3672 BinaryOperator::Multiply => llkv_expr::expr::BinaryOp::Multiply,
3673 BinaryOperator::Divide => llkv_expr::expr::BinaryOp::Divide,
3674 BinaryOperator::Modulo => llkv_expr::expr::BinaryOp::Modulo,
3675 other => {
3676 return Err(Error::InvalidArgumentError(format!(
3677 "unsupported scalar binary operator: {other:?}"
3678 )));
3679 }
3680 };
3681 Ok(llkv_expr::expr::ScalarExpr::binary(
3682 left_expr, op, right_expr,
3683 ))
3684 }
3685 SqlExpr::UnaryOp {
3686 op: UnaryOperator::Minus,
3687 expr,
3688 } => match translate_scalar(expr)? {
3689 llkv_expr::expr::ScalarExpr::Literal(lit) => match lit {
3690 Literal::Integer(v) => {
3691 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(-v)))
3692 }
3693 Literal::Float(v) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(-v))),
3694 Literal::Boolean(_) => Err(Error::InvalidArgumentError(
3695 "cannot negate boolean literal".into(),
3696 )),
3697 Literal::String(_) => Err(Error::InvalidArgumentError(
3698 "cannot negate string literal".into(),
3699 )),
3700 Literal::Struct(_) => Err(Error::InvalidArgumentError(
3701 "cannot negate struct literal".into(),
3702 )),
3703 Literal::Null => Err(Error::InvalidArgumentError(
3704 "cannot negate null literal".into(),
3705 )),
3706 },
3707 _ => Err(Error::InvalidArgumentError(
3708 "cannot negate non-literal expression".into(),
3709 )),
3710 },
3711 SqlExpr::UnaryOp {
3712 op: UnaryOperator::Plus,
3713 expr,
3714 } => translate_scalar(expr),
3715 SqlExpr::Nested(inner) => translate_scalar(inner),
3716 SqlExpr::Function(func) => {
3717 if let Some(agg_call) = try_parse_aggregate_function(func)? {
3719 Ok(llkv_expr::expr::ScalarExpr::aggregate(agg_call))
3720 } else {
3721 Err(Error::InvalidArgumentError(format!(
3722 "unsupported function in scalar expression: {:?}",
3723 func.name
3724 )))
3725 }
3726 }
3727 SqlExpr::Dictionary(fields) => {
3728 let mut struct_fields = Vec::new();
3730 for entry in fields {
3731 let key = entry.key.value.clone(); let value_expr = translate_scalar(&entry.value)?;
3734 match value_expr {
3736 llkv_expr::expr::ScalarExpr::Literal(lit) => {
3737 struct_fields.push((key, Box::new(lit)));
3738 }
3739 _ => {
3740 return Err(Error::InvalidArgumentError(
3741 "Dictionary values must be literals".to_string(),
3742 ));
3743 }
3744 }
3745 }
3746 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Struct(
3747 struct_fields,
3748 )))
3749 }
3750 other => Err(Error::InvalidArgumentError(format!(
3751 "unsupported scalar expression: {other:?}"
3752 ))),
3753 }
3754}
3755
3756fn literal_from_value(value: &ValueWithSpan) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
3757 match &value.value {
3758 Value::Number(text, _) => {
3759 if text.contains(['.', 'e', 'E']) {
3760 let parsed = text.parse::<f64>().map_err(|err| {
3761 Error::InvalidArgumentError(format!("invalid float literal: {err}"))
3762 })?;
3763 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(parsed)))
3764 } else {
3765 let parsed = text.parse::<i128>().map_err(|err| {
3766 Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
3767 })?;
3768 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(
3769 parsed,
3770 )))
3771 }
3772 }
3773 Value::Boolean(value) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Boolean(
3774 *value,
3775 ))),
3776 Value::Null => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Null)),
3777 other => {
3778 if let Some(text) = other.clone().into_string() {
3779 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::String(text)))
3780 } else {
3781 Err(Error::InvalidArgumentError(format!(
3782 "unsupported literal: {other:?}"
3783 )))
3784 }
3785 }
3786 }
3787}
3788
3789fn resolve_assignment_column_name(target: &AssignmentTarget) -> SqlResult<String> {
3790 match target {
3791 AssignmentTarget::ColumnName(name) => {
3792 if name.0.len() != 1 {
3793 return Err(Error::InvalidArgumentError(
3794 "qualified column names in UPDATE assignments are not supported yet".into(),
3795 ));
3796 }
3797 match &name.0[0] {
3798 ObjectNamePart::Identifier(ident) => Ok(ident.value.clone()),
3799 other => Err(Error::InvalidArgumentError(format!(
3800 "unsupported column reference in UPDATE assignment: {other:?}"
3801 ))),
3802 }
3803 }
3804 AssignmentTarget::Tuple(_) => Err(Error::InvalidArgumentError(
3805 "tuple assignments are not supported yet".into(),
3806 )),
3807 }
3808}
3809
3810fn arrow_type_from_sql(data_type: &SqlDataType) -> SqlResult<arrow::datatypes::DataType> {
3811 use arrow::datatypes::DataType;
3812 match data_type {
3813 SqlDataType::Int(_)
3814 | SqlDataType::Integer(_)
3815 | SqlDataType::BigInt(_)
3816 | SqlDataType::SmallInt(_)
3817 | SqlDataType::TinyInt(_) => Ok(DataType::Int64),
3818 SqlDataType::Float(_)
3819 | SqlDataType::Real
3820 | SqlDataType::Double(_)
3821 | SqlDataType::DoublePrecision => Ok(DataType::Float64),
3822 SqlDataType::Text
3823 | SqlDataType::String(_)
3824 | SqlDataType::Varchar(_)
3825 | SqlDataType::Char(_)
3826 | SqlDataType::Uuid => Ok(DataType::Utf8),
3827 SqlDataType::Date => Ok(DataType::Date32),
3828 SqlDataType::Decimal(_) | SqlDataType::Numeric(_) => Ok(DataType::Float64),
3829 SqlDataType::Boolean => Ok(DataType::Boolean),
3830 SqlDataType::Custom(name, args) => {
3831 if name.0.len() == 1
3832 && let ObjectNamePart::Identifier(ident) = &name.0[0]
3833 && ident.value.eq_ignore_ascii_case("row")
3834 {
3835 return row_type_to_arrow(data_type, args);
3836 }
3837 Err(Error::InvalidArgumentError(format!(
3838 "unsupported SQL data type: {data_type:?}"
3839 )))
3840 }
3841 other => Err(Error::InvalidArgumentError(format!(
3842 "unsupported SQL data type: {other:?}"
3843 ))),
3844 }
3845}
3846
3847fn row_type_to_arrow(
3848 data_type: &SqlDataType,
3849 tokens: &[String],
3850) -> SqlResult<arrow::datatypes::DataType> {
3851 use arrow::datatypes::{DataType, Field, FieldRef, Fields};
3852
3853 let row_str = data_type.to_string();
3854 if tokens.is_empty() {
3855 return Err(Error::InvalidArgumentError(
3856 "ROW type must define at least one field".into(),
3857 ));
3858 }
3859
3860 let dialect = GenericDialect {};
3861 let field_definitions = resolve_row_field_types(tokens, &dialect).map_err(|err| {
3862 Error::InvalidArgumentError(format!("unable to parse ROW type '{row_str}': {err}"))
3863 })?;
3864
3865 let mut fields: Vec<FieldRef> = Vec::with_capacity(field_definitions.len());
3866 for (field_name, field_type) in field_definitions {
3867 let arrow_field_type = arrow_type_from_sql(&field_type)?;
3868 fields.push(Arc::new(Field::new(field_name, arrow_field_type, true)));
3869 }
3870
3871 let struct_fields: Fields = fields.into();
3872 Ok(DataType::Struct(struct_fields))
3873}
3874
/// Splits the raw tokens of a `ROW(...)` type into `(field name, SQL type)` pairs.
///
/// The token list may optionally be wrapped in a `(` ... `)` pair and may
/// contain `,` separators; both are tolerated.
///
/// For each field:
/// - the first token is the name, normalized by `normalize_row_field_name`;
/// - the type is found by trying successively longer runs of the following
///   tokens with `parse_sql_data_type`, keeping the longest run that parsed
///   before a `,` or the end of the list.
///
/// Presumably the longest-match rule exists so that multi-word types (e.g.
/// `DOUBLE PRECISION`) are captured whole when no separators are present.
///
/// # Errors
/// Returns `InvalidArgumentError` when any of these hold:
/// - `tokens` is empty;
/// - the surrounding parentheses are unbalanced;
/// - no field definitions remain;
/// - a field name has no following type tokens;
/// - no run of a field's type tokens parses as a SQL type.
fn resolve_row_field_types(
    tokens: &[String],
    dialect: &GenericDialect,
) -> SqlResult<Vec<(String, SqlDataType)>> {
    if tokens.is_empty() {
        return Err(Error::InvalidArgumentError(
            "ROW type must define at least one field".into(),
        ));
    }

    // Strip an optional surrounding `( ... )`; a trailing `)` with no leading
    // `(` is rejected. (`end == 0` cannot hold here since `tokens` is non-empty.)
    let mut start = 0;
    let mut end = tokens.len();
    if tokens[start] == "(" {
        if end == 0 || tokens[end - 1] != ")" {
            return Err(Error::InvalidArgumentError(
                "ROW type is missing closing ')'".into(),
            ));
        }
        start += 1;
        end -= 1;
    } else if tokens[end - 1] == ")" {
        return Err(Error::InvalidArgumentError(
            "ROW type contains unmatched ')'".into(),
        ));
    }

    let slice = &tokens[start..end];
    if slice.is_empty() {
        return Err(Error::InvalidArgumentError(
            "ROW type did not provide any field definitions".into(),
        ));
    }

    let mut fields = Vec::new();
    let mut index = 0;

    while index < slice.len() {
        // Skip stray separators between fields.
        if slice[index] == "," {
            index += 1;
            continue;
        }

        let field_name = normalize_row_field_name(&slice[index])?;
        index += 1;

        if index >= slice.len() {
            return Err(Error::InvalidArgumentError(format!(
                "ROW field '{field_name}' is missing a type specification"
            )));
        }

        // Greedy scan: remember the end of the longest token run that parsed
        // as a type. Scanning continues past failures because a longer run may
        // parse again. It stops at the end of the slice, or at a `,` once
        // something has parsed.
        // NOTE(review): without `,` separators every field re-parses all
        // remaining tokens, so parse calls grow quadratically with token count.
        let mut last_success: Option<(usize, SqlDataType)> = None;
        let mut type_end = index;

        while type_end <= slice.len() {
            let candidate = slice[index..type_end].join(" ");
            if candidate.trim().is_empty() {
                type_end += 1;
                continue;
            }

            if let Ok(parsed_type) = parse_sql_data_type(&candidate, dialect) {
                last_success = Some((type_end, parsed_type));
            }

            if type_end == slice.len() {
                break;
            }

            if slice[type_end] == "," && last_success.is_some() {
                break;
            }

            type_end += 1;
        }

        let Some((next_index, data_type)) = last_success else {
            return Err(Error::InvalidArgumentError(format!(
                "failed to parse ROW field type for '{field_name}'"
            )));
        };

        fields.push((field_name, data_type));
        // Resume right after the accepted type tokens.
        index = next_index;

        if index < slice.len() && slice[index] == "," {
            index += 1;
        }
    }

    if fields.is_empty() {
        return Err(Error::InvalidArgumentError(
            "ROW type did not provide any field definitions".into(),
        ));
    }

    Ok(fields)
}
3973
3974fn normalize_row_field_name(raw: &str) -> SqlResult<String> {
3975 let trimmed = raw.trim();
3976 if trimmed.is_empty() {
3977 return Err(Error::InvalidArgumentError(
3978 "ROW field name must not be empty".into(),
3979 ));
3980 }
3981
3982 if let Some(stripped) = trimmed.strip_prefix('"') {
3983 let without_end = stripped.strip_suffix('"').ok_or_else(|| {
3984 Error::InvalidArgumentError(format!("unterminated quoted ROW field name: {trimmed}"))
3985 })?;
3986 let name = without_end.replace("\"\"", "\"");
3987 return Ok(name);
3988 }
3989
3990 Ok(trimmed.to_string())
3991}
3992
3993fn parse_sql_data_type(type_str: &str, dialect: &GenericDialect) -> SqlResult<SqlDataType> {
3994 let trimmed = type_str.trim();
3995 let sql = format!("CREATE TABLE __row(__field {trimmed});");
3996 let statements = Parser::parse_sql(dialect, &sql).map_err(|err| {
3997 Error::InvalidArgumentError(format!("failed to parse ROW field type '{trimmed}': {err}"))
3998 })?;
3999
4000 let stmt = statements.into_iter().next().ok_or_else(|| {
4001 Error::InvalidArgumentError(format!(
4002 "ROW field type '{trimmed}' did not produce a statement"
4003 ))
4004 })?;
4005
4006 match stmt {
4007 Statement::CreateTable(table) => table
4008 .columns
4009 .first()
4010 .map(|col| col.data_type.clone())
4011 .ok_or_else(|| {
4012 Error::InvalidArgumentError(format!(
4013 "ROW field type '{trimmed}' missing column definition"
4014 ))
4015 }),
4016 other => Err(Error::InvalidArgumentError(format!(
4017 "unexpected statement while parsing ROW field type: {other:?}"
4018 ))),
4019 }
4020}
4021
4022fn extract_constant_select_rows(select: &Select) -> SqlResult<Option<Vec<Vec<PlanValue>>>> {
4023 if !select.from.is_empty() {
4024 return Ok(None);
4025 }
4026
4027 if select.selection.is_some()
4028 || select.having.is_some()
4029 || !select.named_window.is_empty()
4030 || select.qualify.is_some()
4031 || select.distinct.is_some()
4032 || select.top.is_some()
4033 || select.into.is_some()
4034 || select.prewhere.is_some()
4035 || !select.lateral_views.is_empty()
4036 || select.value_table_mode.is_some()
4037 || !group_by_is_empty(&select.group_by)
4038 {
4039 return Err(Error::InvalidArgumentError(
4040 "constant SELECT statements do not support advanced clauses".into(),
4041 ));
4042 }
4043
4044 if select.projection.is_empty() {
4045 return Err(Error::InvalidArgumentError(
4046 "constant SELECT requires at least one projection".into(),
4047 ));
4048 }
4049
4050 let mut row: Vec<PlanValue> = Vec::with_capacity(select.projection.len());
4051 for item in &select.projection {
4052 let expr = match item {
4053 SelectItem::UnnamedExpr(expr) => expr,
4054 SelectItem::ExprWithAlias { expr, .. } => expr,
4055 other => {
4056 return Err(Error::InvalidArgumentError(format!(
4057 "unsupported projection in constant SELECT: {other:?}"
4058 )));
4059 }
4060 };
4061
4062 let value = SqlValue::try_from_expr(expr)?;
4063 row.push(PlanValue::from(value));
4064 }
4065
4066 Ok(Some(vec![row]))
4067}
4068
4069fn extract_single_table(from: &[TableWithJoins]) -> SqlResult<(String, String)> {
4070 if from.len() != 1 {
4071 return Err(Error::InvalidArgumentError(
4072 "queries over multiple tables are not supported yet".into(),
4073 ));
4074 }
4075 let item = &from[0];
4076 if !item.joins.is_empty() {
4077 return Err(Error::InvalidArgumentError(
4078 "JOIN clauses are not supported yet".into(),
4079 ));
4080 }
4081 match &item.relation {
4082 TableFactor::Table { name, .. } => canonical_object_name(name),
4083 _ => Err(Error::InvalidArgumentError(
4084 "queries require a plain table name".into(),
4085 )),
4086 }
4087}
4088
4089fn extract_tables(from: &[TableWithJoins]) -> SqlResult<Vec<llkv_plan::TableRef>> {
4091 let mut tables = Vec::new();
4092
4093 for item in from {
4094 if !item.joins.is_empty() {
4096 return Err(Error::InvalidArgumentError(
4097 "JOIN clauses are not supported yet".into(),
4098 ));
4099 }
4100
4101 match &item.relation {
4103 TableFactor::Table { name, .. } => {
4104 let (schema_opt, table) = parse_schema_qualified_name(name)?;
4105 let schema = schema_opt.unwrap_or_default();
4106 tables.push(llkv_plan::TableRef::new(schema, table));
4107 }
4108 _ => {
4109 return Err(Error::InvalidArgumentError(
4110 "queries require a plain table name".into(),
4111 ));
4112 }
4113 }
4114 }
4115
4116 Ok(tables)
4117}
4118
4119fn group_by_is_empty(expr: &GroupByExpr) -> bool {
4120 matches!(
4121 expr,
4122 GroupByExpr::Expressions(exprs, modifiers)
4123 if exprs.is_empty() && modifiers.is_empty()
4124 )
4125}
4126
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int64Array, StringArray};
    use arrow::record_batch::RecordBatch;
    use llkv_storage::pager::MemPager;

    /// Creates an engine backed by a fresh in-memory pager.
    fn new_engine() -> SqlEngine<MemPager> {
        let pager = Arc::new(MemPager::default());
        SqlEngine::new(pager)
    }

    /// Runs `sql`, requires its first result to be a SELECT, and collects every batch.
    fn select_batches(engine: &SqlEngine<MemPager>, sql: &str) -> Vec<RecordBatch> {
        let mut result = engine.execute(sql).expect("select rows");
        match result.remove(0) {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        }
    }

    /// Flattens column 0 of every batch (which must be `Utf8`) into optional strings.
    fn extract_string_options(batches: &[RecordBatch]) -> Vec<Option<String>> {
        let mut values: Vec<Option<String>> = Vec::new();
        for batch in batches {
            let column = batch
                .column(0)
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("string column");
            for idx in 0..column.len() {
                if column.is_null(idx) {
                    values.push(None);
                } else {
                    values.push(Some(column.value(idx).to_string()));
                }
            }
        }
        values
    }

    /// Flattens column 0 of every batch (which must be `Int64`) into optional integers.
    fn extract_int64_options(batches: &[RecordBatch]) -> Vec<Option<i64>> {
        let mut values: Vec<Option<i64>> = Vec::new();
        for batch in batches {
            let column = batch
                .column(0)
                .as_any()
                .downcast_ref::<Int64Array>()
                .expect("int column");
            for idx in 0..column.len() {
                if column.is_null(idx) {
                    values.push(None);
                } else {
                    values.push(Some(column.value(idx)));
                }
            }
        }
        values
    }

    #[test]
    fn create_insert_select_roundtrip() {
        let engine = new_engine();

        let result = engine
            .execute("CREATE TABLE people (id INT NOT NULL, name TEXT NOT NULL)")
            .expect("create table");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::CreateTable { .. }
        ));

        let result = engine
            .execute("INSERT INTO people (id, name) VALUES (1, 'alice'), (2, 'bob')")
            .expect("insert rows");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 2,
                ..
            }
        ));

        let batches = select_batches(&engine, "SELECT name FROM people WHERE id = 2");
        assert_eq!(batches.len(), 1);
        let column = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string column");
        assert_eq!(column.len(), 1);
        assert_eq!(column.value(0), "bob");
    }

    #[test]
    fn insert_select_constant_including_null() {
        let engine = new_engine();

        engine
            .execute("CREATE TABLE integers(i INTEGER)")
            .expect("create table");

        let result = engine
            .execute("INSERT INTO integers SELECT 42")
            .expect("insert literal");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 1,
                ..
            }
        ));

        let result = engine
            .execute("INSERT INTO integers SELECT CAST(NULL AS VARCHAR)")
            .expect("insert null literal");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 1,
                ..
            }
        ));

        let batches = select_batches(&engine, "SELECT * FROM integers");
        assert_eq!(extract_int64_options(&batches), vec![Some(42), None]);
    }

    #[test]
    fn update_with_where_clause_filters_rows() {
        let engine = new_engine();

        engine
            .execute("SET default_null_order='nulls_first'")
            .expect("set default null order");

        engine
            .execute("CREATE TABLE strings(a VARCHAR)")
            .expect("create table");

        engine
            .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
            .expect("insert seed rows");

        let result = engine
            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
            .expect("update rows");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Update {
                rows_updated: 1,
                ..
            }
        ));

        let batches = select_batches(&engine, "SELECT * FROM strings ORDER BY cast(a AS INTEGER)");
        let mut values = extract_string_options(&batches);

        // Sort locally so this test checks only the UPDATE's effect; result
        // ordering is covered by `order_by_honors_configured_default_null_order`.
        values.sort_by(|a, b| match (a, b) {
            (None, None) => std::cmp::Ordering::Equal,
            (None, Some(_)) => std::cmp::Ordering::Less,
            (Some(_), None) => std::cmp::Ordering::Greater,
            (Some(av), Some(bv)) => {
                let a_val = av.parse::<i64>().unwrap_or_default();
                let b_val = bv.parse::<i64>().unwrap_or_default();
                a_val.cmp(&b_val)
            }
        });

        assert_eq!(
            values,
            vec![None, Some("4".to_string()), Some("13".to_string())]
        );
    }

    #[test]
    fn order_by_honors_configured_default_null_order() {
        let engine = new_engine();

        engine
            .execute("CREATE TABLE strings(a VARCHAR)")
            .expect("create table");
        engine
            .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
            .expect("insert values");
        engine
            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
            .expect("update value");

        let batches = select_batches(&engine, "SELECT * FROM strings ORDER BY cast(a AS INTEGER)");
        assert_eq!(
            extract_string_options(&batches),
            vec![Some("4".to_string()), Some("13".to_string()), None]
        );

        assert!(!engine.default_nulls_first_for_tests());

        engine
            .execute("SET default_null_order='nulls_first'")
            .expect("set default null order");

        assert!(engine.default_nulls_first_for_tests());

        let batches = select_batches(&engine, "SELECT * FROM strings ORDER BY cast(a AS INTEGER)");
        assert_eq!(
            extract_string_options(&batches),
            vec![None, Some("4".to_string()), Some("13".to_string())]
        );
    }

    #[test]
    fn arrow_type_from_row_returns_struct_fields() {
        let dialect = GenericDialect {};
        let statements = Parser::parse_sql(
            &dialect,
            "CREATE TABLE row_types(payload ROW(a INTEGER, b VARCHAR));",
        )
        .expect("parse ROW type definition");

        let data_type = match &statements[0] {
            Statement::CreateTable(stmt) => stmt.columns[0].data_type.clone(),
            other => panic!("unexpected statement: {other:?}"),
        };

        let arrow_type = arrow_type_from_sql(&data_type).expect("convert ROW type");
        match arrow_type {
            arrow::datatypes::DataType::Struct(fields) => {
                assert_eq!(fields.len(), 2, "unexpected field count");
                assert_eq!(fields[0].name(), "a");
                assert_eq!(fields[1].name(), "b");
                assert_eq!(fields[0].data_type(), &arrow::datatypes::DataType::Int64);
                assert_eq!(fields[1].data_type(), &arrow::datatypes::DataType::Utf8);
            }
            other => panic!("expected struct type, got {other:?}"),
        }
    }
}