1use std::collections::{HashMap, HashSet};
2use std::sync::{
3 Arc, OnceLock,
4 atomic::{AtomicBool, Ordering as AtomicOrdering},
5};
6
7use crate::SqlResult;
8use crate::SqlValue;
9use arrow::record_batch::RecordBatch;
10
11use llkv_executor::SelectExecution;
12use llkv_expr::literal::Literal;
13use llkv_plan::validation::{
14 ensure_known_columns_case_insensitive, ensure_non_empty, ensure_unique_case_insensitive,
15};
16use llkv_result::Error;
17use llkv_runtime::TEMPORARY_NAMESPACE_ID;
18use llkv_runtime::{
19 AggregateExpr, AssignmentValue, ColumnAssignment, CreateIndexPlan, CreateTablePlan,
20 CreateTableSource, DeletePlan, ForeignKeyAction, ForeignKeySpec, IndexColumnPlan, InsertPlan,
21 InsertSource, MultiColumnUniqueSpec, OrderByPlan, OrderSortType, OrderTarget, PlanColumnSpec,
22 PlanStatement, PlanValue, RenameTablePlan, RuntimeContext, RuntimeEngine, RuntimeSession,
23 RuntimeStatementResult, SelectPlan, SelectProjection, UpdatePlan, extract_rows_from_range,
24};
25use llkv_storage::pager::Pager;
26use llkv_table::CatalogDdl;
27use llkv_table::catalog::{IdentifierContext, IdentifierResolver};
28use regex::Regex;
29use simd_r_drive_entry_handle::EntryHandle;
30use sqlparser::ast::{
31 AlterColumnOperation, AlterTableOperation, Assignment, AssignmentTarget, BeginTransactionKind,
32 BinaryOperator, ColumnOption, ColumnOptionDef, ConstraintCharacteristics,
33 DataType as SqlDataType, Delete, ExceptionWhen, Expr as SqlExpr, FromTable, FunctionArg,
34 FunctionArgExpr, FunctionArguments, GroupByExpr, Ident, LimitClause, NullsDistinctOption,
35 ObjectName, ObjectNamePart, ObjectType, OrderBy, OrderByKind, Query, ReferentialAction,
36 SchemaName, Select, SelectItem, SelectItemQualifiedWildcardKind, Set, SetExpr, SqlOption,
37 Statement, TableConstraint, TableFactor, TableObject, TableWithJoins, TransactionMode,
38 TransactionModifier, UnaryOperator, UpdateTableFromKind, Value, ValueWithSpan,
39};
40use sqlparser::dialect::GenericDialect;
41use sqlparser::parser::Parser;
42
/// SQL front-end: parses SQL text with `sqlparser` and executes the resulting
/// statements through a [`RuntimeEngine`].
pub struct SqlEngine<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Runtime engine (context + session) that executes planned statements.
    engine: RuntimeEngine<P>,
    /// Engine-wide default for NULL placement; read/written with relaxed atomics.
    /// NOTE(review): presumably the default `NULLS FIRST` setting for ORDER BY —
    /// the consuming code is outside this view, confirm.
    default_nulls_first: AtomicBool,
}
95
/// Error text for a table that was dropped by a concurrent transaction.
/// NOTE(review): not referenced in this chunk; confirm where it is reported.
const DROPPED_TABLE_TRANSACTION_ERR: &str = "another transaction has dropped this table";
97
98impl<P> Clone for SqlEngine<P>
99where
100 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
101{
102 fn clone(&self) -> Self {
103 tracing::warn!(
104 "[SQL_ENGINE] SqlEngine::clone() called - will create new Engine with new session!"
105 );
106 Self {
108 engine: self.engine.clone(),
109 default_nulls_first: AtomicBool::new(
110 self.default_nulls_first.load(AtomicOrdering::Relaxed),
111 ),
112 }
113 }
114}
115
116#[allow(dead_code)]
117impl<P> SqlEngine<P>
118where
119 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
120{
121 fn map_table_error(table_name: &str, err: Error) -> Error {
122 match err {
123 Error::NotFound => Self::table_not_found_error(table_name),
124 Error::InvalidArgumentError(msg) if msg.contains("unknown table") => {
125 Self::table_not_found_error(table_name)
126 }
127 other => other,
128 }
129 }
130
131 fn table_not_found_error(table_name: &str) -> Error {
132 Error::CatalogError(format!(
133 "Catalog Error: Table '{table_name}' does not exist"
134 ))
135 }
136
137 fn is_table_missing_error(err: &Error) -> bool {
138 match err {
139 Error::NotFound => true,
140 Error::CatalogError(msg) => {
141 msg.contains("Catalog Error: Table") || msg.contains("unknown table")
142 }
143 Error::InvalidArgumentError(msg) => {
144 msg.contains("Catalog Error: Table") || msg.contains("unknown table")
145 }
146 _ => false,
147 }
148 }
149
150 fn execute_plan_statement(
151 &self,
152 statement: PlanStatement,
153 ) -> SqlResult<RuntimeStatementResult<P>> {
154 let table = llkv_runtime::statement_table_name(&statement).map(str::to_string);
155 self.engine.execute_statement(statement).map_err(|err| {
156 if let Some(table_name) = table {
157 Self::map_table_error(&table_name, err)
158 } else {
159 err
160 }
161 })
162 }
163
164 pub fn new(pager: Arc<P>) -> Self {
165 let engine = RuntimeEngine::new(pager);
166 Self {
167 engine,
168 default_nulls_first: AtomicBool::new(false),
169 }
170 }
171
172 fn preprocess_create_type_syntax(sql: &str) -> String {
180 static CREATE_TYPE_REGEX: OnceLock<Regex> = OnceLock::new();
181 static DROP_TYPE_REGEX: OnceLock<Regex> = OnceLock::new();
182
183 let create_re = CREATE_TYPE_REGEX.get_or_init(|| {
185 Regex::new(r"(?i)\bCREATE\s+TYPE\s+").expect("valid CREATE TYPE regex")
186 });
187
188 let drop_re = DROP_TYPE_REGEX
190 .get_or_init(|| Regex::new(r"(?i)\bDROP\s+TYPE\s+").expect("valid DROP TYPE regex"));
191
192 let sql = create_re.replace_all(sql, "CREATE DOMAIN ").to_string();
194
195 drop_re.replace_all(&sql, "DROP DOMAIN ").to_string()
197 }
198
199 fn preprocess_exclude_syntax(sql: &str) -> String {
200 static EXCLUDE_REGEX: OnceLock<Regex> = OnceLock::new();
201
202 let re = EXCLUDE_REGEX.get_or_init(|| {
205 Regex::new(
206 r"(?i)EXCLUDE\s*\(\s*([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)+)\s*\)",
207 )
208 .expect("valid EXCLUDE qualifier regex")
209 });
210
211 re.replace_all(sql, |caps: ®ex::Captures| {
212 let qualified_name = &caps[1];
213 format!("EXCLUDE (\"{}\")", qualified_name)
214 })
215 .to_string()
216 }
217
218 fn preprocess_trailing_commas_in_values(sql: &str) -> String {
221 static TRAILING_COMMA_REGEX: OnceLock<Regex> = OnceLock::new();
222
223 let re = TRAILING_COMMA_REGEX
226 .get_or_init(|| Regex::new(r",(\s*)\)").expect("valid trailing comma regex"));
227
228 re.replace_all(sql, "$1)").to_string()
229 }
230
231 pub(crate) fn context_arc(&self) -> Arc<RuntimeContext<P>> {
232 self.engine.context()
233 }
234
235 pub fn with_context(context: Arc<RuntimeContext<P>>, default_nulls_first: bool) -> Self {
236 Self {
237 engine: RuntimeEngine::from_context(context),
238 default_nulls_first: AtomicBool::new(default_nulls_first),
239 }
240 }
241
242 #[cfg(test)]
243 fn default_nulls_first_for_tests(&self) -> bool {
244 self.default_nulls_first.load(AtomicOrdering::Relaxed)
245 }
246
247 fn has_active_transaction(&self) -> bool {
248 self.engine.session().has_active_transaction()
249 }
250
251 pub fn session(&self) -> &RuntimeSession<P> {
253 self.engine.session()
254 }
255
256 pub fn execute(&self, sql: &str) -> SqlResult<Vec<RuntimeStatementResult<P>>> {
269 tracing::trace!("DEBUG SQL execute: {}", sql);
270
271 let processed_sql = Self::preprocess_create_type_syntax(sql);
273
274 let processed_sql = Self::preprocess_exclude_syntax(&processed_sql);
277
278 let processed_sql = Self::preprocess_trailing_commas_in_values(&processed_sql);
281
282 let dialect = GenericDialect {};
283 let statements = Parser::parse_sql(&dialect, &processed_sql)
284 .map_err(|err| Error::InvalidArgumentError(format!("failed to parse SQL: {err}")))?;
285 tracing::trace!("DEBUG SQL execute: parsed {} statements", statements.len());
286
287 let mut results = Vec::with_capacity(statements.len());
288 for (i, statement) in statements.iter().enumerate() {
289 tracing::trace!("DEBUG SQL execute: processing statement {}", i);
290 results.push(self.execute_statement(statement.clone())?);
291 tracing::trace!("DEBUG SQL execute: statement {} completed", i);
292 }
293 tracing::trace!("DEBUG SQL execute completed successfully");
294 Ok(results)
295 }
296
297 pub fn sql(&self, sql: &str) -> SqlResult<Vec<RecordBatch>> {
334 let mut results = self.execute(sql)?;
335 if results.len() != 1 {
336 return Err(Error::InvalidArgumentError(
337 "SqlEngine::sql expects exactly one SQL statement".into(),
338 ));
339 }
340
341 match results.pop().expect("checked length above") {
342 RuntimeStatementResult::Select { execution, .. } => execution.collect(),
343 other => Err(Error::InvalidArgumentError(format!(
344 "SqlEngine::sql requires a SELECT statement, got {other:?}",
345 ))),
346 }
347 }
348
    /// Executes a single parsed statement.
    ///
    /// Transaction-control statements (`START TRANSACTION`/`BEGIN`, `COMMIT`,
    /// `ROLLBACK`) go to their dedicated handlers. Everything else is forwarded to
    /// `execute_statement_non_transactional`.
    fn execute_statement(&self, statement: Statement) -> SqlResult<RuntimeStatementResult<P>> {
        // Short trace label; the argument expressions are only evaluated when the trace
        // level is enabled.
        tracing::trace!(
            "DEBUG SQL execute_statement: {:?}",
            match &statement {
                Statement::Insert(insert) =>
                    format!("Insert(table={:?})", Self::table_name_from_insert(insert)),
                Statement::Query(_) => "Query".to_string(),
                Statement::StartTransaction { .. } => "StartTransaction".to_string(),
                Statement::Commit { .. } => "Commit".to_string(),
                Statement::Rollback { .. } => "Rollback".to_string(),
                Statement::CreateTable(_) => "CreateTable".to_string(),
                Statement::Update { .. } => "Update".to_string(),
                Statement::Delete(_) => "Delete".to_string(),
                other => format!("Other({:?})", other),
            }
        );
        // Fields are destructured exhaustively so a sqlparser upgrade that adds fields
        // fails to compile here instead of silently ignoring them.
        match statement {
            Statement::StartTransaction {
                modes,
                begin,
                transaction,
                modifier,
                statements,
                exception,
                has_end_keyword,
            } => self.handle_start_transaction(
                modes,
                begin,
                transaction,
                modifier,
                statements,
                exception,
                has_end_keyword,
            ),
            Statement::Commit {
                chain,
                end,
                modifier,
            } => self.handle_commit(chain, end, modifier),
            Statement::Rollback { chain, savepoint } => self.handle_rollback(chain, savepoint),
            other => self.execute_statement_non_transactional(other),
        }
    }
392
393 fn execute_statement_non_transactional(
394 &self,
395 statement: Statement,
396 ) -> SqlResult<RuntimeStatementResult<P>> {
397 tracing::trace!("DEBUG SQL execute_statement_non_transactional called");
398 match statement {
399 Statement::CreateTable(stmt) => {
400 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateTable");
401 self.handle_create_table(stmt)
402 }
403 Statement::CreateIndex(stmt) => {
404 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateIndex");
405 self.handle_create_index(stmt)
406 }
407 Statement::CreateSchema {
408 schema_name,
409 if_not_exists,
410 with,
411 options,
412 default_collate_spec,
413 clone,
414 } => {
415 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateSchema");
416 self.handle_create_schema(
417 schema_name,
418 if_not_exists,
419 with,
420 options,
421 default_collate_spec,
422 clone,
423 )
424 }
425 Statement::CreateView {
426 name,
427 columns,
428 query,
429 materialized,
430 or_replace,
431 or_alter,
432 options,
433 cluster_by,
434 comment,
435 with_no_schema_binding,
436 if_not_exists,
437 temporary,
438 to,
439 params,
440 secure,
441 name_before_not_exists,
442 } => {
443 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateView");
444 self.handle_create_view(
445 name,
446 columns,
447 query,
448 materialized,
449 or_replace,
450 or_alter,
451 options,
452 cluster_by,
453 comment,
454 with_no_schema_binding,
455 if_not_exists,
456 temporary,
457 to,
458 params,
459 secure,
460 name_before_not_exists,
461 )
462 }
463 Statement::CreateDomain(create_domain) => {
464 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateDomain");
465 self.handle_create_domain(create_domain)
466 }
467 Statement::DropDomain(drop_domain) => {
468 tracing::trace!("DEBUG SQL execute_statement_non_transactional: DropDomain");
469 self.handle_drop_domain(drop_domain)
470 }
471 Statement::Insert(stmt) => {
472 let table_name =
473 Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
474 tracing::trace!(
475 "DEBUG SQL execute_statement_non_transactional: Insert(table={})",
476 table_name
477 );
478 self.handle_insert(stmt)
479 }
480 Statement::Query(query) => {
481 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Query");
482 self.handle_query(*query)
483 }
484 Statement::Update {
485 table,
486 assignments,
487 from,
488 selection,
489 returning,
490 ..
491 } => {
492 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Update");
493 self.handle_update(table, assignments, from, selection, returning)
494 }
495 Statement::Delete(delete) => {
496 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Delete");
497 self.handle_delete(delete)
498 }
499 Statement::Drop {
500 object_type,
501 if_exists,
502 names,
503 cascade,
504 restrict,
505 purge,
506 temporary,
507 ..
508 } => {
509 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Drop");
510 self.handle_drop(
511 object_type,
512 if_exists,
513 names,
514 cascade,
515 restrict,
516 purge,
517 temporary,
518 )
519 }
520 Statement::AlterTable {
521 name,
522 if_exists,
523 only,
524 operations,
525 ..
526 } => {
527 tracing::trace!("DEBUG SQL execute_statement_non_transactional: AlterTable");
528 self.handle_alter_table(name, if_exists, only, operations)
529 }
530 Statement::Set(set_stmt) => {
531 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Set");
532 self.handle_set(set_stmt)
533 }
534 Statement::Pragma { name, value, is_eq } => {
535 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Pragma");
536 self.handle_pragma(name, value, is_eq)
537 }
538 other => {
539 tracing::trace!(
540 "DEBUG SQL execute_statement_non_transactional: Other({:?})",
541 other
542 );
543 Err(Error::InvalidArgumentError(format!(
544 "unsupported SQL statement: {other:?}"
545 )))
546 }
547 }
548 }
549
550 fn table_name_from_insert(insert: &sqlparser::ast::Insert) -> SqlResult<String> {
551 match &insert.table {
552 TableObject::TableName(name) => Self::object_name_to_string(name),
553 _ => Err(Error::InvalidArgumentError(
554 "INSERT requires a plain table name".into(),
555 )),
556 }
557 }
558
559 fn table_name_from_update(table: &TableWithJoins) -> SqlResult<Option<String>> {
560 if !table.joins.is_empty() {
561 return Err(Error::InvalidArgumentError(
562 "UPDATE with JOIN targets is not supported yet".into(),
563 ));
564 }
565 Self::table_with_joins_name(table)
566 }
567
568 fn table_name_from_delete(delete: &Delete) -> SqlResult<Option<String>> {
569 if !delete.tables.is_empty() {
570 return Err(Error::InvalidArgumentError(
571 "multi-table DELETE is not supported yet".into(),
572 ));
573 }
574 let from_tables = match &delete.from {
575 FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
576 };
577 if from_tables.is_empty() {
578 return Ok(None);
579 }
580 if from_tables.len() != 1 {
581 return Err(Error::InvalidArgumentError(
582 "DELETE over multiple tables is not supported yet".into(),
583 ));
584 }
585 Self::table_with_joins_name(&from_tables[0])
586 }
587
588 fn object_name_to_string(name: &ObjectName) -> SqlResult<String> {
589 let (display, _) = canonical_object_name(name)?;
590 Ok(display)
591 }
592
593 #[allow(dead_code)]
594 fn table_object_to_name(table: &TableObject) -> SqlResult<Option<String>> {
595 match table {
596 TableObject::TableName(name) => Ok(Some(Self::object_name_to_string(name)?)),
597 TableObject::TableFunction(_) => Ok(None),
598 }
599 }
600
601 fn table_with_joins_name(table: &TableWithJoins) -> SqlResult<Option<String>> {
602 match &table.relation {
603 TableFactor::Table { name, .. } => Ok(Some(Self::object_name_to_string(name)?)),
604 _ => Ok(None),
605 }
606 }
607
608 fn tables_in_query(query: &Query) -> SqlResult<Vec<String>> {
609 let mut tables = Vec::new();
610 if let sqlparser::ast::SetExpr::Select(select) = query.body.as_ref() {
611 for table in &select.from {
612 if let TableFactor::Table { name, .. } = &table.relation {
613 tables.push(Self::object_name_to_string(name)?);
614 }
615 }
616 }
617 Ok(tables)
618 }
619
620 fn collect_known_columns(
621 &self,
622 display_name: &str,
623 canonical_name: &str,
624 ) -> SqlResult<HashSet<String>> {
625 let context = self.engine.context();
626
627 if context.is_table_marked_dropped(canonical_name) {
628 return Err(Self::table_not_found_error(display_name));
629 }
630
631 if let Some(specs) = self
633 .engine
634 .session()
635 .table_column_specs_from_transaction(canonical_name)
636 {
637 return Ok(specs
638 .into_iter()
639 .map(|spec| spec.name.to_ascii_lowercase())
640 .collect());
641 }
642
643 let (_, canonical_name) = llkv_table::canonical_table_name(display_name)
645 .map_err(|e| arrow::error::ArrowError::ExternalError(Box::new(e)))?;
646 match context.catalog().table_column_specs(&canonical_name) {
647 Ok(specs) => Ok(specs
648 .into_iter()
649 .map(|spec| spec.name.to_ascii_lowercase())
650 .collect()),
651 Err(err) => {
652 if !Self::is_table_missing_error(&err) {
653 return Err(Self::map_table_error(display_name, err));
654 }
655
656 Ok(HashSet::new())
657 }
658 }
659 }
660
661 fn is_table_marked_dropped(&self, table_name: &str) -> SqlResult<bool> {
662 let canonical = table_name.to_ascii_lowercase();
663 Ok(self.engine.context().is_table_marked_dropped(&canonical))
664 }
665
666 fn handle_create_table(
667 &self,
668 mut stmt: sqlparser::ast::CreateTable,
669 ) -> SqlResult<RuntimeStatementResult<P>> {
670 validate_create_table_common(&stmt)?;
671
672 let (mut schema_name, table_name) = parse_schema_qualified_name(&stmt.name)?;
673
674 let namespace = if stmt.temporary {
675 if schema_name.is_some() {
676 return Err(Error::InvalidArgumentError(
677 "temporary tables cannot specify an explicit schema".into(),
678 ));
679 }
680 schema_name = None;
681 Some(TEMPORARY_NAMESPACE_ID.to_string())
682 } else {
683 None
684 };
685
686 if let Some(ref schema) = schema_name {
688 let catalog = self.engine.context().table_catalog();
689 if !catalog.schema_exists(schema) {
690 return Err(Error::CatalogError(format!(
691 "Schema '{}' does not exist",
692 schema
693 )));
694 }
695 }
696
697 let display_name = match &schema_name {
699 Some(schema) => format!("{}.{}", schema, table_name),
700 None => table_name.clone(),
701 };
702 let canonical_name = display_name.to_ascii_lowercase();
703 tracing::trace!(
704 "\n=== HANDLE_CREATE_TABLE: table='{}' columns={} ===",
705 display_name,
706 stmt.columns.len()
707 );
708 if display_name.is_empty() {
709 return Err(Error::InvalidArgumentError(
710 "table name must not be empty".into(),
711 ));
712 }
713
714 if let Some(query) = stmt.query.take() {
715 validate_create_table_as(&stmt)?;
716 if let Some(result) = self.try_handle_range_ctas(
717 &display_name,
718 &canonical_name,
719 &query,
720 stmt.if_not_exists,
721 stmt.or_replace,
722 namespace.clone(),
723 )? {
724 return Ok(result);
725 }
726 return self.handle_create_table_as(
727 display_name,
728 canonical_name,
729 *query,
730 stmt.if_not_exists,
731 stmt.or_replace,
732 namespace.clone(),
733 );
734 }
735
736 if stmt.columns.is_empty() {
737 return Err(Error::InvalidArgumentError(
738 "CREATE TABLE requires at least one column".into(),
739 ));
740 }
741
742 validate_create_table_definition(&stmt)?;
743
744 let column_defs_ast = std::mem::take(&mut stmt.columns);
745 let constraints = std::mem::take(&mut stmt.constraints);
746
747 let column_names: Vec<String> = column_defs_ast
748 .iter()
749 .map(|column_def| column_def.name.value.clone())
750 .collect();
751 ensure_unique_case_insensitive(column_names.iter().map(|name| name.as_str()), |dup| {
752 format!(
753 "duplicate column name '{}' in table '{}'",
754 dup, display_name
755 )
756 })?;
757 let column_names_lower: HashSet<String> = column_names
758 .iter()
759 .map(|name| name.to_ascii_lowercase())
760 .collect();
761
762 let mut columns: Vec<PlanColumnSpec> = Vec::with_capacity(column_defs_ast.len());
763 let mut primary_key_columns: HashSet<String> = HashSet::new();
764 let mut foreign_keys: Vec<ForeignKeySpec> = Vec::new();
765 let mut multi_column_uniques: Vec<MultiColumnUniqueSpec> = Vec::new();
766
767 for column_def in column_defs_ast {
769 let is_nullable = column_def
770 .options
771 .iter()
772 .all(|opt| !matches!(opt.option, ColumnOption::NotNull));
773
774 let is_primary_key = column_def.options.iter().any(|opt| {
775 matches!(
776 opt.option,
777 ColumnOption::Unique {
778 is_primary: true,
779 characteristics: _
780 }
781 )
782 });
783
784 let has_unique_constraint = column_def
785 .options
786 .iter()
787 .any(|opt| matches!(opt.option, ColumnOption::Unique { .. }));
788
789 let check_expr = column_def.options.iter().find_map(|opt| {
791 if let ColumnOption::Check(expr) = &opt.option {
792 Some(expr)
793 } else {
794 None
795 }
796 });
797
798 if let Some(check_expr) = check_expr {
800 let all_col_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
801 validate_check_constraint(check_expr, &display_name, &all_col_refs)?;
802 }
803
804 let check_expr_str = check_expr.map(|e| e.to_string());
805
806 for opt in &column_def.options {
808 if let ColumnOption::ForeignKey {
809 foreign_table,
810 referred_columns,
811 on_delete,
812 on_update,
813 characteristics,
814 } = &opt.option
815 {
816 let spec = self.build_foreign_key_spec(
817 &display_name,
818 &canonical_name,
819 vec![column_def.name.value.clone()],
820 foreign_table,
821 referred_columns,
822 *on_delete,
823 *on_update,
824 characteristics,
825 &column_names_lower,
826 None,
827 )?;
828 foreign_keys.push(spec);
829 }
830 }
831
832 tracing::trace!(
833 "DEBUG CREATE TABLE column '{}' is_primary_key={} has_unique={} check_expr={:?}",
834 column_def.name.value,
835 is_primary_key,
836 has_unique_constraint,
837 check_expr_str
838 );
839
840 let resolved_data_type = self.engine.context().resolve_type(&column_def.data_type);
842
843 let mut column = PlanColumnSpec::new(
844 column_def.name.value.clone(),
845 arrow_type_from_sql(&resolved_data_type)?,
846 is_nullable,
847 );
848 tracing::trace!(
849 "DEBUG PlanColumnSpec after new(): primary_key={} unique={}",
850 column.primary_key,
851 column.unique
852 );
853
854 column = column
855 .with_primary_key(is_primary_key)
856 .with_unique(has_unique_constraint)
857 .with_check(check_expr_str);
858
859 if is_primary_key {
860 column.nullable = false;
861 primary_key_columns.insert(column.name.to_ascii_lowercase());
862 }
863 tracing::trace!(
864 "DEBUG PlanColumnSpec after with_primary_key({})/with_unique({}): primary_key={} unique={} check_expr={:?}",
865 is_primary_key,
866 has_unique_constraint,
867 column.primary_key,
868 column.unique,
869 column.check_expr
870 );
871
872 columns.push(column);
873 }
874
875 if !constraints.is_empty() {
877 let mut column_lookup: HashMap<String, usize> = HashMap::with_capacity(columns.len());
878 for (idx, column) in columns.iter().enumerate() {
879 column_lookup.insert(column.name.to_ascii_lowercase(), idx);
880 }
881
882 for constraint in constraints {
883 match constraint {
884 TableConstraint::PrimaryKey {
885 columns: constraint_columns,
886 ..
887 } => {
888 if !primary_key_columns.is_empty() {
889 return Err(Error::InvalidArgumentError(
890 "multiple PRIMARY KEY constraints are not supported".into(),
891 ));
892 }
893
894 ensure_non_empty(&constraint_columns, || {
895 "PRIMARY KEY requires at least one column".into()
896 })?;
897
898 let mut pk_column_names: Vec<String> =
899 Vec::with_capacity(constraint_columns.len());
900
901 for index_col in &constraint_columns {
902 let column_ident = extract_index_column_name(
903 index_col,
904 "PRIMARY KEY",
905 false, false, )?;
908 pk_column_names.push(column_ident);
909 }
910
911 ensure_unique_case_insensitive(
912 pk_column_names.iter().map(|name| name.as_str()),
913 |dup| format!("duplicate column '{}' in PRIMARY KEY constraint", dup),
914 )?;
915
916 ensure_known_columns_case_insensitive(
917 pk_column_names.iter().map(|name| name.as_str()),
918 &column_names_lower,
919 |unknown| {
920 format!("unknown column '{}' in PRIMARY KEY constraint", unknown)
921 },
922 )?;
923
924 for column_ident in pk_column_names {
925 let normalized = column_ident.to_ascii_lowercase();
926 let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
927 Error::InvalidArgumentError(format!(
928 "unknown column '{}' in PRIMARY KEY constraint",
929 column_ident
930 ))
931 })?;
932
933 let column = columns.get_mut(idx).expect("column index valid");
934 column.primary_key = true;
935 column.unique = true;
936 column.nullable = false;
937
938 primary_key_columns.insert(normalized);
939 }
940 }
941 TableConstraint::Unique {
942 columns: constraint_columns,
943 index_type,
944 index_options,
945 characteristics,
946 nulls_distinct,
947 name,
948 ..
949 } => {
950 if !matches!(nulls_distinct, NullsDistinctOption::None) {
951 return Err(Error::InvalidArgumentError(
952 "UNIQUE constraints with NULLS DISTINCT/NOT DISTINCT are not supported yet".into(),
953 ));
954 }
955
956 if index_type.is_some() {
957 return Err(Error::InvalidArgumentError(
958 "UNIQUE constraints with index types are not supported yet".into(),
959 ));
960 }
961
962 if !index_options.is_empty() {
963 return Err(Error::InvalidArgumentError(
964 "UNIQUE constraints with index options are not supported yet"
965 .into(),
966 ));
967 }
968
969 if characteristics.is_some() {
970 return Err(Error::InvalidArgumentError(
971 "UNIQUE constraint characteristics are not supported yet".into(),
972 ));
973 }
974
975 ensure_non_empty(&constraint_columns, || {
976 "UNIQUE constraint requires at least one column".into()
977 })?;
978
979 let mut unique_column_names: Vec<String> =
980 Vec::with_capacity(constraint_columns.len());
981
982 for index_column in &constraint_columns {
983 let column_ident = extract_index_column_name(
984 index_column,
985 "UNIQUE constraint",
986 false, false, )?;
989 unique_column_names.push(column_ident);
990 }
991
992 ensure_unique_case_insensitive(
993 unique_column_names.iter().map(|name| name.as_str()),
994 |dup| format!("duplicate column '{}' in UNIQUE constraint", dup),
995 )?;
996
997 ensure_known_columns_case_insensitive(
998 unique_column_names.iter().map(|name| name.as_str()),
999 &column_names_lower,
1000 |unknown| format!("unknown column '{}' in UNIQUE constraint", unknown),
1001 )?;
1002
1003 if unique_column_names.len() > 1 {
1004 multi_column_uniques.push(MultiColumnUniqueSpec {
1006 name: name.map(|n| n.value),
1007 columns: unique_column_names,
1008 });
1009 } else {
1010 let column_ident = unique_column_names
1012 .into_iter()
1013 .next()
1014 .expect("unique constraint checked for emptiness");
1015 let normalized = column_ident.to_ascii_lowercase();
1016 let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
1017 Error::InvalidArgumentError(format!(
1018 "unknown column '{}' in UNIQUE constraint",
1019 column_ident
1020 ))
1021 })?;
1022
1023 let column = columns
1024 .get_mut(idx)
1025 .expect("column index from lookup must be valid");
1026 column.unique = true;
1027 }
1028 }
1029 TableConstraint::ForeignKey {
1030 name,
1031 index_name,
1032 columns: fk_columns,
1033 foreign_table,
1034 referred_columns,
1035 on_delete,
1036 on_update,
1037 characteristics,
1038 ..
1039 } => {
1040 if index_name.is_some() {
1041 return Err(Error::InvalidArgumentError(
1042 "FOREIGN KEY index clauses are not supported yet".into(),
1043 ));
1044 }
1045
1046 let referencing_columns: Vec<String> =
1047 fk_columns.into_iter().map(|ident| ident.value).collect();
1048 let spec = self.build_foreign_key_spec(
1049 &display_name,
1050 &canonical_name,
1051 referencing_columns,
1052 &foreign_table,
1053 &referred_columns,
1054 on_delete,
1055 on_update,
1056 &characteristics,
1057 &column_names_lower,
1058 name.map(|ident| ident.value),
1059 )?;
1060
1061 foreign_keys.push(spec);
1062 }
1063 unsupported => {
1064 return Err(Error::InvalidArgumentError(format!(
1065 "table-level constraint {:?} is not supported",
1066 unsupported
1067 )));
1068 }
1069 }
1070 }
1071 }
1072
1073 let plan = CreateTablePlan {
1074 name: display_name,
1075 if_not_exists: stmt.if_not_exists,
1076 or_replace: stmt.or_replace,
1077 columns,
1078 source: None,
1079 namespace,
1080 foreign_keys,
1081 multi_column_uniques,
1082 };
1083 self.execute_plan_statement(PlanStatement::CreateTable(plan))
1084 }
1085
1086 fn handle_create_index(
1087 &self,
1088 stmt: sqlparser::ast::CreateIndex,
1089 ) -> SqlResult<RuntimeStatementResult<P>> {
1090 let sqlparser::ast::CreateIndex {
1091 name,
1092 table_name,
1093 using,
1094 columns,
1095 unique,
1096 concurrently,
1097 if_not_exists,
1098 include,
1099 nulls_distinct,
1100 with,
1101 predicate,
1102 index_options,
1103 alter_options,
1104 ..
1105 } = stmt;
1106
1107 if concurrently {
1108 return Err(Error::InvalidArgumentError(
1109 "CREATE INDEX CONCURRENTLY is not supported".into(),
1110 ));
1111 }
1112 if using.is_some() {
1113 return Err(Error::InvalidArgumentError(
1114 "CREATE INDEX USING clauses are not supported".into(),
1115 ));
1116 }
1117 if !include.is_empty() {
1118 return Err(Error::InvalidArgumentError(
1119 "CREATE INDEX INCLUDE columns are not supported".into(),
1120 ));
1121 }
1122 if nulls_distinct.is_some() {
1123 return Err(Error::InvalidArgumentError(
1124 "CREATE INDEX NULLS DISTINCT is not supported".into(),
1125 ));
1126 }
1127 if !with.is_empty() {
1128 return Err(Error::InvalidArgumentError(
1129 "CREATE INDEX WITH options are not supported".into(),
1130 ));
1131 }
1132 if predicate.is_some() {
1133 return Err(Error::InvalidArgumentError(
1134 "partial CREATE INDEX is not supported".into(),
1135 ));
1136 }
1137 if !index_options.is_empty() {
1138 return Err(Error::InvalidArgumentError(
1139 "CREATE INDEX options are not supported".into(),
1140 ));
1141 }
1142 if !alter_options.is_empty() {
1143 return Err(Error::InvalidArgumentError(
1144 "CREATE INDEX ALTER options are not supported".into(),
1145 ));
1146 }
1147 if columns.is_empty() {
1148 return Err(Error::InvalidArgumentError(
1149 "CREATE INDEX requires at least one column".into(),
1150 ));
1151 }
1152
1153 let (schema_name, base_table_name) = parse_schema_qualified_name(&table_name)?;
1154 if let Some(ref schema) = schema_name {
1155 let catalog = self.engine.context().table_catalog();
1156 if !catalog.schema_exists(schema) {
1157 return Err(Error::CatalogError(format!(
1158 "Schema '{}' does not exist",
1159 schema
1160 )));
1161 }
1162 }
1163
1164 let display_table_name = schema_name
1165 .as_ref()
1166 .map(|schema| format!("{}.{}", schema, base_table_name))
1167 .unwrap_or_else(|| base_table_name.clone());
1168 let canonical_table_name = display_table_name.to_ascii_lowercase();
1169
1170 let known_columns =
1171 self.collect_known_columns(&display_table_name, &canonical_table_name)?;
1172 let enforce_known_columns = !known_columns.is_empty();
1173
1174 let index_name = match name {
1175 Some(name_obj) => Some(Self::object_name_to_string(&name_obj)?),
1176 None => None,
1177 };
1178
1179 let mut index_columns: Vec<IndexColumnPlan> = Vec::with_capacity(columns.len());
1180 let mut seen_column_names: HashSet<String> = HashSet::new();
1181 for item in columns {
1182 if item.column.with_fill.is_some() {
1184 return Err(Error::InvalidArgumentError(
1185 "CREATE INDEX column WITH FILL is not supported".into(),
1186 ));
1187 }
1188
1189 let column_name = extract_index_column_name(
1190 &item,
1191 "CREATE INDEX",
1192 true, true, )?;
1195
1196 let order_expr = &item.column;
1198 let ascending = order_expr.options.asc.unwrap_or(true);
1199 let nulls_first = order_expr.options.nulls_first.unwrap_or(false);
1200
1201 let normalized = column_name.to_ascii_lowercase();
1202 if !seen_column_names.insert(normalized.clone()) {
1203 return Err(Error::InvalidArgumentError(format!(
1204 "duplicate column '{}' in CREATE INDEX",
1205 column_name
1206 )));
1207 }
1208
1209 if enforce_known_columns && !known_columns.contains(&normalized) {
1210 return Err(Error::InvalidArgumentError(format!(
1211 "column '{}' does not exist in table '{}'",
1212 column_name, display_table_name
1213 )));
1214 }
1215
1216 let column_plan = IndexColumnPlan::new(column_name).with_sort(ascending, nulls_first);
1217 index_columns.push(column_plan);
1218 }
1219
1220 if index_columns.len() > 1 && !unique {
1221 return Err(Error::InvalidArgumentError(
1222 "multi-column CREATE INDEX currently supports UNIQUE indexes only".into(),
1223 ));
1224 }
1225
1226 let plan = CreateIndexPlan::new(display_table_name)
1227 .with_name(index_name)
1228 .with_unique(unique)
1229 .with_if_not_exists(if_not_exists)
1230 .with_columns(index_columns);
1231
1232 self.execute_plan_statement(PlanStatement::CreateIndex(plan))
1233 }
1234
1235 fn map_referential_action(
1236 action: Option<ReferentialAction>,
1237 kind: &str,
1238 ) -> SqlResult<ForeignKeyAction> {
1239 match action {
1240 None | Some(ReferentialAction::NoAction) => Ok(ForeignKeyAction::NoAction),
1241 Some(ReferentialAction::Restrict) => Ok(ForeignKeyAction::Restrict),
1242 Some(other) => Err(Error::InvalidArgumentError(format!(
1243 "FOREIGN KEY ON {kind} {:?} is not supported yet",
1244 other
1245 ))),
1246 }
1247 }
1248
1249 #[allow(clippy::too_many_arguments)]
1250 fn build_foreign_key_spec(
1251 &self,
1252 _referencing_display: &str,
1253 referencing_canonical: &str,
1254 referencing_columns: Vec<String>,
1255 foreign_table: &ObjectName,
1256 referenced_columns: &[Ident],
1257 on_delete: Option<ReferentialAction>,
1258 on_update: Option<ReferentialAction>,
1259 characteristics: &Option<ConstraintCharacteristics>,
1260 known_columns_lower: &HashSet<String>,
1261 name: Option<String>,
1262 ) -> SqlResult<ForeignKeySpec> {
1263 if characteristics.is_some() {
1264 return Err(Error::InvalidArgumentError(
1265 "FOREIGN KEY constraint characteristics are not supported yet".into(),
1266 ));
1267 }
1268
1269 ensure_non_empty(&referencing_columns, || {
1270 "FOREIGN KEY constraint requires at least one referencing column".into()
1271 })?;
1272 ensure_unique_case_insensitive(
1273 referencing_columns.iter().map(|name| name.as_str()),
1274 |dup| format!("duplicate column '{}' in FOREIGN KEY constraint", dup),
1275 )?;
1276 ensure_known_columns_case_insensitive(
1277 referencing_columns.iter().map(|name| name.as_str()),
1278 known_columns_lower,
1279 |unknown| format!("unknown column '{}' in FOREIGN KEY constraint", unknown),
1280 )?;
1281
1282 let referenced_columns_vec: Vec<String> = referenced_columns
1283 .iter()
1284 .map(|ident| ident.value.clone())
1285 .collect();
1286 ensure_unique_case_insensitive(
1287 referenced_columns_vec.iter().map(|name| name.as_str()),
1288 |dup| {
1289 format!(
1290 "duplicate referenced column '{}' in FOREIGN KEY constraint",
1291 dup
1292 )
1293 },
1294 )?;
1295
1296 if !referenced_columns_vec.is_empty()
1297 && referenced_columns_vec.len() != referencing_columns.len()
1298 {
1299 return Err(Error::InvalidArgumentError(
1300 "FOREIGN KEY referencing and referenced column counts must match".into(),
1301 ));
1302 }
1303
1304 let (referenced_display, referenced_canonical) = canonical_object_name(foreign_table)?;
1305
1306 let catalog = self.engine.context().table_catalog();
1308 if let Some(table_id) = catalog.table_id(&referenced_canonical) {
1309 let context = self.engine.context();
1310 if context.is_view(table_id)? {
1311 return Err(Error::CatalogError(format!(
1312 "Binder Error: cannot reference a VIEW with a FOREIGN KEY: {}",
1313 referenced_display
1314 )));
1315 }
1316 }
1317
1318 if referenced_canonical == referencing_canonical {
1319 ensure_known_columns_case_insensitive(
1320 referenced_columns_vec.iter().map(|name| name.as_str()),
1321 known_columns_lower,
1322 |unknown| {
1323 format!(
1324 "Binder Error: table '{}' does not have a column named '{}'",
1325 referenced_display, unknown
1326 )
1327 },
1328 )?;
1329 } else {
1330 let known_columns =
1331 self.collect_known_columns(&referenced_display, &referenced_canonical)?;
1332 if !known_columns.is_empty() {
1333 ensure_known_columns_case_insensitive(
1334 referenced_columns_vec.iter().map(|name| name.as_str()),
1335 &known_columns,
1336 |unknown| {
1337 format!(
1338 "Binder Error: table '{}' does not have a column named '{}'",
1339 referenced_display, unknown
1340 )
1341 },
1342 )?;
1343 }
1344 }
1345
1346 let on_delete_action = Self::map_referential_action(on_delete, "DELETE")?;
1347 let on_update_action = Self::map_referential_action(on_update, "UPDATE")?;
1348
1349 Ok(ForeignKeySpec {
1350 name,
1351 columns: referencing_columns,
1352 referenced_table: referenced_display,
1353 referenced_columns: referenced_columns_vec,
1354 on_delete: on_delete_action,
1355 on_update: on_update_action,
1356 })
1357 }
1358
1359 fn handle_create_schema(
1360 &self,
1361 schema_name: SchemaName,
1362 _if_not_exists: bool,
1363 with: Option<Vec<SqlOption>>,
1364 options: Option<Vec<SqlOption>>,
1365 default_collate_spec: Option<SqlExpr>,
1366 clone: Option<ObjectName>,
1367 ) -> SqlResult<RuntimeStatementResult<P>> {
1368 if clone.is_some() {
1369 return Err(Error::InvalidArgumentError(
1370 "CREATE SCHEMA ... CLONE is not supported".into(),
1371 ));
1372 }
1373 if with.as_ref().is_some_and(|opts| !opts.is_empty()) {
1374 return Err(Error::InvalidArgumentError(
1375 "CREATE SCHEMA ... WITH options are not supported".into(),
1376 ));
1377 }
1378 if options.as_ref().is_some_and(|opts| !opts.is_empty()) {
1379 return Err(Error::InvalidArgumentError(
1380 "CREATE SCHEMA options are not supported".into(),
1381 ));
1382 }
1383 if default_collate_spec.is_some() {
1384 return Err(Error::InvalidArgumentError(
1385 "CREATE SCHEMA DEFAULT COLLATE is not supported".into(),
1386 ));
1387 }
1388
1389 let schema_name = match schema_name {
1390 SchemaName::Simple(name) => name,
1391 _ => {
1392 return Err(Error::InvalidArgumentError(
1393 "CREATE SCHEMA authorization is not supported".into(),
1394 ));
1395 }
1396 };
1397
1398 let (display_name, canonical) = canonical_object_name(&schema_name)?;
1399 if display_name.is_empty() {
1400 return Err(Error::InvalidArgumentError(
1401 "schema name must not be empty".into(),
1402 ));
1403 }
1404
1405 let catalog = self.engine.context().table_catalog();
1407
1408 if _if_not_exists && catalog.schema_exists(&canonical) {
1409 return Ok(RuntimeStatementResult::NoOp);
1410 }
1411
1412 catalog.register_schema(&canonical).map_err(|err| {
1413 Error::CatalogError(format!(
1414 "Failed to create schema '{}': {}",
1415 display_name, err
1416 ))
1417 })?;
1418
1419 Ok(RuntimeStatementResult::NoOp)
1420 }
1421
1422 #[allow(clippy::too_many_arguments)]
1423 fn handle_create_view(
1424 &self,
1425 name: ObjectName,
1426 _columns: Vec<sqlparser::ast::ViewColumnDef>,
1427 query: Box<sqlparser::ast::Query>,
1428 materialized: bool,
1429 or_replace: bool,
1430 or_alter: bool,
1431 _options: sqlparser::ast::CreateTableOptions,
1432 _cluster_by: Vec<sqlparser::ast::Ident>,
1433 _comment: Option<String>,
1434 _with_no_schema_binding: bool,
1435 if_not_exists: bool,
1436 temporary: bool,
1437 _to: Option<ObjectName>,
1438 _params: Option<sqlparser::ast::CreateViewParams>,
1439 _secure: bool,
1440 _name_before_not_exists: bool,
1441 ) -> SqlResult<RuntimeStatementResult<P>> {
1442 if materialized {
1444 return Err(Error::InvalidArgumentError(
1445 "MATERIALIZED VIEWS are not supported".into(),
1446 ));
1447 }
1448 if or_replace {
1449 return Err(Error::InvalidArgumentError(
1450 "CREATE OR REPLACE VIEW is not supported".into(),
1451 ));
1452 }
1453 if or_alter {
1454 return Err(Error::InvalidArgumentError(
1455 "CREATE OR ALTER VIEW is not supported".into(),
1456 ));
1457 }
1458 if temporary {
1459 return Err(Error::InvalidArgumentError(
1460 "TEMPORARY VIEWS are not supported".into(),
1461 ));
1462 }
1463
1464 let (schema_name, view_name) = parse_schema_qualified_name(&name)?;
1466
1467 if let Some(ref schema) = schema_name {
1469 let catalog = self.engine.context().table_catalog();
1470 if !catalog.schema_exists(schema) {
1471 return Err(Error::CatalogError(format!(
1472 "Schema '{}' does not exist",
1473 schema
1474 )));
1475 }
1476 }
1477
1478 let display_name = match &schema_name {
1480 Some(schema) => format!("{}.{}", schema, view_name),
1481 None => view_name.clone(),
1482 };
1483 let canonical_name = display_name.to_ascii_lowercase();
1484
1485 let catalog = self.engine.context().table_catalog();
1487 if catalog.table_exists(&canonical_name) {
1488 if if_not_exists {
1489 return Ok(RuntimeStatementResult::NoOp);
1490 }
1491 return Err(Error::CatalogError(format!(
1492 "Table or view '{}' already exists",
1493 display_name
1494 )));
1495 }
1496
1497 let view_definition = query.to_string();
1499
1500 let context = self.engine.context();
1502 context.create_view(&display_name, view_definition)?;
1503
1504 tracing::debug!("Created view: {}", display_name);
1505 Ok(RuntimeStatementResult::NoOp)
1506 }
1507
1508 fn handle_create_domain(
1509 &self,
1510 create_domain: sqlparser::ast::CreateDomain,
1511 ) -> SqlResult<RuntimeStatementResult<P>> {
1512 use llkv_table::CustomTypeMeta;
1513 use std::time::{SystemTime, UNIX_EPOCH};
1514
1515 let type_name = create_domain.name.to_string();
1517
1518 let base_type_sql = create_domain.data_type.to_string();
1520
1521 self.engine
1523 .context()
1524 .register_type(type_name.clone(), create_domain.data_type.clone());
1525
1526 let context = self.engine.context();
1528 let catalog = llkv_table::SysCatalog::new(context.store());
1529
1530 let created_at_micros = SystemTime::now()
1531 .duration_since(UNIX_EPOCH)
1532 .unwrap_or_default()
1533 .as_micros() as u64;
1534
1535 let meta = CustomTypeMeta {
1536 name: type_name.clone(),
1537 base_type_sql,
1538 created_at_micros,
1539 };
1540
1541 catalog.put_custom_type_meta(&meta)?;
1542
1543 tracing::debug!("Created and persisted type alias: {}", type_name);
1544 Ok(RuntimeStatementResult::NoOp)
1545 }
1546
1547 fn handle_drop_domain(
1548 &self,
1549 drop_domain: sqlparser::ast::DropDomain,
1550 ) -> SqlResult<RuntimeStatementResult<P>> {
1551 let if_exists = drop_domain.if_exists;
1552 let type_name = drop_domain.name.to_string();
1553
1554 let result = self.engine.context().drop_type(&type_name);
1556
1557 if let Err(err) = result {
1558 if !if_exists {
1559 return Err(err);
1560 }
1561 } else {
1563 let context = self.engine.context();
1565 let catalog = llkv_table::SysCatalog::new(context.store());
1566 catalog.delete_custom_type_meta(&type_name)?;
1567
1568 tracing::debug!("Dropped and removed from catalog type alias: {}", type_name);
1569 }
1570
1571 Ok(RuntimeStatementResult::NoOp)
1572 }
1573
1574 fn try_handle_range_ctas(
1575 &self,
1576 display_name: &str,
1577 _canonical_name: &str,
1578 query: &Query,
1579 if_not_exists: bool,
1580 or_replace: bool,
1581 namespace: Option<String>,
1582 ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
1583 let select = match query.body.as_ref() {
1584 SetExpr::Select(select) => select,
1585 _ => return Ok(None),
1586 };
1587 if select.from.len() != 1 {
1588 return Ok(None);
1589 }
1590 let table_with_joins = &select.from[0];
1591 if !table_with_joins.joins.is_empty() {
1592 return Ok(None);
1593 }
1594 let (range_size, range_alias) = match &table_with_joins.relation {
1595 TableFactor::Table {
1596 name,
1597 args: Some(args),
1598 alias,
1599 ..
1600 } => {
1601 let func_name = name.to_string().to_ascii_lowercase();
1602 if func_name != "range" {
1603 return Ok(None);
1604 }
1605 if args.args.len() != 1 {
1606 return Err(Error::InvalidArgumentError(
1607 "range table function expects a single argument".into(),
1608 ));
1609 }
1610 let size_expr = &args.args[0];
1611 let range_size = match size_expr {
1612 FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
1613 match &value.value {
1614 Value::Number(raw, _) => raw.parse::<i64>().map_err(|e| {
1615 Error::InvalidArgumentError(format!(
1616 "invalid range size literal {}: {}",
1617 raw, e
1618 ))
1619 })?,
1620 other => {
1621 return Err(Error::InvalidArgumentError(format!(
1622 "unsupported range size value: {:?}",
1623 other
1624 )));
1625 }
1626 }
1627 }
1628 _ => {
1629 return Err(Error::InvalidArgumentError(
1630 "unsupported range argument".into(),
1631 ));
1632 }
1633 };
1634 (range_size, alias.as_ref().map(|a| a.name.value.clone()))
1635 }
1636 _ => return Ok(None),
1637 };
1638
1639 if range_size < 0 {
1640 return Err(Error::InvalidArgumentError(
1641 "range size must be non-negative".into(),
1642 ));
1643 }
1644
1645 if select.projection.is_empty() {
1646 return Err(Error::InvalidArgumentError(
1647 "CREATE TABLE AS SELECT requires at least one projected column".into(),
1648 ));
1649 }
1650
1651 let mut column_specs = Vec::with_capacity(select.projection.len());
1652 let mut column_names = Vec::with_capacity(select.projection.len());
1653 let mut row_template = Vec::with_capacity(select.projection.len());
1654 for item in &select.projection {
1655 match item {
1656 SelectItem::ExprWithAlias { expr, alias } => {
1657 let (value, data_type) = match expr {
1658 SqlExpr::Value(value_with_span) => match &value_with_span.value {
1659 Value::Number(raw, _) => {
1660 let parsed = raw.parse::<i64>().map_err(|e| {
1661 Error::InvalidArgumentError(format!(
1662 "invalid numeric literal {}: {}",
1663 raw, e
1664 ))
1665 })?;
1666 (
1667 PlanValue::Integer(parsed),
1668 arrow::datatypes::DataType::Int64,
1669 )
1670 }
1671 Value::SingleQuotedString(s) => (
1672 PlanValue::String(s.clone()),
1673 arrow::datatypes::DataType::Utf8,
1674 ),
1675 other => {
1676 return Err(Error::InvalidArgumentError(format!(
1677 "unsupported SELECT expression in range CTAS: {:?}",
1678 other
1679 )));
1680 }
1681 },
1682 SqlExpr::Identifier(ident) => {
1683 let ident_lower = ident.value.to_ascii_lowercase();
1684 if range_alias
1685 .as_ref()
1686 .map(|a| a.eq_ignore_ascii_case(&ident_lower))
1687 .unwrap_or(false)
1688 || ident_lower == "range"
1689 {
1690 return Err(Error::InvalidArgumentError(
1691 "range() table function columns are not supported yet".into(),
1692 ));
1693 }
1694 return Err(Error::InvalidArgumentError(format!(
1695 "unsupported identifier '{}' in range CTAS projection",
1696 ident.value
1697 )));
1698 }
1699 other => {
1700 return Err(Error::InvalidArgumentError(format!(
1701 "unsupported SELECT expression in range CTAS: {:?}",
1702 other
1703 )));
1704 }
1705 };
1706 let column_name = alias.value.clone();
1707 column_specs.push(PlanColumnSpec::new(column_name.clone(), data_type, true));
1708 column_names.push(column_name);
1709 row_template.push(value);
1710 }
1711 other => {
1712 return Err(Error::InvalidArgumentError(format!(
1713 "unsupported projection {:?} in range CTAS",
1714 other
1715 )));
1716 }
1717 }
1718 }
1719
1720 let plan = CreateTablePlan {
1721 name: display_name.to_string(),
1722 if_not_exists,
1723 or_replace,
1724 columns: column_specs,
1725 source: None,
1726 namespace,
1727 foreign_keys: Vec::new(),
1728 multi_column_uniques: Vec::new(),
1729 };
1730 let create_result = self.execute_plan_statement(PlanStatement::CreateTable(plan))?;
1731
1732 let row_count = range_size
1733 .try_into()
1734 .map_err(|_| Error::InvalidArgumentError("range size exceeds usize".into()))?;
1735 if row_count > 0 {
1736 let rows = vec![row_template; row_count];
1737 let insert_plan = InsertPlan {
1738 table: display_name.to_string(),
1739 columns: column_names,
1740 source: InsertSource::Rows(rows),
1741 };
1742 self.execute_plan_statement(PlanStatement::Insert(insert_plan))?;
1743 }
1744
1745 Ok(Some(create_result))
1746 }
1747
1748 fn try_handle_pragma_table_info(
1752 &self,
1753 query: &Query,
1754 ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
1755 let select = match query.body.as_ref() {
1756 SetExpr::Select(select) => select,
1757 _ => return Ok(None),
1758 };
1759
1760 if select.from.len() != 1 {
1761 return Ok(None);
1762 }
1763
1764 let table_with_joins = &select.from[0];
1765 if !table_with_joins.joins.is_empty() {
1766 return Ok(None);
1767 }
1768
1769 let table_name = match &table_with_joins.relation {
1771 TableFactor::Table {
1772 name,
1773 args: Some(args),
1774 ..
1775 } => {
1776 let func_name = name.to_string().to_ascii_lowercase();
1777 if func_name != "pragma_table_info" {
1778 return Ok(None);
1779 }
1780
1781 if args.args.len() != 1 {
1783 return Err(Error::InvalidArgumentError(
1784 "pragma_table_info expects exactly one argument".into(),
1785 ));
1786 }
1787
1788 match &args.args[0] {
1789 FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
1790 match &value.value {
1791 Value::SingleQuotedString(s) => s.clone(),
1792 Value::DoubleQuotedString(s) => s.clone(),
1793 _ => {
1794 return Err(Error::InvalidArgumentError(
1795 "pragma_table_info argument must be a string".into(),
1796 ));
1797 }
1798 }
1799 }
1800 _ => {
1801 return Err(Error::InvalidArgumentError(
1802 "pragma_table_info argument must be a string literal".into(),
1803 ));
1804 }
1805 }
1806 }
1807 _ => return Ok(None),
1808 };
1809
1810 let context = self.engine.context();
1812 let (_, canonical_name) = llkv_table::canonical_table_name(&table_name)?;
1813 let columns = context.catalog().table_column_specs(&canonical_name)?;
1814
1815 use arrow::array::{BooleanArray, Int32Array, StringArray};
1817 use arrow::datatypes::{DataType, Field, Schema};
1818
1819 let mut cid_values = Vec::new();
1820 let mut name_values = Vec::new();
1821 let mut type_values = Vec::new();
1822 let mut notnull_values = Vec::new();
1823 let mut dflt_value_values: Vec<Option<String>> = Vec::new();
1824 let mut pk_values = Vec::new();
1825
1826 for (idx, col) in columns.iter().enumerate() {
1827 cid_values.push(idx as i32);
1828 name_values.push(col.name.clone());
1829 type_values.push(format!("{:?}", col.data_type)); notnull_values.push(!col.nullable);
1831 dflt_value_values.push(None); pk_values.push(col.primary_key);
1833 }
1834
1835 let schema = Arc::new(Schema::new(vec![
1836 Field::new("cid", DataType::Int32, false),
1837 Field::new("name", DataType::Utf8, false),
1838 Field::new("type", DataType::Utf8, false),
1839 Field::new("notnull", DataType::Boolean, false),
1840 Field::new("dflt_value", DataType::Utf8, true),
1841 Field::new("pk", DataType::Boolean, false),
1842 ]));
1843
1844 use arrow::array::ArrayRef;
1845 let mut batch = RecordBatch::try_new(
1846 Arc::clone(&schema),
1847 vec![
1848 Arc::new(Int32Array::from(cid_values)) as ArrayRef,
1849 Arc::new(StringArray::from(name_values)) as ArrayRef,
1850 Arc::new(StringArray::from(type_values)) as ArrayRef,
1851 Arc::new(BooleanArray::from(notnull_values)) as ArrayRef,
1852 Arc::new(StringArray::from(dflt_value_values)) as ArrayRef,
1853 Arc::new(BooleanArray::from(pk_values)) as ArrayRef,
1854 ],
1855 )
1856 .map_err(|e| Error::Internal(format!("failed to create pragma_table_info batch: {}", e)))?;
1857
1858 let projection_indices: Vec<usize> = select
1860 .projection
1861 .iter()
1862 .filter_map(|item| {
1863 match item {
1864 SelectItem::UnnamedExpr(SqlExpr::Identifier(ident)) => {
1865 schema.index_of(&ident.value).ok()
1866 }
1867 SelectItem::ExprWithAlias { expr, .. } => {
1868 if let SqlExpr::Identifier(ident) = expr {
1869 schema.index_of(&ident.value).ok()
1870 } else {
1871 None
1872 }
1873 }
1874 SelectItem::Wildcard(_) => None, _ => None,
1876 }
1877 })
1878 .collect();
1879
1880 let projected_schema;
1882 if !projection_indices.is_empty() {
1883 let projected_fields: Vec<Field> = projection_indices
1884 .iter()
1885 .map(|&idx| schema.field(idx).clone())
1886 .collect();
1887 projected_schema = Arc::new(Schema::new(projected_fields));
1888
1889 let projected_columns: Vec<ArrayRef> = projection_indices
1890 .iter()
1891 .map(|&idx| Arc::clone(batch.column(idx)))
1892 .collect();
1893
1894 batch = RecordBatch::try_new(Arc::clone(&projected_schema), projected_columns)
1895 .map_err(|e| Error::Internal(format!("failed to project columns: {}", e)))?;
1896 } else {
1897 projected_schema = schema;
1899 }
1900
1901 if let Some(order_by) = &query.order_by {
1903 use arrow::compute::SortColumn;
1904 use arrow::compute::lexsort_to_indices;
1905 use sqlparser::ast::OrderByKind;
1906
1907 let exprs = match &order_by.kind {
1908 OrderByKind::Expressions(exprs) => exprs,
1909 _ => {
1910 return Err(Error::InvalidArgumentError(
1911 "unsupported ORDER BY clause".into(),
1912 ));
1913 }
1914 };
1915
1916 let mut sort_columns = Vec::new();
1917 for order_expr in exprs {
1918 if let SqlExpr::Identifier(ident) = &order_expr.expr
1919 && let Ok(col_idx) = projected_schema.index_of(&ident.value)
1920 {
1921 let options = arrow::compute::SortOptions {
1922 descending: !order_expr.options.asc.unwrap_or(true),
1923 nulls_first: order_expr.options.nulls_first.unwrap_or(false),
1924 };
1925 sort_columns.push(SortColumn {
1926 values: Arc::clone(batch.column(col_idx)),
1927 options: Some(options),
1928 });
1929 }
1930 }
1931
1932 if !sort_columns.is_empty() {
1933 let indices = lexsort_to_indices(&sort_columns, None)
1934 .map_err(|e| Error::Internal(format!("failed to sort: {}", e)))?;
1935
1936 use arrow::compute::take;
1937 let sorted_columns: Result<Vec<ArrayRef>, _> = batch
1938 .columns()
1939 .iter()
1940 .map(|col| take(col.as_ref(), &indices, None))
1941 .collect();
1942
1943 batch = RecordBatch::try_new(
1944 Arc::clone(&projected_schema),
1945 sorted_columns
1946 .map_err(|e| Error::Internal(format!("failed to apply sort: {}", e)))?,
1947 )
1948 .map_err(|e| Error::Internal(format!("failed to create sorted batch: {}", e)))?;
1949 }
1950 }
1951
1952 let execution = SelectExecution::new_single_batch(
1953 table_name.clone(),
1954 Arc::clone(&projected_schema),
1955 batch,
1956 );
1957
1958 Ok(Some(RuntimeStatementResult::Select {
1959 table_name,
1960 schema: projected_schema,
1961 execution: Box::new(execution),
1962 }))
1963 }
1964
1965 fn handle_create_table_as(
1966 &self,
1967 display_name: String,
1968 _canonical_name: String,
1969 query: Query,
1970 if_not_exists: bool,
1971 or_replace: bool,
1972 namespace: Option<String>,
1973 ) -> SqlResult<RuntimeStatementResult<P>> {
1974 if let SetExpr::Select(select) = query.body.as_ref()
1977 && let Some((rows, column_names)) = extract_values_from_derived_table(&select.from)?
1978 {
1979 return self.handle_create_table_from_values(
1981 display_name,
1982 rows,
1983 column_names,
1984 if_not_exists,
1985 or_replace,
1986 namespace,
1987 );
1988 }
1989
1990 let select_plan = self.build_select_plan(query)?;
1992
1993 if select_plan.projections.is_empty() && select_plan.aggregates.is_empty() {
1994 return Err(Error::InvalidArgumentError(
1995 "CREATE TABLE AS SELECT requires at least one projected column".into(),
1996 ));
1997 }
1998
1999 let plan = CreateTablePlan {
2000 name: display_name,
2001 if_not_exists,
2002 or_replace,
2003 columns: Vec::new(),
2004 source: Some(CreateTableSource::Select {
2005 plan: Box::new(select_plan),
2006 }),
2007 namespace,
2008 foreign_keys: Vec::new(),
2009 multi_column_uniques: Vec::new(),
2010 };
2011 self.execute_plan_statement(PlanStatement::CreateTable(plan))
2012 }
2013
2014 fn handle_create_table_from_values(
2015 &self,
2016 display_name: String,
2017 rows: Vec<Vec<PlanValue>>,
2018 column_names: Vec<String>,
2019 if_not_exists: bool,
2020 or_replace: bool,
2021 namespace: Option<String>,
2022 ) -> SqlResult<RuntimeStatementResult<P>> {
2023 use arrow::array::{ArrayRef, Float64Builder, Int64Builder, StringBuilder};
2024 use arrow::datatypes::{DataType, Field, Schema};
2025 use arrow::record_batch::RecordBatch;
2026 use std::sync::Arc;
2027
2028 if rows.is_empty() {
2029 return Err(Error::InvalidArgumentError(
2030 "VALUES must have at least one row".into(),
2031 ));
2032 }
2033
2034 let num_cols = column_names.len();
2035
2036 let first_row = &rows[0];
2038 if first_row.len() != num_cols {
2039 return Err(Error::InvalidArgumentError(
2040 "VALUES row column count mismatch".into(),
2041 ));
2042 }
2043
2044 let mut fields = Vec::with_capacity(num_cols);
2045 let mut column_types = Vec::with_capacity(num_cols);
2046
2047 for (idx, value) in first_row.iter().enumerate() {
2048 let (data_type, nullable) = match value {
2049 PlanValue::Integer(_) => (DataType::Int64, false),
2050 PlanValue::Float(_) => (DataType::Float64, false),
2051 PlanValue::String(_) => (DataType::Utf8, false),
2052 PlanValue::Null => (DataType::Utf8, true), _ => {
2054 return Err(Error::InvalidArgumentError(format!(
2055 "unsupported value type in VALUES for column '{}'",
2056 column_names.get(idx).unwrap_or(&format!("column{}", idx))
2057 )));
2058 }
2059 };
2060
2061 column_types.push(data_type.clone());
2062 fields.push(Field::new(&column_names[idx], data_type, nullable));
2063 }
2064
2065 let schema = Arc::new(Schema::new(fields));
2066
2067 let mut arrays: Vec<ArrayRef> = Vec::with_capacity(num_cols);
2069
2070 for col_idx in 0..num_cols {
2071 let col_type = &column_types[col_idx];
2072
2073 match col_type {
2074 DataType::Int64 => {
2075 let mut builder = Int64Builder::with_capacity(rows.len());
2076 for row in &rows {
2077 match &row[col_idx] {
2078 PlanValue::Integer(v) => builder.append_value(*v),
2079 PlanValue::Null => builder.append_null(),
2080 other => {
2081 return Err(Error::InvalidArgumentError(format!(
2082 "type mismatch in VALUES: expected Integer, got {:?}",
2083 other
2084 )));
2085 }
2086 }
2087 }
2088 arrays.push(Arc::new(builder.finish()) as ArrayRef);
2089 }
2090 DataType::Float64 => {
2091 let mut builder = Float64Builder::with_capacity(rows.len());
2092 for row in &rows {
2093 match &row[col_idx] {
2094 PlanValue::Float(v) => builder.append_value(*v),
2095 PlanValue::Null => builder.append_null(),
2096 other => {
2097 return Err(Error::InvalidArgumentError(format!(
2098 "type mismatch in VALUES: expected Float, got {:?}",
2099 other
2100 )));
2101 }
2102 }
2103 }
2104 arrays.push(Arc::new(builder.finish()) as ArrayRef);
2105 }
2106 DataType::Utf8 => {
2107 let mut builder = StringBuilder::with_capacity(rows.len(), 1024);
2108 for row in &rows {
2109 match &row[col_idx] {
2110 PlanValue::String(v) => builder.append_value(v),
2111 PlanValue::Null => builder.append_null(),
2112 other => {
2113 return Err(Error::InvalidArgumentError(format!(
2114 "type mismatch in VALUES: expected String, got {:?}",
2115 other
2116 )));
2117 }
2118 }
2119 }
2120 arrays.push(Arc::new(builder.finish()) as ArrayRef);
2121 }
2122 other => {
2123 return Err(Error::InvalidArgumentError(format!(
2124 "unsupported column type in VALUES: {:?}",
2125 other
2126 )));
2127 }
2128 }
2129 }
2130
2131 let batch = RecordBatch::try_new(Arc::clone(&schema), arrays).map_err(|e| {
2132 Error::Internal(format!("failed to create RecordBatch from VALUES: {}", e))
2133 })?;
2134
2135 let plan = CreateTablePlan {
2136 name: display_name.clone(),
2137 if_not_exists,
2138 or_replace,
2139 columns: Vec::new(),
2140 source: Some(CreateTableSource::Batches {
2141 schema: Arc::clone(&schema),
2142 batches: vec![batch],
2143 }),
2144 namespace,
2145 foreign_keys: Vec::new(),
2146 multi_column_uniques: Vec::new(),
2147 };
2148
2149 self.execute_plan_statement(PlanStatement::CreateTable(plan))
2150 }
2151
2152 fn handle_insert(&self, stmt: sqlparser::ast::Insert) -> SqlResult<RuntimeStatementResult<P>> {
2153 let table_name_debug =
2154 Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
2155 tracing::trace!(
2156 "DEBUG SQL handle_insert called for table={}",
2157 table_name_debug
2158 );
2159 if !self.engine.session().has_active_transaction()
2160 && self.is_table_marked_dropped(&table_name_debug)?
2161 {
2162 return Err(Error::TransactionContextError(
2163 DROPPED_TABLE_TRANSACTION_ERR.into(),
2164 ));
2165 }
2166 if stmt.replace_into || stmt.ignore || stmt.or.is_some() {
2167 return Err(Error::InvalidArgumentError(
2168 "non-standard INSERT forms are not supported".into(),
2169 ));
2170 }
2171 if stmt.overwrite {
2172 return Err(Error::InvalidArgumentError(
2173 "INSERT OVERWRITE is not supported".into(),
2174 ));
2175 }
2176 if !stmt.assignments.is_empty() {
2177 return Err(Error::InvalidArgumentError(
2178 "INSERT ... SET is not supported".into(),
2179 ));
2180 }
2181 if stmt.partitioned.is_some() || !stmt.after_columns.is_empty() {
2182 return Err(Error::InvalidArgumentError(
2183 "partitioned INSERT is not supported".into(),
2184 ));
2185 }
2186 if stmt.returning.is_some() {
2187 return Err(Error::InvalidArgumentError(
2188 "INSERT ... RETURNING is not supported".into(),
2189 ));
2190 }
2191 if stmt.format_clause.is_some() || stmt.settings.is_some() {
2192 return Err(Error::InvalidArgumentError(
2193 "INSERT with FORMAT or SETTINGS is not supported".into(),
2194 ));
2195 }
2196
2197 let (display_name, _canonical_name) = match &stmt.table {
2198 TableObject::TableName(name) => canonical_object_name(name)?,
2199 _ => {
2200 return Err(Error::InvalidArgumentError(
2201 "INSERT requires a plain table name".into(),
2202 ));
2203 }
2204 };
2205
2206 let columns: Vec<String> = stmt
2207 .columns
2208 .iter()
2209 .map(|ident| ident.value.clone())
2210 .collect();
2211 let source_expr = stmt
2212 .source
2213 .as_ref()
2214 .ok_or_else(|| Error::InvalidArgumentError("INSERT requires a VALUES clause".into()))?;
2215 validate_simple_query(source_expr)?;
2216
2217 let insert_source = match source_expr.body.as_ref() {
2218 SetExpr::Values(values) => {
2219 if values.rows.is_empty() {
2220 return Err(Error::InvalidArgumentError(
2221 "INSERT VALUES list must contain at least one row".into(),
2222 ));
2223 }
2224 let mut rows: Vec<Vec<SqlValue>> = Vec::with_capacity(values.rows.len());
2225 for row in &values.rows {
2226 let mut converted = Vec::with_capacity(row.len());
2227 for expr in row {
2228 converted.push(SqlValue::try_from_expr(expr)?);
2229 }
2230 rows.push(converted);
2231 }
2232 InsertSource::Rows(
2233 rows.into_iter()
2234 .map(|row| row.into_iter().map(PlanValue::from).collect())
2235 .collect(),
2236 )
2237 }
2238 SetExpr::Select(select) => {
2239 if let Some(rows) = extract_constant_select_rows(select.as_ref())? {
2240 InsertSource::Rows(rows)
2241 } else if let Some(range_rows) = extract_rows_from_range(select.as_ref())? {
2242 InsertSource::Rows(range_rows.into_rows())
2243 } else {
2244 let select_plan = self.build_select_plan((**source_expr).clone())?;
2245 InsertSource::Select {
2246 plan: Box::new(select_plan),
2247 }
2248 }
2249 }
2250 _ => {
2251 return Err(Error::InvalidArgumentError(
2252 "unsupported INSERT source".into(),
2253 ));
2254 }
2255 };
2256
2257 let plan = InsertPlan {
2258 table: display_name.clone(),
2259 columns,
2260 source: insert_source,
2261 };
2262 tracing::trace!(
2263 "DEBUG SQL handle_insert: about to execute insert for table={}",
2264 display_name
2265 );
2266 self.execute_plan_statement(PlanStatement::Insert(plan))
2267 }
2268
2269 fn handle_update(
2270 &self,
2271 table: TableWithJoins,
2272 assignments: Vec<Assignment>,
2273 from: Option<UpdateTableFromKind>,
2274 selection: Option<SqlExpr>,
2275 returning: Option<Vec<SelectItem>>,
2276 ) -> SqlResult<RuntimeStatementResult<P>> {
2277 if from.is_some() {
2278 return Err(Error::InvalidArgumentError(
2279 "UPDATE ... FROM is not supported yet".into(),
2280 ));
2281 }
2282 if returning.is_some() {
2283 return Err(Error::InvalidArgumentError(
2284 "UPDATE ... RETURNING is not supported".into(),
2285 ));
2286 }
2287 if assignments.is_empty() {
2288 return Err(Error::InvalidArgumentError(
2289 "UPDATE requires at least one assignment".into(),
2290 ));
2291 }
2292
2293 let (display_name, canonical_name) = extract_single_table(std::slice::from_ref(&table))?;
2294
2295 if !self.engine.session().has_active_transaction()
2296 && self
2297 .engine
2298 .context()
2299 .is_table_marked_dropped(&canonical_name)
2300 {
2301 return Err(Error::TransactionContextError(
2302 DROPPED_TABLE_TRANSACTION_ERR.into(),
2303 ));
2304 }
2305
2306 let catalog = self.engine.context().table_catalog();
2307 let resolver = catalog.identifier_resolver();
2308 let table_id = catalog.table_id(&canonical_name);
2309
2310 let mut column_assignments = Vec::with_capacity(assignments.len());
2311 let mut seen: HashMap<String, ()> = HashMap::new();
2312 for assignment in assignments {
2313 let column_name = resolve_assignment_column_name(&assignment.target)?;
2314 let normalized = column_name.to_ascii_lowercase();
2315 if seen.insert(normalized, ()).is_some() {
2316 return Err(Error::InvalidArgumentError(format!(
2317 "duplicate column '{}' in UPDATE assignments",
2318 column_name
2319 )));
2320 }
2321 let value = match SqlValue::try_from_expr(&assignment.value) {
2322 Ok(literal) => AssignmentValue::Literal(PlanValue::from(literal)),
2323 Err(Error::InvalidArgumentError(msg))
2324 if msg.contains("unsupported literal expression") =>
2325 {
2326 let translated = translate_scalar_with_context(
2327 &resolver,
2328 IdentifierContext::new(table_id),
2329 &assignment.value,
2330 )?;
2331 AssignmentValue::Expression(translated)
2332 }
2333 Err(err) => return Err(err),
2334 };
2335 column_assignments.push(ColumnAssignment {
2336 column: column_name,
2337 value,
2338 });
2339 }
2340
2341 let filter = match selection {
2342 Some(expr) => Some(translate_condition_with_context(
2343 &resolver,
2344 IdentifierContext::new(table_id),
2345 &expr,
2346 )?),
2347 None => None,
2348 };
2349
2350 let plan = UpdatePlan {
2351 table: display_name.clone(),
2352 assignments: column_assignments,
2353 filter,
2354 };
2355 self.execute_plan_statement(PlanStatement::Update(plan))
2356 }
2357
2358 #[allow(clippy::collapsible_if)]
2359 fn handle_delete(&self, delete: Delete) -> SqlResult<RuntimeStatementResult<P>> {
2360 let Delete {
2361 tables,
2362 from,
2363 using,
2364 selection,
2365 returning,
2366 order_by,
2367 limit,
2368 } = delete;
2369
2370 if !tables.is_empty() {
2371 return Err(Error::InvalidArgumentError(
2372 "multi-table DELETE is not supported yet".into(),
2373 ));
2374 }
2375 if let Some(using_tables) = using {
2376 if !using_tables.is_empty() {
2377 return Err(Error::InvalidArgumentError(
2378 "DELETE ... USING is not supported yet".into(),
2379 ));
2380 }
2381 }
2382 if returning.is_some() {
2383 return Err(Error::InvalidArgumentError(
2384 "DELETE ... RETURNING is not supported".into(),
2385 ));
2386 }
2387 if !order_by.is_empty() {
2388 return Err(Error::InvalidArgumentError(
2389 "DELETE ... ORDER BY is not supported yet".into(),
2390 ));
2391 }
2392 if limit.is_some() {
2393 return Err(Error::InvalidArgumentError(
2394 "DELETE ... LIMIT is not supported yet".into(),
2395 ));
2396 }
2397
2398 let from_tables = match from {
2399 FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
2400 };
2401 let (display_name, canonical_name) = extract_single_table(&from_tables)?;
2402
2403 if !self.engine.session().has_active_transaction()
2404 && self
2405 .engine
2406 .context()
2407 .is_table_marked_dropped(&canonical_name)
2408 {
2409 return Err(Error::TransactionContextError(
2410 DROPPED_TABLE_TRANSACTION_ERR.into(),
2411 ));
2412 }
2413
2414 let catalog = self.engine.context().table_catalog();
2415 let resolver = catalog.identifier_resolver();
2416 let table_id = catalog.table_id(&canonical_name);
2417
2418 let filter = selection
2419 .map(|expr| {
2420 translate_condition_with_context(&resolver, IdentifierContext::new(table_id), &expr)
2421 })
2422 .transpose()?;
2423
2424 let plan = DeletePlan {
2425 table: display_name.clone(),
2426 filter,
2427 };
2428 self.execute_plan_statement(PlanStatement::Delete(plan))
2429 }
2430
2431 #[allow(clippy::too_many_arguments)] fn handle_drop(
2433 &self,
2434 object_type: ObjectType,
2435 if_exists: bool,
2436 names: Vec<ObjectName>,
2437 cascade: bool,
2438 restrict: bool,
2439 purge: bool,
2440 temporary: bool,
2441 ) -> SqlResult<RuntimeStatementResult<P>> {
2442 if purge || temporary {
2443 return Err(Error::InvalidArgumentError(
2444 "DROP purge/temporary options are not supported".into(),
2445 ));
2446 }
2447
2448 match object_type {
2449 ObjectType::Table => {
2450 if cascade || restrict {
2451 return Err(Error::InvalidArgumentError(
2452 "DROP TABLE CASCADE/RESTRICT is not supported".into(),
2453 ));
2454 }
2455
2456 for name in names {
2457 let table_name = Self::object_name_to_string(&name)?;
2458 let mut plan = llkv_plan::DropTablePlan::new(table_name.clone());
2459 plan.if_exists = if_exists;
2460
2461 self.execute_plan_statement(llkv_plan::PlanStatement::DropTable(plan))
2462 .map_err(|err| Self::map_table_error(&table_name, err))?;
2463 }
2464
2465 Ok(RuntimeStatementResult::NoOp)
2466 }
2467 ObjectType::Index => {
2468 if cascade || restrict {
2469 return Err(Error::InvalidArgumentError(
2470 "DROP INDEX CASCADE/RESTRICT is not supported".into(),
2471 ));
2472 }
2473
2474 for name in names {
2475 let index_name = Self::object_name_to_string(&name)?;
2476 let plan = llkv_plan::DropIndexPlan::new(index_name).if_exists(if_exists);
2477 self.execute_plan_statement(llkv_plan::PlanStatement::DropIndex(plan))?;
2478 }
2479
2480 Ok(RuntimeStatementResult::NoOp)
2481 }
2482 ObjectType::Schema => {
2483 if restrict {
2484 return Err(Error::InvalidArgumentError(
2485 "DROP SCHEMA RESTRICT is not supported".into(),
2486 ));
2487 }
2488
2489 let catalog = self.engine.context().table_catalog();
2490
2491 for name in names {
2492 let (display_name, canonical_name) = canonical_object_name(&name)?;
2493
2494 if !catalog.schema_exists(&canonical_name) {
2495 if if_exists {
2496 continue;
2497 }
2498 return Err(Error::CatalogError(format!(
2499 "Schema '{}' does not exist",
2500 display_name
2501 )));
2502 }
2503
2504 if cascade {
2505 let all_tables = catalog.table_names();
2507 let schema_prefix = format!("{}.", canonical_name);
2508
2509 for table in all_tables {
2510 if table.to_ascii_lowercase().starts_with(&schema_prefix) {
2511 let mut plan = llkv_plan::DropTablePlan::new(table.clone());
2512 plan.if_exists = false;
2513 self.execute_plan_statement(llkv_plan::PlanStatement::DropTable(
2514 plan,
2515 ))?;
2516 }
2517 }
2518 } else {
2519 let all_tables = catalog.table_names();
2521 let schema_prefix = format!("{}.", canonical_name);
2522 let has_tables = all_tables
2523 .iter()
2524 .any(|t| t.to_ascii_lowercase().starts_with(&schema_prefix));
2525
2526 if has_tables {
2527 return Err(Error::CatalogError(format!(
2528 "Schema '{}' is not empty. Use CASCADE to drop schema and all its tables",
2529 display_name
2530 )));
2531 }
2532 }
2533
2534 if !catalog.unregister_schema(&canonical_name) && !if_exists {
2536 return Err(Error::CatalogError(format!(
2537 "Schema '{}' does not exist",
2538 display_name
2539 )));
2540 }
2541 }
2542
2543 Ok(RuntimeStatementResult::NoOp)
2544 }
2545 _ => Err(Error::InvalidArgumentError(format!(
2546 "DROP {} is not supported",
2547 object_type
2548 ))),
2549 }
2550 }
2551
2552 fn handle_alter_table(
2553 &self,
2554 name: ObjectName,
2555 if_exists: bool,
2556 only: bool,
2557 operations: Vec<AlterTableOperation>,
2558 ) -> SqlResult<RuntimeStatementResult<P>> {
2559 if only {
2560 return Err(Error::InvalidArgumentError(
2561 "ALTER TABLE ONLY is not supported yet".into(),
2562 ));
2563 }
2564
2565 if operations.is_empty() {
2566 return Ok(RuntimeStatementResult::NoOp);
2567 }
2568
2569 if operations.len() != 1 {
2570 return Err(Error::InvalidArgumentError(
2571 "ALTER TABLE currently supports exactly one operation".into(),
2572 ));
2573 }
2574
2575 let operation = operations.into_iter().next().expect("checked length");
2576 match operation {
2577 AlterTableOperation::RenameTable { table_name } => {
2578 let new_name = table_name.to_string();
2579 self.handle_alter_table_rename(name, new_name, if_exists)
2580 }
2581 AlterTableOperation::RenameColumn {
2582 old_column_name,
2583 new_column_name,
2584 } => {
2585 let plan = llkv_plan::AlterTablePlan {
2586 table_name: name.to_string(),
2587 if_exists,
2588 operation: llkv_plan::AlterTableOperation::RenameColumn {
2589 old_column_name: old_column_name.to_string(),
2590 new_column_name: new_column_name.to_string(),
2591 },
2592 };
2593 self.execute_plan_statement(PlanStatement::AlterTable(plan))
2594 }
2595 AlterTableOperation::AlterColumn { column_name, op } => {
2596 if let AlterColumnOperation::SetDataType {
2598 data_type,
2599 using,
2600 had_set: _,
2601 } = op
2602 {
2603 if using.is_some() {
2604 return Err(Error::InvalidArgumentError(
2605 "ALTER COLUMN SET DATA TYPE USING clause is not yet supported".into(),
2606 ));
2607 }
2608
2609 let plan = llkv_plan::AlterTablePlan {
2610 table_name: name.to_string(),
2611 if_exists,
2612 operation: llkv_plan::AlterTableOperation::SetColumnDataType {
2613 column_name: column_name.to_string(),
2614 new_data_type: data_type.to_string(),
2615 },
2616 };
2617 self.execute_plan_statement(PlanStatement::AlterTable(plan))
2618 } else {
2619 Err(Error::InvalidArgumentError(format!(
2620 "unsupported ALTER COLUMN operation: {:?}",
2621 op
2622 )))
2623 }
2624 }
2625 AlterTableOperation::DropColumn {
2626 has_column_keyword: _,
2627 column_names,
2628 if_exists: column_if_exists,
2629 drop_behavior,
2630 } => {
2631 if column_names.len() != 1 {
2632 return Err(Error::InvalidArgumentError(
2633 "DROP COLUMN currently supports dropping one column at a time".into(),
2634 ));
2635 }
2636
2637 let column_name = column_names.into_iter().next().unwrap().to_string();
2638 let cascade = matches!(drop_behavior, Some(sqlparser::ast::DropBehavior::Cascade));
2639
2640 let plan = llkv_plan::AlterTablePlan {
2641 table_name: name.to_string(),
2642 if_exists,
2643 operation: llkv_plan::AlterTableOperation::DropColumn {
2644 column_name,
2645 if_exists: column_if_exists,
2646 cascade,
2647 },
2648 };
2649 self.execute_plan_statement(PlanStatement::AlterTable(plan))
2650 }
2651 other => Err(Error::InvalidArgumentError(format!(
2652 "unsupported ALTER TABLE operation: {:?}",
2653 other
2654 ))),
2655 }
2656 }
2657
2658 fn handle_alter_table_rename(
2659 &self,
2660 original_name: ObjectName,
2661 new_table_name: String,
2662 if_exists: bool,
2663 ) -> SqlResult<RuntimeStatementResult<P>> {
2664 let (schema_opt, table_name) = parse_schema_qualified_name(&original_name)?;
2665
2666 let new_table_name_clean = new_table_name.trim();
2667
2668 if new_table_name_clean.is_empty() {
2669 return Err(Error::InvalidArgumentError(
2670 "ALTER TABLE RENAME requires a non-empty table name".into(),
2671 ));
2672 }
2673
2674 let (raw_new_schema_opt, raw_new_table) =
2675 if let Some((schema_part, table_part)) = new_table_name_clean.split_once('.') {
2676 (
2677 Some(schema_part.trim().to_string()),
2678 table_part.trim().to_string(),
2679 )
2680 } else {
2681 (None, new_table_name_clean.to_string())
2682 };
2683
2684 if schema_opt.is_none() && raw_new_schema_opt.is_some() {
2685 return Err(Error::InvalidArgumentError(
2686 "ALTER TABLE RENAME cannot add a schema qualifier".into(),
2687 ));
2688 }
2689
2690 let new_table_trimmed = raw_new_table.trim_matches('"');
2691 if new_table_trimmed.is_empty() {
2692 return Err(Error::InvalidArgumentError(
2693 "ALTER TABLE RENAME requires a non-empty table name".into(),
2694 ));
2695 }
2696
2697 if let (Some(existing_schema), Some(new_schema_raw)) =
2698 (schema_opt.as_ref(), raw_new_schema_opt.as_ref())
2699 {
2700 let new_schema_trimmed = new_schema_raw.trim_matches('"');
2701 if !existing_schema.eq_ignore_ascii_case(new_schema_trimmed) {
2702 return Err(Error::InvalidArgumentError(
2703 "ALTER TABLE RENAME cannot change table schema".into(),
2704 ));
2705 }
2706 }
2707
2708 let new_table_display = raw_new_table;
2709 let new_schema_opt = raw_new_schema_opt;
2710
2711 fn join_schema_table(schema: &str, table: &str) -> String {
2712 let mut qualified = String::with_capacity(schema.len() + table.len() + 1);
2713 qualified.push_str(schema);
2714 qualified.push('.');
2715 qualified.push_str(table);
2716 qualified
2717 }
2718
2719 let current_display = schema_opt
2720 .as_ref()
2721 .map(|schema| join_schema_table(schema, &table_name))
2722 .unwrap_or_else(|| table_name.clone());
2723
2724 let new_display = if let Some(new_schema_raw) = new_schema_opt.clone() {
2725 join_schema_table(&new_schema_raw, &new_table_display)
2726 } else if let Some(schema) = schema_opt.as_ref() {
2727 join_schema_table(schema, &new_table_display)
2728 } else {
2729 new_table_display.clone()
2730 };
2731
2732 let plan = RenameTablePlan::new(¤t_display, &new_display).if_exists(if_exists);
2733
2734 match CatalogDdl::rename_table(self.engine.session(), plan) {
2735 Ok(()) => Ok(RuntimeStatementResult::NoOp),
2736 Err(err) => Err(Self::map_table_error(¤t_display, err)),
2737 }
2738 }
2739
2740 fn handle_query(&self, query: Query) -> SqlResult<RuntimeStatementResult<P>> {
2741 if let Some(result) = self.try_handle_pragma_table_info(&query)? {
2743 return Ok(result);
2744 }
2745
2746 let select_plan = self.build_select_plan(query)?;
2747 self.execute_plan_statement(PlanStatement::Select(select_plan))
2748 }
2749
2750 fn build_select_plan(&self, query: Query) -> SqlResult<SelectPlan> {
2751 if self.engine.session().has_active_transaction() && self.engine.session().is_aborted() {
2752 return Err(Error::TransactionContextError(
2753 "TransactionContext Error: transaction is aborted".into(),
2754 ));
2755 }
2756
2757 validate_simple_query(&query)?;
2758 let catalog = self.engine.context().table_catalog();
2759 let resolver = catalog.identifier_resolver();
2760
2761 let (mut select_plan, select_context) = match query.body.as_ref() {
2762 SetExpr::Select(select) => self.translate_select(select.as_ref(), &resolver)?,
2763 other => {
2764 return Err(Error::InvalidArgumentError(format!(
2765 "unsupported query expression: {other:?}"
2766 )));
2767 }
2768 };
2769 if let Some(order_by) = &query.order_by {
2770 if !select_plan.aggregates.is_empty() {
2771 return Err(Error::InvalidArgumentError(
2772 "ORDER BY is not supported for aggregate queries".into(),
2773 ));
2774 }
2775 let order_plan = self.translate_order_by(&resolver, select_context, order_by)?;
2776 select_plan = select_plan.with_order_by(order_plan);
2777 }
2778 Ok(select_plan)
2779 }
2780
2781 fn translate_select(
2782 &self,
2783 select: &Select,
2784 resolver: &IdentifierResolver<'_>,
2785 ) -> SqlResult<(SelectPlan, IdentifierContext)> {
2786 if select.distinct.is_some() {
2787 return Err(Error::InvalidArgumentError(
2788 "SELECT DISTINCT is not supported".into(),
2789 ));
2790 }
2791 if select.top.is_some() {
2792 return Err(Error::InvalidArgumentError(
2793 "SELECT TOP is not supported".into(),
2794 ));
2795 }
2796 if select.exclude.is_some() {
2797 return Err(Error::InvalidArgumentError(
2798 "SELECT EXCLUDE is not supported".into(),
2799 ));
2800 }
2801 if select.into.is_some() {
2802 return Err(Error::InvalidArgumentError(
2803 "SELECT INTO is not supported".into(),
2804 ));
2805 }
2806 if !select.lateral_views.is_empty() {
2807 return Err(Error::InvalidArgumentError(
2808 "LATERAL VIEW is not supported".into(),
2809 ));
2810 }
2811 if select.prewhere.is_some() {
2812 return Err(Error::InvalidArgumentError(
2813 "PREWHERE is not supported".into(),
2814 ));
2815 }
2816 if !group_by_is_empty(&select.group_by) || select.value_table_mode.is_some() {
2817 return Err(Error::InvalidArgumentError(
2818 "GROUP BY and SELECT AS VALUE/STRUCT are not supported".into(),
2819 ));
2820 }
2821 if !select.cluster_by.is_empty()
2822 || !select.distribute_by.is_empty()
2823 || !select.sort_by.is_empty()
2824 {
2825 return Err(Error::InvalidArgumentError(
2826 "CLUSTER/DISTRIBUTE/SORT BY clauses are not supported".into(),
2827 ));
2828 }
2829 if select.having.is_some()
2830 || !select.named_window.is_empty()
2831 || select.qualify.is_some()
2832 || select.connect_by.is_some()
2833 {
2834 return Err(Error::InvalidArgumentError(
2835 "advanced SELECT clauses are not supported".into(),
2836 ));
2837 }
2838
2839 let table_alias = select
2840 .from
2841 .first()
2842 .and_then(|table_with_joins| match &table_with_joins.relation {
2843 TableFactor::Table { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
2844 _ => None,
2845 });
2846
2847 if let Some(alias) = table_alias.as_ref() {
2848 validate_projection_alias_qualifiers(&select.projection, alias)?;
2849 }
2850 let catalog = self.engine.context().table_catalog();
2852 let (mut plan, id_context) = if select.from.is_empty() {
2853 let mut p = SelectPlan::new("");
2855 let projections = self.build_projection_list(
2856 resolver,
2857 IdentifierContext::new(None),
2858 &select.projection,
2859 )?;
2860 p = p.with_projections(projections);
2861 (p, IdentifierContext::new(None))
2862 } else if select.from.len() == 1 {
2863 let (display_name, canonical_name) = extract_single_table(&select.from)?;
2865 let table_id = catalog.table_id(&canonical_name);
2866 let mut p = SelectPlan::new(display_name.clone());
2867 if let Some(aggregates) = self.detect_simple_aggregates(&select.projection)? {
2868 p = p.with_aggregates(aggregates);
2869 } else {
2870 let projections = self.build_projection_list(
2871 resolver,
2872 IdentifierContext::new(table_id),
2873 &select.projection,
2874 )?;
2875 p = p.with_projections(projections);
2876 }
2877 (p, IdentifierContext::new(table_id))
2878 } else {
2879 let tables = extract_tables(&select.from)?;
2881 let mut p = SelectPlan::with_tables(tables);
2882 let projections = self.build_projection_list(
2885 resolver,
2886 IdentifierContext::new(None),
2887 &select.projection,
2888 )?;
2889 p = p.with_projections(projections);
2890 (p, IdentifierContext::new(None))
2891 };
2892
2893 let filter_expr = match &select.selection {
2894 Some(expr) => Some(translate_condition_with_context(
2895 resolver, id_context, expr,
2896 )?),
2897 None => None,
2898 };
2899 plan = plan.with_filter(filter_expr);
2900 Ok((plan, id_context))
2901 }
2902
2903 fn translate_order_by(
2904 &self,
2905 resolver: &IdentifierResolver<'_>,
2906 id_context: IdentifierContext,
2907 order_by: &OrderBy,
2908 ) -> SqlResult<Vec<OrderByPlan>> {
2909 let exprs = match &order_by.kind {
2910 OrderByKind::Expressions(exprs) => exprs,
2911 _ => {
2912 return Err(Error::InvalidArgumentError(
2913 "unsupported ORDER BY clause".into(),
2914 ));
2915 }
2916 };
2917
2918 let base_nulls_first = self.default_nulls_first.load(AtomicOrdering::Relaxed);
2919
2920 let resolve_simple_column = |expr: &SqlExpr| -> SqlResult<String> {
2921 let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
2922 match scalar {
2923 llkv_expr::expr::ScalarExpr::Column(column) => Ok(column),
2924 other => Err(Error::InvalidArgumentError(format!(
2925 "ORDER BY expression must reference a simple column, found {other:?}"
2926 ))),
2927 }
2928 };
2929
2930 let mut plans = Vec::with_capacity(exprs.len());
2931 for order_expr in exprs {
2932 let ascending = order_expr.options.asc.unwrap_or(true);
2933 let default_nulls_first_for_direction = if ascending {
2934 base_nulls_first
2935 } else {
2936 !base_nulls_first
2937 };
2938 let nulls_first = order_expr
2939 .options
2940 .nulls_first
2941 .unwrap_or(default_nulls_first_for_direction);
2942
2943 if let SqlExpr::Identifier(ident) = &order_expr.expr
2944 && ident.value.eq_ignore_ascii_case("ALL")
2945 && ident.quote_style.is_none()
2946 {
2947 plans.push(OrderByPlan {
2948 target: OrderTarget::All,
2949 sort_type: OrderSortType::Native,
2950 ascending,
2951 nulls_first,
2952 });
2953 continue;
2954 }
2955
2956 let (target, sort_type) = match &order_expr.expr {
2957 SqlExpr::Identifier(_) | SqlExpr::CompoundIdentifier(_) => (
2958 OrderTarget::Column(resolve_simple_column(&order_expr.expr)?),
2959 OrderSortType::Native,
2960 ),
2961 SqlExpr::Cast {
2962 expr,
2963 data_type:
2964 SqlDataType::Int(_)
2965 | SqlDataType::Integer(_)
2966 | SqlDataType::BigInt(_)
2967 | SqlDataType::SmallInt(_)
2968 | SqlDataType::TinyInt(_),
2969 ..
2970 } => (
2971 OrderTarget::Column(resolve_simple_column(expr)?),
2972 OrderSortType::CastTextToInteger,
2973 ),
2974 SqlExpr::Cast { data_type, .. } => {
2975 return Err(Error::InvalidArgumentError(format!(
2976 "ORDER BY CAST target type {:?} is not supported",
2977 data_type
2978 )));
2979 }
2980 SqlExpr::Value(value_with_span) => match &value_with_span.value {
2981 Value::Number(raw, _) => {
2982 let position: usize = raw.parse().map_err(|_| {
2983 Error::InvalidArgumentError(format!(
2984 "ORDER BY position '{}' is not a valid positive integer",
2985 raw
2986 ))
2987 })?;
2988 if position == 0 {
2989 return Err(Error::InvalidArgumentError(
2990 "ORDER BY position must be at least 1".into(),
2991 ));
2992 }
2993 (OrderTarget::Index(position - 1), OrderSortType::Native)
2994 }
2995 other => {
2996 return Err(Error::InvalidArgumentError(format!(
2997 "unsupported ORDER BY literal expression: {other:?}"
2998 )));
2999 }
3000 },
3001 other => {
3002 return Err(Error::InvalidArgumentError(format!(
3003 "unsupported ORDER BY expression: {other:?}"
3004 )));
3005 }
3006 };
3007
3008 plans.push(OrderByPlan {
3009 target,
3010 sort_type,
3011 ascending,
3012 nulls_first,
3013 });
3014 }
3015
3016 Ok(plans)
3017 }
3018
3019 fn detect_simple_aggregates(
3020 &self,
3021 projection_items: &[SelectItem],
3022 ) -> SqlResult<Option<Vec<AggregateExpr>>> {
3023 if projection_items.is_empty() {
3024 return Ok(None);
3025 }
3026
3027 let mut specs: Vec<AggregateExpr> = Vec::with_capacity(projection_items.len());
3028 for (idx, item) in projection_items.iter().enumerate() {
3029 let (expr, alias_opt) = match item {
3030 SelectItem::UnnamedExpr(expr) => (expr, None),
3031 SelectItem::ExprWithAlias { expr, alias } => (expr, Some(alias.value.clone())),
3032 _ => return Ok(None),
3033 };
3034
3035 let alias = alias_opt.unwrap_or_else(|| format!("col{}", idx + 1));
3036 let SqlExpr::Function(func) = expr else {
3037 return Ok(None);
3038 };
3039
3040 if func.uses_odbc_syntax {
3041 return Err(Error::InvalidArgumentError(
3042 "ODBC function syntax is not supported in aggregate queries".into(),
3043 ));
3044 }
3045 if !matches!(func.parameters, FunctionArguments::None) {
3046 return Err(Error::InvalidArgumentError(
3047 "parameterized aggregate functions are not supported".into(),
3048 ));
3049 }
3050 if func.filter.is_some()
3051 || func.null_treatment.is_some()
3052 || func.over.is_some()
3053 || !func.within_group.is_empty()
3054 {
3055 return Err(Error::InvalidArgumentError(
3056 "advanced aggregate clauses are not supported".into(),
3057 ));
3058 }
3059
3060 let mut is_distinct = false;
3061 let args_slice: &[FunctionArg] = match &func.args {
3062 FunctionArguments::List(list) => {
3063 if let Some(dup) = &list.duplicate_treatment {
3064 use sqlparser::ast::DuplicateTreatment;
3065 match dup {
3066 DuplicateTreatment::All => {}
3067 DuplicateTreatment::Distinct => is_distinct = true,
3068 }
3069 }
3070 if !list.clauses.is_empty() {
3071 return Err(Error::InvalidArgumentError(
3072 "aggregate argument clauses are not supported".into(),
3073 ));
3074 }
3075 &list.args
3076 }
3077 FunctionArguments::None => &[],
3078 FunctionArguments::Subquery(_) => {
3079 return Err(Error::InvalidArgumentError(
3080 "aggregate subquery arguments are not supported".into(),
3081 ));
3082 }
3083 };
3084
3085 let func_name = if func.name.0.len() == 1 {
3086 match &func.name.0[0] {
3087 ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
3088 _ => {
3089 return Err(Error::InvalidArgumentError(
3090 "unsupported aggregate function name".into(),
3091 ));
3092 }
3093 }
3094 } else {
3095 return Err(Error::InvalidArgumentError(
3096 "qualified aggregate function names are not supported".into(),
3097 ));
3098 };
3099
3100 let aggregate = match func_name.as_str() {
3101 "count" => {
3102 if args_slice.len() != 1 {
3103 return Err(Error::InvalidArgumentError(
3104 "COUNT accepts exactly one argument".into(),
3105 ));
3106 }
3107 match &args_slice[0] {
3108 FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
3109 if is_distinct {
3110 return Err(Error::InvalidArgumentError(
3111 "COUNT(DISTINCT *) is not supported".into(),
3112 ));
3113 }
3114 AggregateExpr::count_star(alias)
3115 }
3116 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
3117 let column = resolve_column_name(arg_expr)?;
3118 if is_distinct {
3119 AggregateExpr::count_distinct_column(column, alias)
3120 } else {
3121 AggregateExpr::count_column(column, alias)
3122 }
3123 }
3124 FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
3125 return Err(Error::InvalidArgumentError(
3126 "named COUNT arguments are not supported".into(),
3127 ));
3128 }
3129 FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
3130 return Err(Error::InvalidArgumentError(
3131 "COUNT does not support qualified wildcards".into(),
3132 ));
3133 }
3134 }
3135 }
3136 "sum" | "min" | "max" => {
3137 if is_distinct {
3138 return Err(Error::InvalidArgumentError(
3139 "DISTINCT is not supported for this aggregate".into(),
3140 ));
3141 }
3142 if args_slice.len() != 1 {
3143 return Err(Error::InvalidArgumentError(format!(
3144 "{} accepts exactly one argument",
3145 func_name.to_uppercase()
3146 )));
3147 }
3148 let arg_expr = match &args_slice[0] {
3149 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => arg_expr,
3150 FunctionArg::Unnamed(FunctionArgExpr::Wildcard)
3151 | FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
3152 return Err(Error::InvalidArgumentError(format!(
3153 "{} does not support wildcard arguments",
3154 func_name.to_uppercase()
3155 )));
3156 }
3157 FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
3158 return Err(Error::InvalidArgumentError(format!(
3159 "{} arguments must be column references",
3160 func_name.to_uppercase()
3161 )));
3162 }
3163 };
3164
3165 if func_name == "sum" {
3166 if let Some(column) = parse_count_nulls_case(arg_expr)? {
3167 AggregateExpr::count_nulls(column, alias)
3168 } else {
3169 let column = resolve_column_name(arg_expr)?;
3170 AggregateExpr::sum_int64(column, alias)
3171 }
3172 } else {
3173 let column = resolve_column_name(arg_expr)?;
3174 if func_name == "min" {
3175 AggregateExpr::min_int64(column, alias)
3176 } else {
3177 AggregateExpr::max_int64(column, alias)
3178 }
3179 }
3180 }
3181 _ => return Ok(None),
3182 };
3183
3184 specs.push(aggregate);
3185 }
3186
3187 if specs.is_empty() {
3188 return Ok(None);
3189 }
3190 Ok(Some(specs))
3191 }
3192
3193 fn build_projection_list(
3194 &self,
3195 resolver: &IdentifierResolver<'_>,
3196 id_context: IdentifierContext,
3197 projection_items: &[SelectItem],
3198 ) -> SqlResult<Vec<SelectProjection>> {
3199 if projection_items.is_empty() {
3200 return Err(Error::InvalidArgumentError(
3201 "SELECT projection must include at least one column".into(),
3202 ));
3203 }
3204
3205 let mut projections = Vec::with_capacity(projection_items.len());
3206 for (idx, item) in projection_items.iter().enumerate() {
3207 match item {
3208 SelectItem::Wildcard(options) => {
3209 if let Some(exclude) = &options.opt_exclude {
3210 use sqlparser::ast::ExcludeSelectItem;
3211 let exclude_cols = match exclude {
3212 ExcludeSelectItem::Single(ident) => vec![ident.value.clone()],
3213 ExcludeSelectItem::Multiple(idents) => {
3214 idents.iter().map(|id| id.value.clone()).collect()
3215 }
3216 };
3217 projections.push(SelectProjection::AllColumnsExcept {
3218 exclude: exclude_cols,
3219 });
3220 } else {
3221 projections.push(SelectProjection::AllColumns);
3222 }
3223 }
3224 SelectItem::QualifiedWildcard(kind, _) => match kind {
3225 SelectItemQualifiedWildcardKind::ObjectName(name) => {
3226 projections.push(SelectProjection::Column {
3227 name: name.to_string(),
3228 alias: None,
3229 });
3230 }
3231 SelectItemQualifiedWildcardKind::Expr(_) => {
3232 return Err(Error::InvalidArgumentError(
3233 "expression-qualified wildcards are not supported".into(),
3234 ));
3235 }
3236 },
3237 SelectItem::UnnamedExpr(expr) => match expr {
3238 SqlExpr::Identifier(ident) => {
3239 let parts = vec![ident.value.clone()];
3240 let resolution = resolver.resolve(&parts, id_context)?;
3241 if resolution.is_simple() {
3242 projections.push(SelectProjection::Column {
3243 name: resolution.column().to_string(),
3244 alias: None,
3245 });
3246 } else {
3247 let alias = format!("col{}", idx + 1);
3248 projections.push(SelectProjection::Computed {
3249 expr: resolution.into_scalar_expr(),
3250 alias,
3251 });
3252 }
3253 }
3254 SqlExpr::CompoundIdentifier(parts) => {
3255 let name_parts: Vec<String> =
3256 parts.iter().map(|part| part.value.clone()).collect();
3257 let resolution = resolver.resolve(&name_parts, id_context)?;
3258 if resolution.is_simple() {
3259 projections.push(SelectProjection::Column {
3260 name: resolution.column().to_string(),
3261 alias: None,
3262 });
3263 } else {
3264 let alias = format!("col{}", idx + 1);
3265 projections.push(SelectProjection::Computed {
3266 expr: resolution.into_scalar_expr(),
3267 alias,
3268 });
3269 }
3270 }
3271 _ => {
3272 let alias = format!("col{}", idx + 1);
3273 let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
3274 projections.push(SelectProjection::Computed {
3275 expr: scalar,
3276 alias,
3277 });
3278 }
3279 },
3280 SelectItem::ExprWithAlias { expr, alias } => match expr {
3281 SqlExpr::Identifier(ident) => {
3282 let parts = vec![ident.value.clone()];
3283 let resolution = resolver.resolve(&parts, id_context)?;
3284 if resolution.is_simple() {
3285 projections.push(SelectProjection::Column {
3286 name: resolution.column().to_string(),
3287 alias: Some(alias.value.clone()),
3288 });
3289 } else {
3290 projections.push(SelectProjection::Computed {
3291 expr: resolution.into_scalar_expr(),
3292 alias: alias.value.clone(),
3293 });
3294 }
3295 }
3296 SqlExpr::CompoundIdentifier(parts) => {
3297 let name_parts: Vec<String> =
3298 parts.iter().map(|part| part.value.clone()).collect();
3299 let resolution = resolver.resolve(&name_parts, id_context)?;
3300 if resolution.is_simple() {
3301 projections.push(SelectProjection::Column {
3302 name: resolution.column().to_string(),
3303 alias: Some(alias.value.clone()),
3304 });
3305 } else {
3306 projections.push(SelectProjection::Computed {
3307 expr: resolution.into_scalar_expr(),
3308 alias: alias.value.clone(),
3309 });
3310 }
3311 }
3312 _ => {
3313 let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
3314 projections.push(SelectProjection::Computed {
3315 expr: scalar,
3316 alias: alias.value.clone(),
3317 });
3318 }
3319 },
3320 }
3321 }
3322 Ok(projections)
3323 }
3324
3325 #[allow(clippy::too_many_arguments)] fn handle_start_transaction(
3327 &self,
3328 modes: Vec<TransactionMode>,
3329 begin: bool,
3330 transaction: Option<BeginTransactionKind>,
3331 modifier: Option<TransactionModifier>,
3332 statements: Vec<Statement>,
3333 exception: Option<Vec<ExceptionWhen>>,
3334 has_end_keyword: bool,
3335 ) -> SqlResult<RuntimeStatementResult<P>> {
3336 if !modes.is_empty() {
3337 return Err(Error::InvalidArgumentError(
3338 "transaction modes are not supported".into(),
3339 ));
3340 }
3341 if modifier.is_some() {
3342 return Err(Error::InvalidArgumentError(
3343 "transaction modifiers are not supported".into(),
3344 ));
3345 }
3346 if !statements.is_empty() || exception.is_some() || has_end_keyword {
3347 return Err(Error::InvalidArgumentError(
3348 "BEGIN blocks with inline statements or exceptions are not supported".into(),
3349 ));
3350 }
3351 if let Some(kind) = transaction {
3352 match kind {
3353 BeginTransactionKind::Transaction | BeginTransactionKind::Work => {}
3354 }
3355 }
3356 if !begin {
3357 tracing::warn!("Currently treat `START TRANSACTION` same as `BEGIN`")
3359 }
3360
3361 self.execute_plan_statement(PlanStatement::BeginTransaction)
3362 }
3363
3364 fn handle_commit(
3365 &self,
3366 chain: bool,
3367 end: bool,
3368 modifier: Option<TransactionModifier>,
3369 ) -> SqlResult<RuntimeStatementResult<P>> {
3370 if chain {
3371 return Err(Error::InvalidArgumentError(
3372 "COMMIT AND [NO] CHAIN is not supported".into(),
3373 ));
3374 }
3375 if end {
3376 return Err(Error::InvalidArgumentError(
3377 "END blocks are not supported".into(),
3378 ));
3379 }
3380 if modifier.is_some() {
3381 return Err(Error::InvalidArgumentError(
3382 "transaction modifiers are not supported".into(),
3383 ));
3384 }
3385
3386 self.execute_plan_statement(PlanStatement::CommitTransaction)
3387 }
3388
3389 fn handle_rollback(
3390 &self,
3391 chain: bool,
3392 savepoint: Option<Ident>,
3393 ) -> SqlResult<RuntimeStatementResult<P>> {
3394 if chain {
3395 return Err(Error::InvalidArgumentError(
3396 "ROLLBACK AND [NO] CHAIN is not supported".into(),
3397 ));
3398 }
3399 if savepoint.is_some() {
3400 return Err(Error::InvalidArgumentError(
3401 "ROLLBACK TO SAVEPOINT is not supported".into(),
3402 ));
3403 }
3404
3405 self.execute_plan_statement(PlanStatement::RollbackTransaction)
3406 }
3407
3408 fn handle_set(&self, set_stmt: Set) -> SqlResult<RuntimeStatementResult<P>> {
3409 match set_stmt {
3410 Set::SingleAssignment {
3411 scope,
3412 hivevar,
3413 variable,
3414 values,
3415 } => {
3416 if scope.is_some() || hivevar {
3417 return Err(Error::InvalidArgumentError(
3418 "SET modifiers are not supported".into(),
3419 ));
3420 }
3421
3422 let variable_name_raw = variable.to_string();
3423 let variable_name = variable_name_raw.to_ascii_lowercase();
3424
3425 match variable_name.as_str() {
3426 "default_null_order" => {
3427 if values.len() != 1 {
3428 return Err(Error::InvalidArgumentError(
3429 "SET default_null_order expects exactly one value".into(),
3430 ));
3431 }
3432
3433 let value_expr = &values[0];
3434 let normalized = match value_expr {
3435 SqlExpr::Value(value_with_span) => value_with_span
3436 .value
3437 .clone()
3438 .into_string()
3439 .map(|s| s.to_ascii_lowercase()),
3440 SqlExpr::Identifier(ident) => Some(ident.value.to_ascii_lowercase()),
3441 _ => None,
3442 };
3443
3444 if !matches!(normalized.as_deref(), Some("nulls_first" | "nulls_last")) {
3445 return Err(Error::InvalidArgumentError(format!(
3446 "unsupported value for SET default_null_order: {value_expr:?}"
3447 )));
3448 }
3449
3450 let use_nulls_first = matches!(normalized.as_deref(), Some("nulls_first"));
3451 self.default_nulls_first
3452 .store(use_nulls_first, AtomicOrdering::Relaxed);
3453
3454 Ok(RuntimeStatementResult::NoOp)
3455 }
3456 "immediate_transaction_mode" => {
3457 if values.len() != 1 {
3458 return Err(Error::InvalidArgumentError(
3459 "SET immediate_transaction_mode expects exactly one value".into(),
3460 ));
3461 }
3462 let normalized = values[0].to_string().to_ascii_lowercase();
3463 let enabled = match normalized.as_str() {
3464 "true" | "on" | "1" => true,
3465 "false" | "off" | "0" => false,
3466 _ => {
3467 return Err(Error::InvalidArgumentError(format!(
3468 "unsupported value for SET immediate_transaction_mode: {}",
3469 values[0]
3470 )));
3471 }
3472 };
3473 if !enabled {
3474 tracing::warn!(
3475 "SET immediate_transaction_mode=false has no effect; continuing with auto mode"
3476 );
3477 }
3478 Ok(RuntimeStatementResult::NoOp)
3479 }
3480 _ => Err(Error::InvalidArgumentError(format!(
3481 "unsupported SET variable: {variable_name_raw}"
3482 ))),
3483 }
3484 }
3485 other => Err(Error::InvalidArgumentError(format!(
3486 "unsupported SQL SET statement: {other:?}",
3487 ))),
3488 }
3489 }
3490
3491 fn handle_pragma(
3492 &self,
3493 name: ObjectName,
3494 value: Option<Value>,
3495 is_eq: bool,
3496 ) -> SqlResult<RuntimeStatementResult<P>> {
3497 let (display, canonical) = canonical_object_name(&name)?;
3498 if value.is_some() || is_eq {
3499 return Err(Error::InvalidArgumentError(format!(
3500 "PRAGMA '{display}' does not accept a value"
3501 )));
3502 }
3503
3504 match canonical.as_str() {
3505 "enable_verification" | "disable_verification" => Ok(RuntimeStatementResult::NoOp),
3506 _ => Err(Error::InvalidArgumentError(format!(
3507 "unsupported PRAGMA '{}'",
3508 display
3509 ))),
3510 }
3511 }
3512}
3513
3514fn canonical_object_name(name: &ObjectName) -> SqlResult<(String, String)> {
3515 if name.0.is_empty() {
3516 return Err(Error::InvalidArgumentError(
3517 "object name must not be empty".into(),
3518 ));
3519 }
3520 let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
3521 for part in &name.0 {
3522 let ident = match part {
3523 ObjectNamePart::Identifier(ident) => ident,
3524 _ => {
3525 return Err(Error::InvalidArgumentError(
3526 "object names using functions are not supported".into(),
3527 ));
3528 }
3529 };
3530 parts.push(ident.value.clone());
3531 }
3532 let display = parts.join(".");
3533 let canonical = display.to_ascii_lowercase();
3534 Ok((display, canonical))
3535}
3536
3537fn parse_schema_qualified_name(name: &ObjectName) -> SqlResult<(Option<String>, String)> {
3546 if name.0.is_empty() {
3547 return Err(Error::InvalidArgumentError(
3548 "object name must not be empty".into(),
3549 ));
3550 }
3551
3552 let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
3553 for part in &name.0 {
3554 let ident = match part {
3555 ObjectNamePart::Identifier(ident) => ident,
3556 _ => {
3557 return Err(Error::InvalidArgumentError(
3558 "object names using functions are not supported".into(),
3559 ));
3560 }
3561 };
3562 parts.push(ident.value.clone());
3563 }
3564
3565 match parts.len() {
3566 1 => Ok((None, parts[0].clone())),
3567 2 => Ok((Some(parts[0].clone()), parts[1].clone())),
3568 _ => Err(Error::InvalidArgumentError(format!(
3569 "table name has too many parts: {}",
3570 name
3571 ))),
3572 }
3573}
3574
3575fn extract_index_column_name(
3589 index_col: &sqlparser::ast::IndexColumn,
3590 context: &str,
3591 allow_sort_options: bool,
3592 allow_compound: bool,
3593) -> SqlResult<String> {
3594 use sqlparser::ast::Expr as SqlExpr;
3595
3596 if index_col.operator_class.is_some() {
3598 return Err(Error::InvalidArgumentError(format!(
3599 "{} operator classes are not supported",
3600 context
3601 )));
3602 }
3603
3604 let order_expr = &index_col.column;
3605
3606 if allow_sort_options {
3608 let ascending = order_expr.options.asc.unwrap_or(true);
3610 let nulls_first = order_expr.options.nulls_first.unwrap_or(false);
3611
3612 if !ascending {
3613 return Err(Error::InvalidArgumentError(format!(
3614 "{} DESC ordering is not supported",
3615 context
3616 )));
3617 }
3618 if nulls_first {
3619 return Err(Error::InvalidArgumentError(format!(
3620 "{} NULLS FIRST ordering is not supported",
3621 context
3622 )));
3623 }
3624 } else {
3625 if order_expr.options.asc.is_some()
3627 || order_expr.options.nulls_first.is_some()
3628 || order_expr.with_fill.is_some()
3629 {
3630 return Err(Error::InvalidArgumentError(format!(
3631 "{} columns must be simple identifiers",
3632 context
3633 )));
3634 }
3635 }
3636
3637 let column_name = match &order_expr.expr {
3639 SqlExpr::Identifier(ident) => ident.value.clone(),
3640 SqlExpr::CompoundIdentifier(parts) => {
3641 if allow_compound {
3642 parts
3644 .last()
3645 .map(|ident| ident.value.clone())
3646 .ok_or_else(|| {
3647 Error::InvalidArgumentError(format!(
3648 "invalid column reference in {}",
3649 context
3650 ))
3651 })?
3652 } else if parts.len() == 1 {
3653 parts[0].value.clone()
3655 } else {
3656 return Err(Error::InvalidArgumentError(format!(
3657 "{} columns must be column identifiers",
3658 context
3659 )));
3660 }
3661 }
3662 other => {
3663 return Err(Error::InvalidArgumentError(format!(
3664 "{} only supports column references, found {:?}",
3665 context, other
3666 )));
3667 }
3668 };
3669
3670 Ok(column_name)
3671}
3672
3673fn validate_create_table_common(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
3674 if stmt.clone.is_some() || stmt.like.is_some() {
3675 return Err(Error::InvalidArgumentError(
3676 "CREATE TABLE LIKE/CLONE is not supported".into(),
3677 ));
3678 }
3679 if stmt.or_replace && stmt.if_not_exists {
3680 return Err(Error::InvalidArgumentError(
3681 "CREATE TABLE cannot combine OR REPLACE with IF NOT EXISTS".into(),
3682 ));
3683 }
3684 use sqlparser::ast::TableConstraint;
3685
3686 let mut seen_primary_key = false;
3687 for constraint in &stmt.constraints {
3688 match constraint {
3689 TableConstraint::PrimaryKey { .. } => {
3690 if seen_primary_key {
3691 return Err(Error::InvalidArgumentError(
3692 "multiple PRIMARY KEY constraints are not supported".into(),
3693 ));
3694 }
3695 seen_primary_key = true;
3696 }
3697 TableConstraint::Unique { .. } => {
3698 }
3700 TableConstraint::ForeignKey { .. } => {
3701 }
3703 other => {
3704 return Err(Error::InvalidArgumentError(format!(
3705 "table-level constraint {:?} is not supported",
3706 other
3707 )));
3708 }
3709 }
3710 }
3711 Ok(())
3712}
3713
3714fn validate_check_constraint(
3715 check_expr: &sqlparser::ast::Expr,
3716 table_name: &str,
3717 column_names: &[&str],
3718) -> SqlResult<()> {
3719 use sqlparser::ast::Expr as SqlExpr;
3720
3721 let column_names_lower: HashSet<String> = column_names
3722 .iter()
3723 .map(|name| name.to_ascii_lowercase())
3724 .collect();
3725
3726 let mut stack: Vec<&SqlExpr> = vec![check_expr];
3727
3728 while let Some(expr) = stack.pop() {
3729 match expr {
3730 SqlExpr::Subquery(_) => {
3731 return Err(Error::InvalidArgumentError(
3732 "Subqueries are not allowed in CHECK constraints".into(),
3733 ));
3734 }
3735 SqlExpr::Function(func) => {
3736 let func_name = func.name.to_string().to_uppercase();
3737 if matches!(func_name.as_str(), "SUM" | "AVG" | "COUNT" | "MIN" | "MAX") {
3738 return Err(Error::InvalidArgumentError(
3739 "Aggregate functions are not allowed in CHECK constraints".into(),
3740 ));
3741 }
3742
3743 if let sqlparser::ast::FunctionArguments::List(list) = &func.args {
3744 for arg in &list.args {
3745 if let sqlparser::ast::FunctionArg::Unnamed(
3746 sqlparser::ast::FunctionArgExpr::Expr(expr),
3747 ) = arg
3748 {
3749 stack.push(expr);
3750 }
3751 }
3752 }
3753 }
3754 SqlExpr::Identifier(ident) => {
3755 if !column_names_lower.contains(&ident.value.to_ascii_lowercase()) {
3756 return Err(Error::InvalidArgumentError(format!(
3757 "Column '{}' referenced in CHECK constraint does not exist",
3758 ident.value
3759 )));
3760 }
3761 }
3762 SqlExpr::CompoundIdentifier(idents) => {
3763 if idents.len() == 2 {
3764 let first = idents[0].value.as_str();
3765 let second = &idents[1].value;
3766
3767 if column_names_lower.contains(&first.to_ascii_lowercase()) {
3768 continue;
3769 }
3770
3771 if !first.eq_ignore_ascii_case(table_name) {
3772 return Err(Error::InvalidArgumentError(format!(
3773 "CHECK constraint references column from different table '{}'",
3774 first
3775 )));
3776 }
3777
3778 if !column_names_lower.contains(&second.to_ascii_lowercase()) {
3779 return Err(Error::InvalidArgumentError(format!(
3780 "Column '{}' referenced in CHECK constraint does not exist",
3781 second
3782 )));
3783 }
3784 } else if idents.len() == 3 {
3785 let first = &idents[0].value;
3786 let second = &idents[1].value;
3787 let third = &idents[2].value;
3788
3789 if first.eq_ignore_ascii_case(table_name) {
3790 if !column_names_lower.contains(&second.to_ascii_lowercase()) {
3791 return Err(Error::InvalidArgumentError(format!(
3792 "Column '{}' referenced in CHECK constraint does not exist",
3793 second
3794 )));
3795 }
3796 } else if second.eq_ignore_ascii_case(table_name) {
3797 if !column_names_lower.contains(&third.to_ascii_lowercase()) {
3798 return Err(Error::InvalidArgumentError(format!(
3799 "Column '{}' referenced in CHECK constraint does not exist",
3800 third
3801 )));
3802 }
3803 } else {
3804 return Err(Error::InvalidArgumentError(format!(
3805 "CHECK constraint references column from different table '{}'",
3806 second
3807 )));
3808 }
3809 }
3810 }
3811 SqlExpr::BinaryOp { left, right, .. } => {
3812 stack.push(left);
3813 stack.push(right);
3814 }
3815 SqlExpr::UnaryOp { expr, .. } | SqlExpr::Nested(expr) => {
3816 stack.push(expr);
3817 }
3818 SqlExpr::Value(_) | SqlExpr::TypedString { .. } => {}
3819 _ => {}
3820 }
3821 }
3822
3823 Ok(())
3824}
3825
3826fn validate_create_table_definition(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
3827 for column in &stmt.columns {
3828 for ColumnOptionDef { option, .. } in &column.options {
3829 match option {
3830 ColumnOption::Null
3831 | ColumnOption::NotNull
3832 | ColumnOption::Unique { .. }
3833 | ColumnOption::Check(_)
3834 | ColumnOption::ForeignKey { .. } => {}
3835 ColumnOption::Default(_) => {
3836 return Err(Error::InvalidArgumentError(format!(
3837 "DEFAULT values are not supported for column '{}'",
3838 column.name
3839 )));
3840 }
3841 other => {
3842 return Err(Error::InvalidArgumentError(format!(
3843 "unsupported column option {:?} on '{}'",
3844 other, column.name
3845 )));
3846 }
3847 }
3848 }
3849 }
3850 Ok(())
3851}
3852
3853fn validate_create_table_as(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
3854 if !stmt.columns.is_empty() {
3855 return Err(Error::InvalidArgumentError(
3856 "CREATE TABLE AS SELECT does not support column definitions yet".into(),
3857 ));
3858 }
3859 Ok(())
3860}
3861
3862fn validate_simple_query(query: &Query) -> SqlResult<()> {
3863 if query.with.is_some() {
3864 return Err(Error::InvalidArgumentError(
3865 "WITH clauses are not supported".into(),
3866 ));
3867 }
3868 if let Some(limit_clause) = &query.limit_clause {
3869 match limit_clause {
3870 LimitClause::LimitOffset {
3871 offset: Some(_), ..
3872 }
3873 | LimitClause::OffsetCommaLimit { .. } => {
3874 return Err(Error::InvalidArgumentError(
3875 "OFFSET clauses are not supported".into(),
3876 ));
3877 }
3878 LimitClause::LimitOffset { limit_by, .. } if !limit_by.is_empty() => {
3879 return Err(Error::InvalidArgumentError(
3880 "LIMIT BY clauses are not supported".into(),
3881 ));
3882 }
3883 _ => {}
3884 }
3885 }
3886 if query.fetch.is_some() {
3887 return Err(Error::InvalidArgumentError(
3888 "FETCH clauses are not supported".into(),
3889 ));
3890 }
3891 Ok(())
3892}
3893
3894fn resolve_column_name(expr: &SqlExpr) -> SqlResult<String> {
3895 match expr {
3896 SqlExpr::Identifier(ident) => Ok(ident.value.clone()),
3897 SqlExpr::CompoundIdentifier(parts) => {
3898 if let Some(last) = parts.last() {
3899 Ok(last.value.clone())
3900 } else {
3901 Err(Error::InvalidArgumentError(
3902 "empty column identifier".into(),
3903 ))
3904 }
3905 }
3906 _ => Err(Error::InvalidArgumentError(
3907 "aggregate arguments must be plain column identifiers".into(),
3908 )),
3909 }
3910}
3911
3912fn validate_projection_alias_qualifiers(
3913 projection_items: &[SelectItem],
3914 alias: &str,
3915) -> SqlResult<()> {
3916 let alias_lower = alias.to_ascii_lowercase();
3917 for item in projection_items {
3918 match item {
3919 SelectItem::UnnamedExpr(expr) | SelectItem::ExprWithAlias { expr, .. } => {
3920 if let SqlExpr::CompoundIdentifier(parts) = expr
3921 && parts.len() >= 2
3922 && let Some(first) = parts.first()
3923 && !first.value.eq_ignore_ascii_case(&alias_lower)
3924 {
3925 return Err(Error::InvalidArgumentError(format!(
3926 "Binder Error: table '{}' not found",
3927 first.value
3928 )));
3929 }
3930 }
3931 _ => {}
3932 }
3933 }
3934 Ok(())
3935}
3936
3937#[allow(dead_code)] fn expr_contains_aggregate(expr: &llkv_expr::expr::ScalarExpr<String>) -> bool {
3941 match expr {
3942 llkv_expr::expr::ScalarExpr::Aggregate(_) => true,
3943 llkv_expr::expr::ScalarExpr::Binary { left, right, .. } => {
3944 expr_contains_aggregate(left) || expr_contains_aggregate(right)
3945 }
3946 llkv_expr::expr::ScalarExpr::GetField { base, .. } => expr_contains_aggregate(base),
3947 llkv_expr::expr::ScalarExpr::Column(_) | llkv_expr::expr::ScalarExpr::Literal(_) => false,
3948 }
3949}
3950
3951fn try_parse_aggregate_function(
3952 func: &sqlparser::ast::Function,
3953) -> SqlResult<Option<llkv_expr::expr::AggregateCall<String>>> {
3954 use sqlparser::ast::{FunctionArg, FunctionArgExpr, FunctionArguments, ObjectNamePart};
3955
3956 if func.uses_odbc_syntax {
3957 return Ok(None);
3958 }
3959 if !matches!(func.parameters, FunctionArguments::None) {
3960 return Ok(None);
3961 }
3962 if func.filter.is_some()
3963 || func.null_treatment.is_some()
3964 || func.over.is_some()
3965 || !func.within_group.is_empty()
3966 {
3967 return Ok(None);
3968 }
3969
3970 let func_name = if func.name.0.len() == 1 {
3971 match &func.name.0[0] {
3972 ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
3973 _ => return Ok(None),
3974 }
3975 } else {
3976 return Ok(None);
3977 };
3978
3979 let args_slice: &[FunctionArg] = match &func.args {
3980 FunctionArguments::List(list) => {
3981 if list.duplicate_treatment.is_some() || !list.clauses.is_empty() {
3982 return Ok(None);
3983 }
3984 &list.args
3985 }
3986 FunctionArguments::None => &[],
3987 FunctionArguments::Subquery(_) => return Ok(None),
3988 };
3989
3990 let agg_call = match func_name.as_str() {
3991 "count" => {
3992 if args_slice.len() != 1 {
3993 return Err(Error::InvalidArgumentError(
3994 "COUNT accepts exactly one argument".into(),
3995 ));
3996 }
3997 match &args_slice[0] {
3998 FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
3999 llkv_expr::expr::AggregateCall::CountStar
4000 }
4001 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
4002 let column = resolve_column_name(arg_expr)?;
4003 llkv_expr::expr::AggregateCall::Count(column)
4004 }
4005 _ => {
4006 return Err(Error::InvalidArgumentError(
4007 "unsupported COUNT argument".into(),
4008 ));
4009 }
4010 }
4011 }
4012 "sum" => {
4013 if args_slice.len() != 1 {
4014 return Err(Error::InvalidArgumentError(
4015 "SUM accepts exactly one argument".into(),
4016 ));
4017 }
4018 let arg_expr = match &args_slice[0] {
4019 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
4020 _ => {
4021 return Err(Error::InvalidArgumentError(
4022 "SUM requires a column argument".into(),
4023 ));
4024 }
4025 };
4026
4027 if let Some(column) = parse_count_nulls_case(arg_expr)? {
4029 llkv_expr::expr::AggregateCall::CountNulls(column)
4030 } else {
4031 let column = resolve_column_name(arg_expr)?;
4032 llkv_expr::expr::AggregateCall::Sum(column)
4033 }
4034 }
4035 "min" => {
4036 if args_slice.len() != 1 {
4037 return Err(Error::InvalidArgumentError(
4038 "MIN accepts exactly one argument".into(),
4039 ));
4040 }
4041 let arg_expr = match &args_slice[0] {
4042 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
4043 _ => {
4044 return Err(Error::InvalidArgumentError(
4045 "MIN requires a column argument".into(),
4046 ));
4047 }
4048 };
4049 let column = resolve_column_name(arg_expr)?;
4050 llkv_expr::expr::AggregateCall::Min(column)
4051 }
4052 "max" => {
4053 if args_slice.len() != 1 {
4054 return Err(Error::InvalidArgumentError(
4055 "MAX accepts exactly one argument".into(),
4056 ));
4057 }
4058 let arg_expr = match &args_slice[0] {
4059 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
4060 _ => {
4061 return Err(Error::InvalidArgumentError(
4062 "MAX requires a column argument".into(),
4063 ));
4064 }
4065 };
4066 let column = resolve_column_name(arg_expr)?;
4067 llkv_expr::expr::AggregateCall::Max(column)
4068 }
4069 _ => return Ok(None),
4070 };
4071
4072 Ok(Some(agg_call))
4073}
4074
4075fn parse_count_nulls_case(expr: &SqlExpr) -> SqlResult<Option<String>> {
4076 let SqlExpr::Case {
4077 operand,
4078 conditions,
4079 else_result,
4080 ..
4081 } = expr
4082 else {
4083 return Ok(None);
4084 };
4085
4086 if operand.is_some() || conditions.len() != 1 {
4087 return Ok(None);
4088 }
4089
4090 let case_when = &conditions[0];
4091 if !is_integer_literal(&case_when.result, 1) {
4092 return Ok(None);
4093 }
4094
4095 let else_expr = match else_result {
4096 Some(expr) => expr.as_ref(),
4097 None => return Ok(None),
4098 };
4099 if !is_integer_literal(else_expr, 0) {
4100 return Ok(None);
4101 }
4102
4103 let inner = match &case_when.condition {
4104 SqlExpr::IsNull(inner) => inner.as_ref(),
4105 _ => return Ok(None),
4106 };
4107
4108 resolve_column_name(inner).map(Some)
4109}
4110
4111fn is_integer_literal(expr: &SqlExpr, expected: i64) -> bool {
4112 match expr {
4113 SqlExpr::Value(ValueWithSpan {
4114 value: Value::Number(text, _),
4115 ..
4116 }) => text.parse::<i64>() == Ok(expected),
4117 _ => false,
4118 }
4119}
4120
4121fn translate_condition_with_context(
4122 resolver: &IdentifierResolver<'_>,
4123 context: IdentifierContext,
4124 expr: &SqlExpr,
4125) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
4126 match expr {
4127 SqlExpr::IsNull(inner) => {
4128 let scalar = translate_scalar_with_context(resolver, context, inner)?;
4129 match scalar {
4130 llkv_expr::expr::ScalarExpr::Column(column) => {
4131 Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
4132 field_id: column,
4133 op: llkv_expr::expr::Operator::IsNull,
4134 }))
4135 }
4136 _ => Err(Error::InvalidArgumentError(
4137 "IS NULL predicates currently support column references only".into(),
4138 )),
4139 }
4140 }
4141 SqlExpr::IsNotNull(inner) => {
4142 let scalar = translate_scalar_with_context(resolver, context, inner)?;
4143 match scalar {
4144 llkv_expr::expr::ScalarExpr::Column(column) => {
4145 Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
4146 field_id: column,
4147 op: llkv_expr::expr::Operator::IsNotNull,
4148 }))
4149 }
4150 _ => Err(Error::InvalidArgumentError(
4151 "IS NOT NULL predicates currently support column references only".into(),
4152 )),
4153 }
4154 }
4155 SqlExpr::BinaryOp { left, op, right } => match op {
4156 BinaryOperator::And => Ok(llkv_expr::expr::Expr::And(vec![
4157 translate_condition_with_context(resolver, context, left)?,
4158 translate_condition_with_context(resolver, context, right)?,
4159 ])),
4160 BinaryOperator::Or => Ok(llkv_expr::expr::Expr::Or(vec![
4161 translate_condition_with_context(resolver, context, left)?,
4162 translate_condition_with_context(resolver, context, right)?,
4163 ])),
4164 BinaryOperator::Eq
4165 | BinaryOperator::NotEq
4166 | BinaryOperator::Lt
4167 | BinaryOperator::LtEq
4168 | BinaryOperator::Gt
4169 | BinaryOperator::GtEq => {
4170 translate_comparison_with_context(resolver, context, left, op.clone(), right)
4171 }
4172 other => Err(Error::InvalidArgumentError(format!(
4173 "unsupported binary operator in WHERE clause: {other:?}"
4174 ))),
4175 },
4176 SqlExpr::UnaryOp {
4177 op: UnaryOperator::Not,
4178 expr,
4179 } => Ok(llkv_expr::expr::Expr::not(
4180 translate_condition_with_context(resolver, context, expr)?,
4181 )),
4182 SqlExpr::Nested(inner) => translate_condition_with_context(resolver, context, inner),
4183 other => Err(Error::InvalidArgumentError(format!(
4184 "unsupported WHERE clause: {other:?}"
4185 ))),
4186 }
4187}
4188
4189fn translate_comparison_with_context(
4190 resolver: &IdentifierResolver<'_>,
4191 context: IdentifierContext,
4192 left: &SqlExpr,
4193 op: BinaryOperator,
4194 right: &SqlExpr,
4195) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
4196 let left_scalar = translate_scalar_with_context(resolver, context, left)?;
4197 let right_scalar = translate_scalar_with_context(resolver, context, right)?;
4198 let compare_op = match op {
4199 BinaryOperator::Eq => llkv_expr::expr::CompareOp::Eq,
4200 BinaryOperator::NotEq => llkv_expr::expr::CompareOp::NotEq,
4201 BinaryOperator::Lt => llkv_expr::expr::CompareOp::Lt,
4202 BinaryOperator::LtEq => llkv_expr::expr::CompareOp::LtEq,
4203 BinaryOperator::Gt => llkv_expr::expr::CompareOp::Gt,
4204 BinaryOperator::GtEq => llkv_expr::expr::CompareOp::GtEq,
4205 other => {
4206 return Err(Error::InvalidArgumentError(format!(
4207 "unsupported comparison operator: {other:?}"
4208 )));
4209 }
4210 };
4211
4212 if let (
4213 llkv_expr::expr::ScalarExpr::Column(column),
4214 llkv_expr::expr::ScalarExpr::Literal(literal),
4215 ) = (&left_scalar, &right_scalar)
4216 && let Some(op) = compare_op_to_filter_operator(compare_op, literal)
4217 {
4218 return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
4219 field_id: column.clone(),
4220 op,
4221 }));
4222 }
4223
4224 if let (
4225 llkv_expr::expr::ScalarExpr::Literal(literal),
4226 llkv_expr::expr::ScalarExpr::Column(column),
4227 ) = (&left_scalar, &right_scalar)
4228 && let Some(flipped) = flip_compare_op(compare_op)
4229 && let Some(op) = compare_op_to_filter_operator(flipped, literal)
4230 {
4231 return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
4232 field_id: column.clone(),
4233 op,
4234 }));
4235 }
4236
4237 Ok(llkv_expr::expr::Expr::Compare {
4238 left: left_scalar,
4239 op: compare_op,
4240 right: right_scalar,
4241 })
4242}
4243
4244fn compare_op_to_filter_operator(
4245 op: llkv_expr::expr::CompareOp,
4246 literal: &Literal,
4247) -> Option<llkv_expr::expr::Operator<'static>> {
4248 let lit = literal.clone();
4249 match op {
4250 llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::Operator::Equals(lit)),
4251 llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::Operator::LessThan(lit)),
4252 llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::Operator::LessThanOrEquals(lit)),
4253 llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::Operator::GreaterThan(lit)),
4254 llkv_expr::expr::CompareOp::GtEq => {
4255 Some(llkv_expr::expr::Operator::GreaterThanOrEquals(lit))
4256 }
4257 llkv_expr::expr::CompareOp::NotEq => None,
4258 }
4259}
4260
4261fn flip_compare_op(op: llkv_expr::expr::CompareOp) -> Option<llkv_expr::expr::CompareOp> {
4262 match op {
4263 llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::CompareOp::Eq),
4264 llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::CompareOp::Gt),
4265 llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::CompareOp::GtEq),
4266 llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::CompareOp::Lt),
4267 llkv_expr::expr::CompareOp::GtEq => Some(llkv_expr::expr::CompareOp::LtEq),
4268 llkv_expr::expr::CompareOp::NotEq => None,
4269 }
4270}
4271fn translate_scalar_with_context(
4274 resolver: &IdentifierResolver<'_>,
4275 context: IdentifierContext,
4276 expr: &SqlExpr,
4277) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
4278 match expr {
4279 SqlExpr::Identifier(ident) => {
4280 let parts = vec![ident.value.clone()];
4281 let resolution = resolver.resolve(&parts, context)?;
4282 Ok(resolution.into_scalar_expr())
4283 }
4284 SqlExpr::CompoundIdentifier(idents) => {
4285 if idents.is_empty() {
4286 return Err(Error::InvalidArgumentError(
4287 "invalid compound identifier".into(),
4288 ));
4289 }
4290
4291 let parts: Vec<String> = idents.iter().map(|ident| ident.value.clone()).collect();
4292 let resolution = resolver.resolve(&parts, context)?;
4293 Ok(resolution.into_scalar_expr())
4294 }
4295 _ => translate_scalar(expr),
4296 }
4297}
4298
4299fn translate_scalar(expr: &SqlExpr) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
4301 match expr {
4302 SqlExpr::Identifier(ident) => Ok(llkv_expr::expr::ScalarExpr::column(ident.value.clone())),
4303 SqlExpr::CompoundIdentifier(idents) => {
4304 if idents.is_empty() {
4305 return Err(Error::InvalidArgumentError(
4306 "invalid compound identifier".into(),
4307 ));
4308 }
4309
4310 let column_name = idents[0].value.clone();
4312 let mut result = llkv_expr::expr::ScalarExpr::column(column_name);
4313
4314 for part in &idents[1..] {
4315 let field_name = part.value.clone();
4316 result = llkv_expr::expr::ScalarExpr::get_field(result, field_name);
4317 }
4318
4319 Ok(result)
4320 }
4321 SqlExpr::Value(value) => literal_from_value(value),
4322 SqlExpr::BinaryOp { left, op, right } => {
4323 let left_expr = translate_scalar(left)?;
4324 let right_expr = translate_scalar(right)?;
4325 let op = match op {
4326 BinaryOperator::Plus => llkv_expr::expr::BinaryOp::Add,
4327 BinaryOperator::Minus => llkv_expr::expr::BinaryOp::Subtract,
4328 BinaryOperator::Multiply => llkv_expr::expr::BinaryOp::Multiply,
4329 BinaryOperator::Divide => llkv_expr::expr::BinaryOp::Divide,
4330 BinaryOperator::Modulo => llkv_expr::expr::BinaryOp::Modulo,
4331 other => {
4332 return Err(Error::InvalidArgumentError(format!(
4333 "unsupported scalar binary operator: {other:?}"
4334 )));
4335 }
4336 };
4337 Ok(llkv_expr::expr::ScalarExpr::binary(
4338 left_expr, op, right_expr,
4339 ))
4340 }
4341 SqlExpr::UnaryOp {
4342 op: UnaryOperator::Minus,
4343 expr,
4344 } => match translate_scalar(expr)? {
4345 llkv_expr::expr::ScalarExpr::Literal(lit) => match lit {
4346 Literal::Integer(v) => {
4347 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(-v)))
4348 }
4349 Literal::Float(v) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(-v))),
4350 Literal::Boolean(_) => Err(Error::InvalidArgumentError(
4351 "cannot negate boolean literal".into(),
4352 )),
4353 Literal::String(_) => Err(Error::InvalidArgumentError(
4354 "cannot negate string literal".into(),
4355 )),
4356 Literal::Struct(_) => Err(Error::InvalidArgumentError(
4357 "cannot negate struct literal".into(),
4358 )),
4359 Literal::Null => Err(Error::InvalidArgumentError(
4360 "cannot negate null literal".into(),
4361 )),
4362 },
4363 _ => Err(Error::InvalidArgumentError(
4364 "cannot negate non-literal expression".into(),
4365 )),
4366 },
4367 SqlExpr::UnaryOp {
4368 op: UnaryOperator::Plus,
4369 expr,
4370 } => translate_scalar(expr),
4371 SqlExpr::Nested(inner) => translate_scalar(inner),
4372 SqlExpr::Function(func) => {
4373 if let Some(agg_call) = try_parse_aggregate_function(func)? {
4375 Ok(llkv_expr::expr::ScalarExpr::aggregate(agg_call))
4376 } else {
4377 Err(Error::InvalidArgumentError(format!(
4378 "unsupported function in scalar expression: {:?}",
4379 func.name
4380 )))
4381 }
4382 }
4383 SqlExpr::Dictionary(fields) => {
4384 let mut struct_fields = Vec::new();
4386 for entry in fields {
4387 let key = entry.key.value.clone(); let value_expr = translate_scalar(&entry.value)?;
4390 match value_expr {
4392 llkv_expr::expr::ScalarExpr::Literal(lit) => {
4393 struct_fields.push((key, Box::new(lit)));
4394 }
4395 _ => {
4396 return Err(Error::InvalidArgumentError(
4397 "Dictionary values must be literals".to_string(),
4398 ));
4399 }
4400 }
4401 }
4402 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Struct(
4403 struct_fields,
4404 )))
4405 }
4406 other => Err(Error::InvalidArgumentError(format!(
4407 "unsupported scalar expression: {other:?}"
4408 ))),
4409 }
4410}
4411
4412fn literal_from_value(value: &ValueWithSpan) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
4413 match &value.value {
4414 Value::Number(text, _) => {
4415 if text.contains(['.', 'e', 'E']) {
4416 let parsed = text.parse::<f64>().map_err(|err| {
4417 Error::InvalidArgumentError(format!("invalid float literal: {err}"))
4418 })?;
4419 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(parsed)))
4420 } else {
4421 let parsed = text.parse::<i128>().map_err(|err| {
4422 Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
4423 })?;
4424 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(
4425 parsed,
4426 )))
4427 }
4428 }
4429 Value::Boolean(value) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Boolean(
4430 *value,
4431 ))),
4432 Value::Null => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Null)),
4433 other => {
4434 if let Some(text) = other.clone().into_string() {
4435 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::String(text)))
4436 } else {
4437 Err(Error::InvalidArgumentError(format!(
4438 "unsupported literal: {other:?}"
4439 )))
4440 }
4441 }
4442 }
4443}
4444
4445fn resolve_assignment_column_name(target: &AssignmentTarget) -> SqlResult<String> {
4446 match target {
4447 AssignmentTarget::ColumnName(name) => {
4448 if name.0.len() != 1 {
4449 return Err(Error::InvalidArgumentError(
4450 "qualified column names in UPDATE assignments are not supported yet".into(),
4451 ));
4452 }
4453 match &name.0[0] {
4454 ObjectNamePart::Identifier(ident) => Ok(ident.value.clone()),
4455 other => Err(Error::InvalidArgumentError(format!(
4456 "unsupported column reference in UPDATE assignment: {other:?}"
4457 ))),
4458 }
4459 }
4460 AssignmentTarget::Tuple(_) => Err(Error::InvalidArgumentError(
4461 "tuple assignments are not supported yet".into(),
4462 )),
4463 }
4464}
4465
4466fn arrow_type_from_sql(data_type: &SqlDataType) -> SqlResult<arrow::datatypes::DataType> {
4467 use arrow::datatypes::DataType;
4468 match data_type {
4469 SqlDataType::Int(_)
4470 | SqlDataType::Integer(_)
4471 | SqlDataType::BigInt(_)
4472 | SqlDataType::SmallInt(_)
4473 | SqlDataType::TinyInt(_) => Ok(DataType::Int64),
4474 SqlDataType::Float(_)
4475 | SqlDataType::Real
4476 | SqlDataType::Double(_)
4477 | SqlDataType::DoublePrecision => Ok(DataType::Float64),
4478 SqlDataType::Text
4479 | SqlDataType::String(_)
4480 | SqlDataType::Varchar(_)
4481 | SqlDataType::Char(_)
4482 | SqlDataType::Uuid => Ok(DataType::Utf8),
4483 SqlDataType::Date => Ok(DataType::Date32),
4484 SqlDataType::Decimal(_) | SqlDataType::Numeric(_) => Ok(DataType::Float64),
4485 SqlDataType::Boolean => Ok(DataType::Boolean),
4486 SqlDataType::Custom(name, args) => {
4487 if name.0.len() == 1
4488 && let ObjectNamePart::Identifier(ident) = &name.0[0]
4489 && ident.value.eq_ignore_ascii_case("row")
4490 {
4491 return row_type_to_arrow(data_type, args);
4492 }
4493 Err(Error::InvalidArgumentError(format!(
4494 "unsupported SQL data type: {data_type:?}"
4495 )))
4496 }
4497 other => Err(Error::InvalidArgumentError(format!(
4498 "unsupported SQL data type: {other:?}"
4499 ))),
4500 }
4501}
4502
4503fn row_type_to_arrow(
4504 data_type: &SqlDataType,
4505 tokens: &[String],
4506) -> SqlResult<arrow::datatypes::DataType> {
4507 use arrow::datatypes::{DataType, Field, FieldRef, Fields};
4508
4509 let row_str = data_type.to_string();
4510 if tokens.is_empty() {
4511 return Err(Error::InvalidArgumentError(
4512 "ROW type must define at least one field".into(),
4513 ));
4514 }
4515
4516 let dialect = GenericDialect {};
4517 let field_definitions = resolve_row_field_types(tokens, &dialect).map_err(|err| {
4518 Error::InvalidArgumentError(format!("unable to parse ROW type '{row_str}': {err}"))
4519 })?;
4520
4521 let mut fields: Vec<FieldRef> = Vec::with_capacity(field_definitions.len());
4522 for (field_name, field_type) in field_definitions {
4523 let arrow_field_type = arrow_type_from_sql(&field_type)?;
4524 fields.push(Arc::new(Field::new(field_name, arrow_field_type, true)));
4525 }
4526
4527 let struct_fields: Fields = fields.into();
4528 Ok(DataType::Struct(struct_fields))
4529}
4530
/// Split the raw argument tokens of a `ROW(...)` type into `(field name, SQL type)` pairs.
///
/// If the first token is `"("`, the last must be `")"`, and both are stripped. The
/// remaining tokens are read left to right as field definitions:
/// * one token is the field name, normalized by `normalize_row_field_name`;
/// * the field's type is the *longest* run of following tokens that
///   `parse_sql_data_type` accepts.
///
/// While growing a candidate type, scanning stops at a `","` token once some prefix
/// has already parsed. Standalone `","` tokens between fields are skipped.
///
/// NOTE(review): the type is the longest parseable prefix, so when no `","` separates
/// two fields, a following field name that can extend a valid type is absorbed into the
/// current field's type. For example, a field named `PRECISION` after `DOUBLE` could
/// become `DOUBLE PRECISION`. Confirm whether the tokens supplied by the caller contain
/// comma separators.
///
/// Each field tries every prefix length up to the next accepted `","` (or the end of
/// the tokens). Without commas, the number of `parse_sql_data_type` calls therefore
/// grows quadratically with the token count.
///
/// # Errors
/// Returns `Error::InvalidArgumentError` when:
/// * `tokens` is empty;
/// * a leading `"("` has no trailing `")"`, or a trailing `")"` has no leading `"("`;
/// * nothing remains between the parentheses;
/// * a field name has no following type token;
/// * no prefix of the following tokens parses as a type;
/// * only `","` tokens were supplied.
///
/// Errors from `normalize_row_field_name` are propagated.
fn resolve_row_field_types(
    tokens: &[String],
    dialect: &GenericDialect,
) -> SqlResult<Vec<(String, SqlDataType)>> {
    if tokens.is_empty() {
        return Err(Error::InvalidArgumentError(
            "ROW type must define at least one field".into(),
        ));
    }

    // Optionally strip one enclosing "(" ... ")" pair from the token list.
    let mut start = 0;
    let mut end = tokens.len();
    if tokens[start] == "(" {
        // `end == 0` cannot hold here (emptiness was checked above); it is only a guard.
        if end == 0 || tokens[end - 1] != ")" {
            return Err(Error::InvalidArgumentError(
                "ROW type is missing closing ')'".into(),
            ));
        }
        start += 1;
        end -= 1;
    } else if tokens[end - 1] == ")" {
        return Err(Error::InvalidArgumentError(
            "ROW type contains unmatched ')'".into(),
        ));
    }

    let slice = &tokens[start..end];
    if slice.is_empty() {
        return Err(Error::InvalidArgumentError(
            "ROW type did not provide any field definitions".into(),
        ));
    }

    let mut fields = Vec::new();
    let mut index = 0;

    while index < slice.len() {
        // Skip separators between field definitions.
        if slice[index] == "," {
            index += 1;
            continue;
        }

        let field_name = normalize_row_field_name(&slice[index])?;
        index += 1;

        if index >= slice.len() {
            return Err(Error::InvalidArgumentError(format!(
                "ROW field '{field_name}' is missing a type specification"
            )));
        }

        // Remember the longest prefix `slice[index..type_end]` that parsed as a type,
        // together with the position just past it.
        let mut last_success: Option<(usize, SqlDataType)> = None;
        let mut type_end = index;

        while type_end <= slice.len() {
            let candidate = slice[index..type_end].join(" ");
            if candidate.trim().is_empty() {
                type_end += 1;
                continue;
            }

            if let Ok(parsed_type) = parse_sql_data_type(&candidate, dialect) {
                last_success = Some((type_end, parsed_type));
            }

            if type_end == slice.len() {
                break;
            }

            // A comma ends this field only once some prefix has parsed; before that,
            // the comma is included in the next, longer candidate.
            if slice[type_end] == "," && last_success.is_some() {
                break;
            }

            type_end += 1;
        }

        let Some((next_index, data_type)) = last_success else {
            return Err(Error::InvalidArgumentError(format!(
                "failed to parse ROW field type for '{field_name}'"
            )));
        };

        fields.push((field_name, data_type));
        // Resume right after the accepted type, consuming a single separating comma.
        index = next_index;

        if index < slice.len() && slice[index] == "," {
            index += 1;
        }
    }

    // Reached when the token list consisted only of "," separators.
    if fields.is_empty() {
        return Err(Error::InvalidArgumentError(
            "ROW type did not provide any field definitions".into(),
        ));
    }

    Ok(fields)
}
4629
4630fn normalize_row_field_name(raw: &str) -> SqlResult<String> {
4631 let trimmed = raw.trim();
4632 if trimmed.is_empty() {
4633 return Err(Error::InvalidArgumentError(
4634 "ROW field name must not be empty".into(),
4635 ));
4636 }
4637
4638 if let Some(stripped) = trimmed.strip_prefix('"') {
4639 let without_end = stripped.strip_suffix('"').ok_or_else(|| {
4640 Error::InvalidArgumentError(format!("unterminated quoted ROW field name: {trimmed}"))
4641 })?;
4642 let name = without_end.replace("\"\"", "\"");
4643 return Ok(name);
4644 }
4645
4646 Ok(trimmed.to_string())
4647}
4648
4649fn parse_sql_data_type(type_str: &str, dialect: &GenericDialect) -> SqlResult<SqlDataType> {
4650 let trimmed = type_str.trim();
4651 let sql = format!("CREATE TABLE __row(__field {trimmed});");
4652 let statements = Parser::parse_sql(dialect, &sql).map_err(|err| {
4653 Error::InvalidArgumentError(format!("failed to parse ROW field type '{trimmed}': {err}"))
4654 })?;
4655
4656 let stmt = statements.into_iter().next().ok_or_else(|| {
4657 Error::InvalidArgumentError(format!(
4658 "ROW field type '{trimmed}' did not produce a statement"
4659 ))
4660 })?;
4661
4662 match stmt {
4663 Statement::CreateTable(table) => table
4664 .columns
4665 .first()
4666 .map(|col| col.data_type.clone())
4667 .ok_or_else(|| {
4668 Error::InvalidArgumentError(format!(
4669 "ROW field type '{trimmed}' missing column definition"
4670 ))
4671 }),
4672 other => Err(Error::InvalidArgumentError(format!(
4673 "unexpected statement while parsing ROW field type: {other:?}"
4674 ))),
4675 }
4676}
4677
4678type ExtractValuesResult = Option<(Vec<Vec<PlanValue>>, Vec<String>)>;
4681
4682#[allow(clippy::type_complexity)]
4683fn extract_values_from_derived_table(from: &[TableWithJoins]) -> SqlResult<ExtractValuesResult> {
4684 if from.len() != 1 {
4685 return Ok(None);
4686 }
4687
4688 let table_with_joins = &from[0];
4689 if !table_with_joins.joins.is_empty() {
4690 return Ok(None);
4691 }
4692
4693 match &table_with_joins.relation {
4694 TableFactor::Derived {
4695 subquery, alias, ..
4696 } => {
4697 let values = match subquery.body.as_ref() {
4699 SetExpr::Values(v) => v,
4700 _ => return Ok(None),
4701 };
4702
4703 let column_names = if let Some(alias) = alias {
4705 alias
4706 .columns
4707 .iter()
4708 .map(|col_def| col_def.name.value.clone())
4709 .collect::<Vec<_>>()
4710 } else {
4711 if values.rows.is_empty() {
4713 return Err(Error::InvalidArgumentError(
4714 "VALUES expression must have at least one row".into(),
4715 ));
4716 }
4717 let first_row = &values.rows[0];
4718 (0..first_row.len())
4719 .map(|i| format!("column{}", i))
4720 .collect()
4721 };
4722
4723 if values.rows.is_empty() {
4725 return Err(Error::InvalidArgumentError(
4726 "VALUES expression must have at least one row".into(),
4727 ));
4728 }
4729
4730 let mut rows = Vec::with_capacity(values.rows.len());
4731 for row in &values.rows {
4732 if row.len() != column_names.len() {
4733 return Err(Error::InvalidArgumentError(format!(
4734 "VALUES row has {} columns but table alias specifies {} columns",
4735 row.len(),
4736 column_names.len()
4737 )));
4738 }
4739
4740 let mut converted_row = Vec::with_capacity(row.len());
4741 for expr in row {
4742 let value = SqlValue::try_from_expr(expr)?;
4743 converted_row.push(PlanValue::from(value));
4744 }
4745 rows.push(converted_row);
4746 }
4747
4748 Ok(Some((rows, column_names)))
4749 }
4750 _ => Ok(None),
4751 }
4752}
4753
4754fn extract_constant_select_rows(select: &Select) -> SqlResult<Option<Vec<Vec<PlanValue>>>> {
4755 if !select.from.is_empty() {
4756 return Ok(None);
4757 }
4758
4759 if select.selection.is_some()
4760 || select.having.is_some()
4761 || !select.named_window.is_empty()
4762 || select.qualify.is_some()
4763 || select.distinct.is_some()
4764 || select.top.is_some()
4765 || select.into.is_some()
4766 || select.prewhere.is_some()
4767 || !select.lateral_views.is_empty()
4768 || select.value_table_mode.is_some()
4769 || !group_by_is_empty(&select.group_by)
4770 {
4771 return Err(Error::InvalidArgumentError(
4772 "constant SELECT statements do not support advanced clauses".into(),
4773 ));
4774 }
4775
4776 if select.projection.is_empty() {
4777 return Err(Error::InvalidArgumentError(
4778 "constant SELECT requires at least one projection".into(),
4779 ));
4780 }
4781
4782 let mut row: Vec<PlanValue> = Vec::with_capacity(select.projection.len());
4783 for item in &select.projection {
4784 let expr = match item {
4785 SelectItem::UnnamedExpr(expr) => expr,
4786 SelectItem::ExprWithAlias { expr, .. } => expr,
4787 other => {
4788 return Err(Error::InvalidArgumentError(format!(
4789 "unsupported projection in constant SELECT: {other:?}"
4790 )));
4791 }
4792 };
4793
4794 let value = SqlValue::try_from_expr(expr)?;
4795 row.push(PlanValue::from(value));
4796 }
4797
4798 Ok(Some(vec![row]))
4799}
4800
4801fn extract_single_table(from: &[TableWithJoins]) -> SqlResult<(String, String)> {
4802 if from.len() != 1 {
4803 return Err(Error::InvalidArgumentError(
4804 "queries over multiple tables are not supported yet".into(),
4805 ));
4806 }
4807 let item = &from[0];
4808 if !item.joins.is_empty() {
4809 return Err(Error::InvalidArgumentError(
4810 "JOIN clauses are not supported yet".into(),
4811 ));
4812 }
4813 match &item.relation {
4814 TableFactor::Table { name, .. } => canonical_object_name(name),
4815 TableFactor::Derived { alias, .. } => {
4816 let table_name = alias
4819 .as_ref()
4820 .map(|a| a.name.value.clone())
4821 .unwrap_or_else(|| "derived".to_string());
4822 let canonical = table_name.to_ascii_lowercase();
4823 Ok((table_name, canonical))
4824 }
4825 _ => Err(Error::InvalidArgumentError(
4826 "queries require a plain table name or derived table".into(),
4827 )),
4828 }
4829}
4830
4831fn extract_tables(from: &[TableWithJoins]) -> SqlResult<Vec<llkv_plan::TableRef>> {
4833 let mut tables = Vec::new();
4834
4835 for item in from {
4836 if !item.joins.is_empty() {
4838 return Err(Error::InvalidArgumentError(
4839 "JOIN clauses are not supported yet".into(),
4840 ));
4841 }
4842
4843 match &item.relation {
4845 TableFactor::Table { name, .. } => {
4846 let (schema_opt, table) = parse_schema_qualified_name(name)?;
4847 let schema = schema_opt.unwrap_or_default();
4848 tables.push(llkv_plan::TableRef::new(schema, table));
4849 }
4850 _ => {
4851 return Err(Error::InvalidArgumentError(
4852 "queries require a plain table name".into(),
4853 ));
4854 }
4855 }
4856 }
4857
4858 Ok(tables)
4859}
4860
4861fn group_by_is_empty(expr: &GroupByExpr) -> bool {
4862 matches!(
4863 expr,
4864 GroupByExpr::Expressions(exprs, modifiers)
4865 if exprs.is_empty() && modifiers.is_empty()
4866 )
4867}
4868
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int64Array, StringArray};
    use arrow::record_batch::RecordBatch;
    use llkv_storage::pager::MemPager;

    /// Flatten column 0 of every batch into optional strings, preserving batch order.
    ///
    /// Panics if column 0 of any batch is not a `StringArray`.
    fn extract_string_options(batches: &[RecordBatch]) -> Vec<Option<String>> {
        let mut values: Vec<Option<String>> = Vec::new();
        for batch in batches {
            let column = batch
                .column(0)
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("string column");
            for idx in 0..column.len() {
                if column.is_null(idx) {
                    values.push(None);
                } else {
                    values.push(Some(column.value(idx).to_string()));
                }
            }
        }
        values
    }

    /// Flatten column 0 of every batch into optional `i64`s, preserving batch order.
    ///
    /// Panics if column 0 of any batch is not an `Int64Array`.
    fn extract_int_options(batches: &[RecordBatch]) -> Vec<Option<i64>> {
        let mut values: Vec<Option<i64>> = Vec::new();
        for batch in batches {
            let column = batch
                .column(0)
                .as_any()
                .downcast_ref::<Int64Array>()
                .expect("int column");
            for idx in 0..column.len() {
                if column.is_null(idx) {
                    values.push(None);
                } else {
                    values.push(Some(column.value(idx)));
                }
            }
        }
        values
    }

    /// CREATE TABLE, a multi-row INSERT, and a filtered SELECT round-trip a value.
    #[test]
    fn create_insert_select_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        let result = engine
            .execute("CREATE TABLE people (id INT NOT NULL, name TEXT NOT NULL)")
            .expect("create table");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::CreateTable { .. }
        ));

        let result = engine
            .execute("INSERT INTO people (id, name) VALUES (1, 'alice'), (2, 'bob')")
            .expect("insert rows");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 2,
                ..
            }
        ));

        let mut result = engine
            .execute("SELECT name FROM people WHERE id = 2")
            .expect("select rows");
        let select_result = result.remove(0);
        let batches = match select_result {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };
        assert_eq!(batches.len(), 1);
        let column = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string column");
        assert_eq!(column.len(), 1);
        assert_eq!(column.value(0), "bob");
    }

    /// `INSERT ... SELECT <constant>` stores both an integer literal and a NULL cast.
    #[test]
    fn insert_select_constant_including_null() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        engine
            .execute("CREATE TABLE integers(i INTEGER)")
            .expect("create table");

        let result = engine
            .execute("INSERT INTO integers SELECT 42")
            .expect("insert literal");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 1,
                ..
            }
        ));

        let result = engine
            .execute("INSERT INTO integers SELECT CAST(NULL AS VARCHAR)")
            .expect("insert null literal");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 1,
                ..
            }
        ));

        let mut result = engine
            .execute("SELECT * FROM integers")
            .expect("select rows");
        let select_result = result.remove(0);
        let batches = match select_result {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };

        let values = extract_int_options(&batches);
        assert_eq!(values, vec![Some(42), None]);
    }

    /// UPDATE touches only rows matching WHERE. The follow-up ORDER BY then honours
    /// the `nulls_first` default configured at the start of the session.
    #[test]
    fn update_with_where_clause_filters_rows() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        engine
            .execute("SET default_null_order='nulls_first'")
            .expect("set default null order");

        engine
            .execute("CREATE TABLE strings(a VARCHAR)")
            .expect("create table");

        engine
            .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
            .expect("insert seed rows");

        let result = engine
            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
            .expect("update rows");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Update {
                rows_updated: 1,
                ..
            }
        ));

        let mut result = engine
            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
            .expect("select rows");
        let select_result = result.remove(0);
        let batches = match select_result {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };

        // Assert the engine's own ordering directly. Re-sorting client-side would hide
        // a broken ORDER BY or null-ordering implementation.
        let values = extract_string_options(&batches);
        assert_eq!(
            values,
            vec![None, Some("4".to_string()), Some("13".to_string())]
        );
    }

    /// ORDER BY places NULLs last by default and first after
    /// `SET default_null_order='nulls_first'`.
    #[test]
    fn order_by_honors_configured_default_null_order() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        engine
            .execute("CREATE TABLE strings(a VARCHAR)")
            .expect("create table");
        engine
            .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
            .expect("insert values");
        engine
            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
            .expect("update value");

        let mut result = engine
            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
            .expect("select rows");
        let select_result = result.remove(0);
        let batches = match select_result {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };

        let values = extract_string_options(&batches);
        assert_eq!(
            values,
            vec![Some("4".to_string()), Some("13".to_string()), None]
        );

        assert!(!engine.default_nulls_first_for_tests());

        engine
            .execute("SET default_null_order='nulls_first'")
            .expect("set default null order");

        assert!(engine.default_nulls_first_for_tests());

        let mut result = engine
            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
            .expect("select rows");
        let select_result = result.remove(0);
        let batches = match select_result {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };

        let values = extract_string_options(&batches);
        assert_eq!(
            values,
            vec![None, Some("4".to_string()), Some("13".to_string())]
        );
    }

    /// A parsed `ROW(a INTEGER, b VARCHAR)` column type converts to an Arrow struct
    /// with the matching field names and types.
    #[test]
    fn arrow_type_from_row_returns_struct_fields() {
        let dialect = GenericDialect {};
        let statements = Parser::parse_sql(
            &dialect,
            "CREATE TABLE row_types(payload ROW(a INTEGER, b VARCHAR));",
        )
        .expect("parse ROW type definition");

        let data_type = match &statements[0] {
            Statement::CreateTable(stmt) => stmt.columns[0].data_type.clone(),
            other => panic!("unexpected statement: {other:?}"),
        };

        let arrow_type = arrow_type_from_sql(&data_type).expect("convert ROW type");
        match arrow_type {
            arrow::datatypes::DataType::Struct(fields) => {
                assert_eq!(fields.len(), 2, "unexpected field count");
                assert_eq!(fields[0].name(), "a");
                assert_eq!(fields[1].name(), "b");
                assert_eq!(fields[0].data_type(), &arrow::datatypes::DataType::Int64);
                assert_eq!(fields[1].data_type(), &arrow::datatypes::DataType::Utf8);
            }
            other => panic!("expected struct type, got {other:?}"),
        }
    }
}
5156}