1use std::collections::{HashMap, HashSet};
2use std::sync::{
3 Arc, OnceLock,
4 atomic::{AtomicBool, Ordering as AtomicOrdering},
5};
6
7use crate::SqlResult;
8use crate::SqlValue;
9use arrow::record_batch::RecordBatch;
10
11use llkv_executor::SelectExecution;
12use llkv_expr::literal::Literal;
13use llkv_plan::TransformFrame;
14use llkv_plan::validation::{
15 ensure_known_columns_case_insensitive, ensure_non_empty, ensure_unique_case_insensitive,
16};
17use llkv_result::Error;
18use llkv_runtime::TEMPORARY_NAMESPACE_ID;
19use llkv_runtime::{
20 AggregateExpr, AssignmentValue, ColumnAssignment, CreateIndexPlan, CreateTablePlan,
21 CreateTableSource, DeletePlan, ForeignKeyAction, ForeignKeySpec, IndexColumnPlan, InsertPlan,
22 InsertSource, MultiColumnUniqueSpec, OrderByPlan, OrderSortType, OrderTarget, PlanColumnSpec,
23 PlanStatement, PlanValue, RenameTablePlan, RuntimeContext, RuntimeEngine, RuntimeSession,
24 RuntimeStatementResult, SelectPlan, SelectProjection, TruncatePlan, UpdatePlan,
25 extract_rows_from_range,
26};
27use llkv_storage::pager::Pager;
28use llkv_table::CatalogDdl;
29use llkv_table::catalog::{IdentifierContext, IdentifierResolver};
30use regex::Regex;
31use simd_r_drive_entry_handle::EntryHandle;
32use sqlparser::ast::{
33 AlterColumnOperation, AlterTableOperation, Assignment, AssignmentTarget, BeginTransactionKind,
34 BinaryOperator, ColumnOption, ColumnOptionDef, ConstraintCharacteristics,
35 DataType as SqlDataType, Delete, Distinct, ExceptionWhen, Expr as SqlExpr, FromTable,
36 FunctionArg, FunctionArgExpr, FunctionArguments, GroupByExpr, Ident, JoinConstraint,
37 JoinOperator, LimitClause, NullsDistinctOption, ObjectName, ObjectNamePart, ObjectType,
38 OrderBy, OrderByKind, Query, ReferentialAction, SchemaName, Select, SelectItem,
39 SelectItemQualifiedWildcardKind, Set, SetExpr, SqlOption, Statement, TableConstraint,
40 TableFactor, TableObject, TableWithJoins, TransactionMode, TransactionModifier, UnaryOperator,
41 UpdateTableFromKind, Value, ValueWithSpan,
42};
43use sqlparser::dialect::GenericDialect;
44use sqlparser::parser::Parser;
45use sqlparser::tokenizer::Span;
46
47const PARSER_RECURSION_LIMIT: usize = 200;
55
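/// SQL front end that parses statements with `sqlparser`, applies a few
/// lightweight regex preprocessors, and lowers each statement into a plan
/// executed by the underlying [`RuntimeEngine`].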
56pub struct SqlEngine<P>
102where
103 P: Pager<Blob = EntryHandle> + Send + Sync,
104{
105 engine: RuntimeEngine<P>,
106 default_nulls_first: AtomicBool,
107}
108
109const DROPPED_TABLE_TRANSACTION_ERR: &str = "another transaction has dropped this table";
110
111impl<P> Clone for SqlEngine<P>
112where
113 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
114{
115 fn clone(&self) -> Self {
116 tracing::warn!(
117 "[SQL_ENGINE] SqlEngine::clone() called - will create new Engine with new session!"
118 );
119 Self {
121 engine: self.engine.clone(),
122 default_nulls_first: AtomicBool::new(
123 self.default_nulls_first.load(AtomicOrdering::Relaxed),
124 ),
125 }
126 }
127}
128
129#[allow(dead_code)]
130impl<P> SqlEngine<P>
131where
132 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
133{
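    /// Rewrites storage-layer "missing table" errors into a catalog error
    /// that names the table the caller referenced.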
134 fn map_table_error(table_name: &str, err: Error) -> Error {
135 match err {
136 Error::NotFound => Self::table_not_found_error(table_name),
137 Error::InvalidArgumentError(msg) if msg.contains("unknown table") => {
138 Self::table_not_found_error(table_name)
139 }
140 other => other,
141 }
142 }
143
144 fn table_not_found_error(table_name: &str) -> Error {
145 Error::CatalogError(format!(
146 "Catalog Error: Table '{table_name}' does not exist"
147 ))
148 }
149
150 fn is_table_missing_error(err: &Error) -> bool {
151 match err {
152 Error::NotFound => true,
153 Error::CatalogError(msg) => {
154 msg.contains("Catalog Error: Table") || msg.contains("unknown table")
155 }
156 Error::InvalidArgumentError(msg) => {
157 msg.contains("Catalog Error: Table") || msg.contains("unknown table")
158 }
159 _ => false,
160 }
161 }
162
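    /// Executes an already-lowered plan statement, mapping table-not-found
    /// errors to the table named in the statement.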
163 fn execute_plan_statement(
164 &self,
165 statement: PlanStatement,
166 ) -> SqlResult<RuntimeStatementResult<P>> {
167 let table = llkv_runtime::statement_table_name(&statement).map(str::to_string);
168 self.engine.execute_statement(statement).map_err(|err| {
169 if let Some(table_name) = table {
170 Self::map_table_error(&table_name, err)
171 } else {
172 err
173 }
174 })
175 }
176
177 pub fn new(pager: Arc<P>) -> Self {
178 let engine = RuntimeEngine::new(pager);
179 Self {
180 engine,
181 default_nulls_first: AtomicBool::new(false),
182 }
183 }
184
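    /// Rewrites `CREATE TYPE ...` / `DROP TYPE ...` into the `CREATE DOMAIN` /
    /// `DROP DOMAIN` forms handled elsewhere in the engine; for example,
    /// `CREATE TYPE mood AS VARCHAR` becomes `CREATE DOMAIN mood AS VARCHAR`.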
185 fn preprocess_create_type_syntax(sql: &str) -> String {
193 static CREATE_TYPE_REGEX: OnceLock<Regex> = OnceLock::new();
194 static DROP_TYPE_REGEX: OnceLock<Regex> = OnceLock::new();
195
196 let create_re = CREATE_TYPE_REGEX.get_or_init(|| {
198 Regex::new(r"(?i)\bCREATE\s+TYPE\s+").expect("valid CREATE TYPE regex")
199 });
200
201 let drop_re = DROP_TYPE_REGEX
203 .get_or_init(|| Regex::new(r"(?i)\bDROP\s+TYPE\s+").expect("valid DROP TYPE regex"));
204
205 let sql = create_re.replace_all(sql, "CREATE DOMAIN ").to_string();
207
208 drop_re.replace_all(&sql, "DROP DOMAIN ").to_string()
210 }
211
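    /// Quotes qualified column references inside `EXCLUDE (...)` clauses so
    /// that `EXCLUDE (t.col)` becomes `EXCLUDE ("t.col")`.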
212 fn preprocess_exclude_syntax(sql: &str) -> String {
213 static EXCLUDE_REGEX: OnceLock<Regex> = OnceLock::new();
214
215 let re = EXCLUDE_REGEX.get_or_init(|| {
218 Regex::new(
219 r"(?i)EXCLUDE\s*\(\s*([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)+)\s*\)",
220 )
221 .expect("valid EXCLUDE qualifier regex")
222 });
223
        re.replace_all(sql, |caps: &regex::Captures| {
            let qualified_name = &caps[1];
            format!("EXCLUDE (\"{}\")", qualified_name)
        })
        .to_string()
229 }
230
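    /// Removes trailing commas that appear immediately before a closing
    /// parenthesis, e.g. `VALUES (1, 2,)` becomes `VALUES (1, 2)`.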
231 fn preprocess_trailing_commas_in_values(sql: &str) -> String {
234 static TRAILING_COMMA_REGEX: OnceLock<Regex> = OnceLock::new();
235
236 let re = TRAILING_COMMA_REGEX
239 .get_or_init(|| Regex::new(r",(\s*)\)").expect("valid trailing comma regex"));
240
241 re.replace_all(sql, "$1)").to_string()
242 }
243
244 pub(crate) fn context_arc(&self) -> Arc<RuntimeContext<P>> {
245 self.engine.context()
246 }
247
248 pub fn with_context(context: Arc<RuntimeContext<P>>, default_nulls_first: bool) -> Self {
249 Self {
250 engine: RuntimeEngine::from_context(context),
251 default_nulls_first: AtomicBool::new(default_nulls_first),
252 }
253 }
254
255 #[cfg(test)]
256 fn default_nulls_first_for_tests(&self) -> bool {
257 self.default_nulls_first.load(AtomicOrdering::Relaxed)
258 }
259
260 fn has_active_transaction(&self) -> bool {
261 self.engine.session().has_active_transaction()
262 }
263
264 pub fn session(&self) -> &RuntimeSession<P> {
266 self.engine.session()
267 }
268
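    /// Preprocesses, parses, and executes one or more semicolon-separated SQL
    /// statements, returning one [`RuntimeStatementResult`] per statement.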
269 pub fn execute(&self, sql: &str) -> SqlResult<Vec<RuntimeStatementResult<P>>> {
282 tracing::trace!("DEBUG SQL execute: {}", sql);
283
284 let processed_sql = Self::preprocess_create_type_syntax(sql);
286
287 let processed_sql = Self::preprocess_exclude_syntax(&processed_sql);
290
291 let processed_sql = Self::preprocess_trailing_commas_in_values(&processed_sql);
294
295 let dialect = GenericDialect {};
296 let statements = parse_sql_with_recursion_limit(&dialect, &processed_sql)
297 .map_err(|err| Error::InvalidArgumentError(format!("failed to parse SQL: {err}")))?;
298 tracing::trace!("DEBUG SQL execute: parsed {} statements", statements.len());
299
300 let mut results = Vec::with_capacity(statements.len());
301 for (i, statement) in statements.iter().enumerate() {
302 tracing::trace!("DEBUG SQL execute: processing statement {}", i);
303 results.push(self.execute_statement(statement.clone())?);
304 tracing::trace!("DEBUG SQL execute: statement {} completed", i);
305 }
306 tracing::trace!("DEBUG SQL execute completed successfully");
307 Ok(results)
308 }
309
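    /// Convenience wrapper around [`execute`](Self::execute) for a single
    /// SELECT statement, collecting its results into Arrow [`RecordBatch`]es.
    ///
    /// A minimal sketch, assuming `pager: Arc<P>` for some pager type that
    /// satisfies the trait bounds:
    ///
    /// ```ignore
    /// let engine = SqlEngine::new(pager);
    /// engine.execute("CREATE TABLE t (a INTEGER)")?;
    /// engine.execute("INSERT INTO t VALUES (1), (2)")?;
    /// let batches = engine.sql("SELECT a FROM t")?;
    /// ```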
310 pub fn sql(&self, sql: &str) -> SqlResult<Vec<RecordBatch>> {
347 let mut results = self.execute(sql)?;
348 if results.len() != 1 {
349 return Err(Error::InvalidArgumentError(
350 "SqlEngine::sql expects exactly one SQL statement".into(),
351 ));
352 }
353
354 match results.pop().expect("checked length above") {
355 RuntimeStatementResult::Select { execution, .. } => execution.collect(),
356 other => Err(Error::InvalidArgumentError(format!(
357 "SqlEngine::sql requires a SELECT statement, got {other:?}",
358 ))),
359 }
360 }
361
362 fn execute_statement(&self, statement: Statement) -> SqlResult<RuntimeStatementResult<P>> {
363 tracing::trace!(
364 "DEBUG SQL execute_statement: {:?}",
365 match &statement {
366 Statement::Insert(insert) =>
367 format!("Insert(table={:?})", Self::table_name_from_insert(insert)),
368 Statement::Query(_) => "Query".to_string(),
369 Statement::StartTransaction { .. } => "StartTransaction".to_string(),
370 Statement::Commit { .. } => "Commit".to_string(),
371 Statement::Rollback { .. } => "Rollback".to_string(),
372 Statement::CreateTable(_) => "CreateTable".to_string(),
373 Statement::Update { .. } => "Update".to_string(),
374 Statement::Delete(_) => "Delete".to_string(),
375 other => format!("Other({:?})", other),
376 }
377 );
378 match statement {
379 Statement::StartTransaction {
380 modes,
381 begin,
382 transaction,
383 modifier,
384 statements,
385 exception,
386 has_end_keyword,
387 } => self.handle_start_transaction(
388 modes,
389 begin,
390 transaction,
391 modifier,
392 statements,
393 exception,
394 has_end_keyword,
395 ),
396 Statement::Commit {
397 chain,
398 end,
399 modifier,
400 } => self.handle_commit(chain, end, modifier),
401 Statement::Rollback { chain, savepoint } => self.handle_rollback(chain, savepoint),
402 other => self.execute_statement_non_transactional(other),
403 }
404 }
405
406 fn execute_statement_non_transactional(
407 &self,
408 statement: Statement,
409 ) -> SqlResult<RuntimeStatementResult<P>> {
410 tracing::trace!("DEBUG SQL execute_statement_non_transactional called");
411 match statement {
412 Statement::CreateTable(stmt) => {
413 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateTable");
414 self.handle_create_table(stmt)
415 }
416 Statement::CreateIndex(stmt) => {
417 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateIndex");
418 self.handle_create_index(stmt)
419 }
420 Statement::CreateSchema {
421 schema_name,
422 if_not_exists,
423 with,
424 options,
425 default_collate_spec,
426 clone,
427 } => {
428 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateSchema");
429 self.handle_create_schema(
430 schema_name,
431 if_not_exists,
432 with,
433 options,
434 default_collate_spec,
435 clone,
436 )
437 }
438 Statement::CreateView {
439 name,
440 columns,
441 query,
442 materialized,
443 or_replace,
444 or_alter,
445 options,
446 cluster_by,
447 comment,
448 with_no_schema_binding,
449 if_not_exists,
450 temporary,
451 to,
452 params,
453 secure,
454 name_before_not_exists,
455 } => {
456 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateView");
457 self.handle_create_view(
458 name,
459 columns,
460 query,
461 materialized,
462 or_replace,
463 or_alter,
464 options,
465 cluster_by,
466 comment,
467 with_no_schema_binding,
468 if_not_exists,
469 temporary,
470 to,
471 params,
472 secure,
473 name_before_not_exists,
474 )
475 }
476 Statement::CreateDomain(create_domain) => {
477 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateDomain");
478 self.handle_create_domain(create_domain)
479 }
480 Statement::DropDomain(drop_domain) => {
481 tracing::trace!("DEBUG SQL execute_statement_non_transactional: DropDomain");
482 self.handle_drop_domain(drop_domain)
483 }
484 Statement::Insert(stmt) => {
485 let table_name =
486 Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
487 tracing::trace!(
488 "DEBUG SQL execute_statement_non_transactional: Insert(table={})",
489 table_name
490 );
491 self.handle_insert(stmt)
492 }
493 Statement::Query(query) => {
494 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Query");
495 self.handle_query(*query)
496 }
497 Statement::Update {
498 table,
499 assignments,
500 from,
501 selection,
502 returning,
503 ..
504 } => {
505 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Update");
506 self.handle_update(table, assignments, from, selection, returning)
507 }
508 Statement::Delete(delete) => {
509 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Delete");
510 self.handle_delete(delete)
511 }
512 Statement::Truncate {
513 ref table_names,
514 ref partitions,
515 table,
516 ref identity,
517 cascade,
518 ref on_cluster,
519 } => {
520 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Truncate");
521 self.handle_truncate(
522 table_names,
523 partitions,
524 table,
525 identity,
526 cascade,
527 on_cluster,
528 )
529 }
530 Statement::Drop {
531 object_type,
532 if_exists,
533 names,
534 cascade,
535 restrict,
536 purge,
537 temporary,
538 ..
539 } => {
540 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Drop");
541 self.handle_drop(
542 object_type,
543 if_exists,
544 names,
545 cascade,
546 restrict,
547 purge,
548 temporary,
549 )
550 }
551 Statement::AlterTable {
552 name,
553 if_exists,
554 only,
555 operations,
556 ..
557 } => {
558 tracing::trace!("DEBUG SQL execute_statement_non_transactional: AlterTable");
559 self.handle_alter_table(name, if_exists, only, operations)
560 }
561 Statement::Set(set_stmt) => {
562 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Set");
563 self.handle_set(set_stmt)
564 }
565 Statement::Pragma { name, value, is_eq } => {
566 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Pragma");
567 self.handle_pragma(name, value, is_eq)
568 }
569 other => {
570 tracing::trace!(
571 "DEBUG SQL execute_statement_non_transactional: Other({:?})",
572 other
573 );
574 Err(Error::InvalidArgumentError(format!(
575 "unsupported SQL statement: {other:?}"
576 )))
577 }
578 }
579 }
580
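    /// Extracts the plain table name targeted by an INSERT statement,
    /// rejecting table-function targets.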
581 fn table_name_from_insert(insert: &sqlparser::ast::Insert) -> SqlResult<String> {
582 match &insert.table {
583 TableObject::TableName(name) => Self::object_name_to_string(name),
584 _ => Err(Error::InvalidArgumentError(
585 "INSERT requires a plain table name".into(),
586 )),
587 }
588 }
589
590 fn table_name_from_update(table: &TableWithJoins) -> SqlResult<Option<String>> {
591 if !table.joins.is_empty() {
592 return Err(Error::InvalidArgumentError(
593 "UPDATE with JOIN targets is not supported yet".into(),
594 ));
595 }
596 Self::table_with_joins_name(table)
597 }
598
599 fn table_name_from_delete(delete: &Delete) -> SqlResult<Option<String>> {
600 if !delete.tables.is_empty() {
601 return Err(Error::InvalidArgumentError(
602 "multi-table DELETE is not supported yet".into(),
603 ));
604 }
605 let from_tables = match &delete.from {
606 FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
607 };
608 if from_tables.is_empty() {
609 return Ok(None);
610 }
611 if from_tables.len() != 1 {
612 return Err(Error::InvalidArgumentError(
613 "DELETE over multiple tables is not supported yet".into(),
614 ));
615 }
616 Self::table_with_joins_name(&from_tables[0])
617 }
618
619 fn object_name_to_string(name: &ObjectName) -> SqlResult<String> {
620 let (display, _) = canonical_object_name(name)?;
621 Ok(display)
622 }
623
624 #[allow(dead_code)]
625 fn table_object_to_name(table: &TableObject) -> SqlResult<Option<String>> {
626 match table {
627 TableObject::TableName(name) => Ok(Some(Self::object_name_to_string(name)?)),
628 TableObject::TableFunction(_) => Ok(None),
629 }
630 }
631
632 fn table_with_joins_name(table: &TableWithJoins) -> SqlResult<Option<String>> {
633 match &table.relation {
634 TableFactor::Table { name, .. } => Ok(Some(Self::object_name_to_string(name)?)),
635 _ => Ok(None),
636 }
637 }
638
639 fn tables_in_query(query: &Query) -> SqlResult<Vec<String>> {
640 let mut tables = Vec::new();
641 if let sqlparser::ast::SetExpr::Select(select) = query.body.as_ref() {
642 for table in &select.from {
643 if let TableFactor::Table { name, .. } = &table.relation {
644 tables.push(Self::object_name_to_string(name)?);
645 }
646 }
647 }
648 Ok(tables)
649 }
650
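    /// Collects the lower-cased column names known for a table, preferring
    /// column specs staged in the current transaction over the committed
    /// catalog. Returns an empty set when the table does not exist yet.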
651 fn collect_known_columns(
652 &self,
653 display_name: &str,
654 canonical_name: &str,
655 ) -> SqlResult<HashSet<String>> {
656 let context = self.engine.context();
657
658 if context.is_table_marked_dropped(canonical_name) {
659 return Err(Self::table_not_found_error(display_name));
660 }
661
662 if let Some(specs) = self
664 .engine
665 .session()
666 .table_column_specs_from_transaction(canonical_name)
667 {
668 return Ok(specs
669 .into_iter()
670 .map(|spec| spec.name.to_ascii_lowercase())
671 .collect());
672 }
673
674 let (_, canonical_name) = llkv_table::canonical_table_name(display_name)
676 .map_err(|e| arrow::error::ArrowError::ExternalError(Box::new(e)))?;
677 match context.catalog().table_column_specs(&canonical_name) {
678 Ok(specs) => Ok(specs
679 .into_iter()
680 .map(|spec| spec.name.to_ascii_lowercase())
681 .collect()),
682 Err(err) => {
683 if !Self::is_table_missing_error(&err) {
684 return Err(Self::map_table_error(display_name, err));
685 }
686
687 Ok(HashSet::new())
688 }
689 }
690 }
691
692 fn is_table_marked_dropped(&self, table_name: &str) -> SqlResult<bool> {
693 let canonical = table_name.to_ascii_lowercase();
694 Ok(self.engine.context().is_table_marked_dropped(&canonical))
695 }
696
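    /// Lowers `CREATE TABLE` (including temporary tables and
    /// `CREATE TABLE ... AS`) into a [`CreateTablePlan`], validating column
    /// definitions plus PRIMARY KEY, UNIQUE, CHECK, and FOREIGN KEY
    /// constraints along the way.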
697 fn handle_create_table(
698 &self,
699 mut stmt: sqlparser::ast::CreateTable,
700 ) -> SqlResult<RuntimeStatementResult<P>> {
701 validate_create_table_common(&stmt)?;
702
703 let (mut schema_name, table_name) = parse_schema_qualified_name(&stmt.name)?;
704
705 let namespace = if stmt.temporary {
706 if schema_name.is_some() {
707 return Err(Error::InvalidArgumentError(
708 "temporary tables cannot specify an explicit schema".into(),
709 ));
710 }
711 schema_name = None;
712 Some(TEMPORARY_NAMESPACE_ID.to_string())
713 } else {
714 None
715 };
716
717 if let Some(ref schema) = schema_name {
719 let catalog = self.engine.context().table_catalog();
720 if !catalog.schema_exists(schema) {
721 return Err(Error::CatalogError(format!(
722 "Schema '{}' does not exist",
723 schema
724 )));
725 }
726 }
727
728 let display_name = match &schema_name {
730 Some(schema) => format!("{}.{}", schema, table_name),
731 None => table_name.clone(),
732 };
733 let canonical_name = display_name.to_ascii_lowercase();
734 tracing::trace!(
735 "\n=== HANDLE_CREATE_TABLE: table='{}' columns={} ===",
736 display_name,
737 stmt.columns.len()
738 );
739 if display_name.is_empty() {
740 return Err(Error::InvalidArgumentError(
741 "table name must not be empty".into(),
742 ));
743 }
744
745 if let Some(query) = stmt.query.take() {
746 validate_create_table_as(&stmt)?;
747 if let Some(result) = self.try_handle_range_ctas(
748 &display_name,
749 &canonical_name,
750 &query,
751 stmt.if_not_exists,
752 stmt.or_replace,
753 namespace.clone(),
754 )? {
755 return Ok(result);
756 }
757 return self.handle_create_table_as(
758 display_name,
759 canonical_name,
760 *query,
761 stmt.if_not_exists,
762 stmt.or_replace,
763 namespace.clone(),
764 );
765 }
766
767 if stmt.columns.is_empty() {
768 return Err(Error::InvalidArgumentError(
769 "CREATE TABLE requires at least one column".into(),
770 ));
771 }
772
773 validate_create_table_definition(&stmt)?;
774
775 let column_defs_ast = std::mem::take(&mut stmt.columns);
776 let constraints = std::mem::take(&mut stmt.constraints);
777
778 let column_names: Vec<String> = column_defs_ast
779 .iter()
780 .map(|column_def| column_def.name.value.clone())
781 .collect();
782 ensure_unique_case_insensitive(column_names.iter().map(|name| name.as_str()), |dup| {
783 format!(
784 "duplicate column name '{}' in table '{}'",
785 dup, display_name
786 )
787 })?;
788 let column_names_lower: HashSet<String> = column_names
789 .iter()
790 .map(|name| name.to_ascii_lowercase())
791 .collect();
792
793 let mut columns: Vec<PlanColumnSpec> = Vec::with_capacity(column_defs_ast.len());
794 let mut primary_key_columns: HashSet<String> = HashSet::new();
795 let mut foreign_keys: Vec<ForeignKeySpec> = Vec::new();
796 let mut multi_column_uniques: Vec<MultiColumnUniqueSpec> = Vec::new();
797
798 for column_def in column_defs_ast {
800 let is_nullable = column_def
801 .options
802 .iter()
803 .all(|opt| !matches!(opt.option, ColumnOption::NotNull));
804
805 let is_primary_key = column_def.options.iter().any(|opt| {
806 matches!(
807 opt.option,
808 ColumnOption::Unique {
809 is_primary: true,
810 characteristics: _
811 }
812 )
813 });
814
815 let has_unique_constraint = column_def
816 .options
817 .iter()
818 .any(|opt| matches!(opt.option, ColumnOption::Unique { .. }));
819
820 let check_expr = column_def.options.iter().find_map(|opt| {
822 if let ColumnOption::Check(expr) = &opt.option {
823 Some(expr)
824 } else {
825 None
826 }
827 });
828
829 if let Some(check_expr) = check_expr {
831 let all_col_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
832 validate_check_constraint(check_expr, &display_name, &all_col_refs)?;
833 }
834
835 let check_expr_str = check_expr.map(|e| e.to_string());
836
837 for opt in &column_def.options {
839 if let ColumnOption::ForeignKey {
840 foreign_table,
841 referred_columns,
842 on_delete,
843 on_update,
844 characteristics,
845 } = &opt.option
846 {
847 let spec = self.build_foreign_key_spec(
848 &display_name,
849 &canonical_name,
850 vec![column_def.name.value.clone()],
851 foreign_table,
852 referred_columns,
853 *on_delete,
854 *on_update,
855 characteristics,
856 &column_names_lower,
857 None,
858 )?;
859 foreign_keys.push(spec);
860 }
861 }
862
863 tracing::trace!(
864 "DEBUG CREATE TABLE column '{}' is_primary_key={} has_unique={} check_expr={:?}",
865 column_def.name.value,
866 is_primary_key,
867 has_unique_constraint,
868 check_expr_str
869 );
870
871 let resolved_data_type = self.engine.context().resolve_type(&column_def.data_type);
873
874 let mut column = PlanColumnSpec::new(
875 column_def.name.value.clone(),
876 arrow_type_from_sql(&resolved_data_type)?,
877 is_nullable,
878 );
879 tracing::trace!(
880 "DEBUG PlanColumnSpec after new(): primary_key={} unique={}",
881 column.primary_key,
882 column.unique
883 );
884
885 column = column
886 .with_primary_key(is_primary_key)
887 .with_unique(has_unique_constraint)
888 .with_check(check_expr_str);
889
890 if is_primary_key {
891 column.nullable = false;
892 primary_key_columns.insert(column.name.to_ascii_lowercase());
893 }
894 tracing::trace!(
895 "DEBUG PlanColumnSpec after with_primary_key({})/with_unique({}): primary_key={} unique={} check_expr={:?}",
896 is_primary_key,
897 has_unique_constraint,
898 column.primary_key,
899 column.unique,
900 column.check_expr
901 );
902
903 columns.push(column);
904 }
905
906 if !constraints.is_empty() {
908 let mut column_lookup: HashMap<String, usize> = HashMap::with_capacity(columns.len());
909 for (idx, column) in columns.iter().enumerate() {
910 column_lookup.insert(column.name.to_ascii_lowercase(), idx);
911 }
912
913 for constraint in constraints {
914 match constraint {
915 TableConstraint::PrimaryKey {
916 columns: constraint_columns,
917 ..
918 } => {
919 if !primary_key_columns.is_empty() {
920 return Err(Error::InvalidArgumentError(
921 "multiple PRIMARY KEY constraints are not supported".into(),
922 ));
923 }
924
925 ensure_non_empty(&constraint_columns, || {
926 "PRIMARY KEY requires at least one column".into()
927 })?;
928
929 let mut pk_column_names: Vec<String> =
930 Vec::with_capacity(constraint_columns.len());
931
932 for index_col in &constraint_columns {
933 let column_ident = extract_index_column_name(
934 index_col,
935 "PRIMARY KEY",
                                false,
                                false,
                            )?;
939 pk_column_names.push(column_ident);
940 }
941
942 ensure_unique_case_insensitive(
943 pk_column_names.iter().map(|name| name.as_str()),
944 |dup| format!("duplicate column '{}' in PRIMARY KEY constraint", dup),
945 )?;
946
947 ensure_known_columns_case_insensitive(
948 pk_column_names.iter().map(|name| name.as_str()),
949 &column_names_lower,
950 |unknown| {
951 format!("unknown column '{}' in PRIMARY KEY constraint", unknown)
952 },
953 )?;
954
955 for column_ident in pk_column_names {
956 let normalized = column_ident.to_ascii_lowercase();
957 let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
958 Error::InvalidArgumentError(format!(
959 "unknown column '{}' in PRIMARY KEY constraint",
960 column_ident
961 ))
962 })?;
963
964 let column = columns.get_mut(idx).expect("column index valid");
965 column.primary_key = true;
966 column.unique = true;
967 column.nullable = false;
968
969 primary_key_columns.insert(normalized);
970 }
971 }
972 TableConstraint::Unique {
973 columns: constraint_columns,
974 index_type,
975 index_options,
976 characteristics,
977 nulls_distinct,
978 name,
979 ..
980 } => {
981 if !matches!(nulls_distinct, NullsDistinctOption::None) {
982 return Err(Error::InvalidArgumentError(
983 "UNIQUE constraints with NULLS DISTINCT/NOT DISTINCT are not supported yet".into(),
984 ));
985 }
986
987 if index_type.is_some() {
988 return Err(Error::InvalidArgumentError(
989 "UNIQUE constraints with index types are not supported yet".into(),
990 ));
991 }
992
993 if !index_options.is_empty() {
994 return Err(Error::InvalidArgumentError(
995 "UNIQUE constraints with index options are not supported yet"
996 .into(),
997 ));
998 }
999
1000 if characteristics.is_some() {
1001 return Err(Error::InvalidArgumentError(
1002 "UNIQUE constraint characteristics are not supported yet".into(),
1003 ));
1004 }
1005
1006 ensure_non_empty(&constraint_columns, || {
1007 "UNIQUE constraint requires at least one column".into()
1008 })?;
1009
1010 let mut unique_column_names: Vec<String> =
1011 Vec::with_capacity(constraint_columns.len());
1012
1013 for index_column in &constraint_columns {
1014 let column_ident = extract_index_column_name(
1015 index_column,
1016 "UNIQUE constraint",
                                false,
                                false,
                            )?;
1020 unique_column_names.push(column_ident);
1021 }
1022
1023 ensure_unique_case_insensitive(
1024 unique_column_names.iter().map(|name| name.as_str()),
1025 |dup| format!("duplicate column '{}' in UNIQUE constraint", dup),
1026 )?;
1027
1028 ensure_known_columns_case_insensitive(
1029 unique_column_names.iter().map(|name| name.as_str()),
1030 &column_names_lower,
1031 |unknown| format!("unknown column '{}' in UNIQUE constraint", unknown),
1032 )?;
1033
1034 if unique_column_names.len() > 1 {
1035 multi_column_uniques.push(MultiColumnUniqueSpec {
1037 name: name.map(|n| n.value),
1038 columns: unique_column_names,
1039 });
1040 } else {
1041 let column_ident = unique_column_names
1043 .into_iter()
1044 .next()
1045 .expect("unique constraint checked for emptiness");
1046 let normalized = column_ident.to_ascii_lowercase();
1047 let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
1048 Error::InvalidArgumentError(format!(
1049 "unknown column '{}' in UNIQUE constraint",
1050 column_ident
1051 ))
1052 })?;
1053
1054 let column = columns
1055 .get_mut(idx)
1056 .expect("column index from lookup must be valid");
1057 column.unique = true;
1058 }
1059 }
1060 TableConstraint::ForeignKey {
1061 name,
1062 index_name,
1063 columns: fk_columns,
1064 foreign_table,
1065 referred_columns,
1066 on_delete,
1067 on_update,
1068 characteristics,
1069 ..
1070 } => {
1071 if index_name.is_some() {
1072 return Err(Error::InvalidArgumentError(
1073 "FOREIGN KEY index clauses are not supported yet".into(),
1074 ));
1075 }
1076
1077 let referencing_columns: Vec<String> =
1078 fk_columns.into_iter().map(|ident| ident.value).collect();
1079 let spec = self.build_foreign_key_spec(
1080 &display_name,
1081 &canonical_name,
1082 referencing_columns,
1083 &foreign_table,
1084 &referred_columns,
1085 on_delete,
1086 on_update,
1087 &characteristics,
1088 &column_names_lower,
1089 name.map(|ident| ident.value),
1090 )?;
1091
1092 foreign_keys.push(spec);
1093 }
1094 unsupported => {
1095 return Err(Error::InvalidArgumentError(format!(
1096 "table-level constraint {:?} is not supported",
1097 unsupported
1098 )));
1099 }
1100 }
1101 }
1102 }
1103
1104 let plan = CreateTablePlan {
1105 name: display_name,
1106 if_not_exists: stmt.if_not_exists,
1107 or_replace: stmt.or_replace,
1108 columns,
1109 source: None,
1110 namespace,
1111 foreign_keys,
1112 multi_column_uniques,
1113 };
1114 self.execute_plan_statement(PlanStatement::CreateTable(plan))
1115 }
1116
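    /// Lowers `CREATE INDEX` into a [`CreateIndexPlan`], rejecting clauses the
    /// runtime does not support (CONCURRENTLY, USING, INCLUDE, NULLS DISTINCT,
    /// WITH options, partial indexes, and index/alter options).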
1117 fn handle_create_index(
1118 &self,
1119 stmt: sqlparser::ast::CreateIndex,
1120 ) -> SqlResult<RuntimeStatementResult<P>> {
1121 let sqlparser::ast::CreateIndex {
1122 name,
1123 table_name,
1124 using,
1125 columns,
1126 unique,
1127 concurrently,
1128 if_not_exists,
1129 include,
1130 nulls_distinct,
1131 with,
1132 predicate,
1133 index_options,
1134 alter_options,
1135 ..
1136 } = stmt;
1137
1138 if concurrently {
1139 return Err(Error::InvalidArgumentError(
1140 "CREATE INDEX CONCURRENTLY is not supported".into(),
1141 ));
1142 }
1143 if using.is_some() {
1144 return Err(Error::InvalidArgumentError(
1145 "CREATE INDEX USING clauses are not supported".into(),
1146 ));
1147 }
1148 if !include.is_empty() {
1149 return Err(Error::InvalidArgumentError(
1150 "CREATE INDEX INCLUDE columns are not supported".into(),
1151 ));
1152 }
1153 if nulls_distinct.is_some() {
1154 return Err(Error::InvalidArgumentError(
1155 "CREATE INDEX NULLS DISTINCT is not supported".into(),
1156 ));
1157 }
1158 if !with.is_empty() {
1159 return Err(Error::InvalidArgumentError(
1160 "CREATE INDEX WITH options are not supported".into(),
1161 ));
1162 }
1163 if predicate.is_some() {
1164 return Err(Error::InvalidArgumentError(
1165 "partial CREATE INDEX is not supported".into(),
1166 ));
1167 }
1168 if !index_options.is_empty() {
1169 return Err(Error::InvalidArgumentError(
1170 "CREATE INDEX options are not supported".into(),
1171 ));
1172 }
1173 if !alter_options.is_empty() {
1174 return Err(Error::InvalidArgumentError(
1175 "CREATE INDEX ALTER options are not supported".into(),
1176 ));
1177 }
1178 if columns.is_empty() {
1179 return Err(Error::InvalidArgumentError(
1180 "CREATE INDEX requires at least one column".into(),
1181 ));
1182 }
1183
1184 let (schema_name, base_table_name) = parse_schema_qualified_name(&table_name)?;
1185 if let Some(ref schema) = schema_name {
1186 let catalog = self.engine.context().table_catalog();
1187 if !catalog.schema_exists(schema) {
1188 return Err(Error::CatalogError(format!(
1189 "Schema '{}' does not exist",
1190 schema
1191 )));
1192 }
1193 }
1194
1195 let display_table_name = schema_name
1196 .as_ref()
1197 .map(|schema| format!("{}.{}", schema, base_table_name))
1198 .unwrap_or_else(|| base_table_name.clone());
1199 let canonical_table_name = display_table_name.to_ascii_lowercase();
1200
1201 let known_columns =
1202 self.collect_known_columns(&display_table_name, &canonical_table_name)?;
1203 let enforce_known_columns = !known_columns.is_empty();
1204
1205 let index_name = match name {
1206 Some(name_obj) => Some(Self::object_name_to_string(&name_obj)?),
1207 None => None,
1208 };
1209
1210 let mut index_columns: Vec<IndexColumnPlan> = Vec::with_capacity(columns.len());
1211 let mut seen_column_names: HashSet<String> = HashSet::new();
1212 for item in columns {
1213 if item.column.with_fill.is_some() {
1215 return Err(Error::InvalidArgumentError(
1216 "CREATE INDEX column WITH FILL is not supported".into(),
1217 ));
1218 }
1219
1220 let column_name = extract_index_column_name(
1221 &item,
1222 "CREATE INDEX",
                true,
                true,
            )?;
1226
1227 let order_expr = &item.column;
1229 let ascending = order_expr.options.asc.unwrap_or(true);
1230 let nulls_first = order_expr.options.nulls_first.unwrap_or(false);
1231
1232 let normalized = column_name.to_ascii_lowercase();
1233 if !seen_column_names.insert(normalized.clone()) {
1234 return Err(Error::InvalidArgumentError(format!(
1235 "duplicate column '{}' in CREATE INDEX",
1236 column_name
1237 )));
1238 }
1239
1240 if enforce_known_columns && !known_columns.contains(&normalized) {
1241 return Err(Error::InvalidArgumentError(format!(
1242 "column '{}' does not exist in table '{}'",
1243 column_name, display_table_name
1244 )));
1245 }
1246
1247 let column_plan = IndexColumnPlan::new(column_name).with_sort(ascending, nulls_first);
1248 index_columns.push(column_plan);
1249 }
1250
1251 let plan = CreateIndexPlan::new(display_table_name)
1252 .with_name(index_name)
1253 .with_unique(unique)
1254 .with_if_not_exists(if_not_exists)
1255 .with_columns(index_columns);
1256
1257 self.execute_plan_statement(PlanStatement::CreateIndex(plan))
1258 }
1259
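    /// Maps a SQL referential action onto [`ForeignKeyAction`]; only
    /// NO ACTION (the default) and RESTRICT are currently accepted.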
1260 fn map_referential_action(
1261 action: Option<ReferentialAction>,
1262 kind: &str,
1263 ) -> SqlResult<ForeignKeyAction> {
1264 match action {
1265 None | Some(ReferentialAction::NoAction) => Ok(ForeignKeyAction::NoAction),
1266 Some(ReferentialAction::Restrict) => Ok(ForeignKeyAction::Restrict),
1267 Some(other) => Err(Error::InvalidArgumentError(format!(
1268 "FOREIGN KEY ON {kind} {:?} is not supported yet",
1269 other
1270 ))),
1271 }
1272 }
1273
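    /// Builds the [`ForeignKeySpec`] shared by column-level and table-level
    /// FOREIGN KEY constraints, validating the referencing and referenced
    /// column lists against the known table schemas.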
1274 #[allow(clippy::too_many_arguments)]
1275 fn build_foreign_key_spec(
1276 &self,
1277 _referencing_display: &str,
1278 referencing_canonical: &str,
1279 referencing_columns: Vec<String>,
1280 foreign_table: &ObjectName,
1281 referenced_columns: &[Ident],
1282 on_delete: Option<ReferentialAction>,
1283 on_update: Option<ReferentialAction>,
1284 characteristics: &Option<ConstraintCharacteristics>,
1285 known_columns_lower: &HashSet<String>,
1286 name: Option<String>,
1287 ) -> SqlResult<ForeignKeySpec> {
1288 if characteristics.is_some() {
1289 return Err(Error::InvalidArgumentError(
1290 "FOREIGN KEY constraint characteristics are not supported yet".into(),
1291 ));
1292 }
1293
1294 ensure_non_empty(&referencing_columns, || {
1295 "FOREIGN KEY constraint requires at least one referencing column".into()
1296 })?;
1297 ensure_unique_case_insensitive(
1298 referencing_columns.iter().map(|name| name.as_str()),
1299 |dup| format!("duplicate column '{}' in FOREIGN KEY constraint", dup),
1300 )?;
1301 ensure_known_columns_case_insensitive(
1302 referencing_columns.iter().map(|name| name.as_str()),
1303 known_columns_lower,
1304 |unknown| format!("unknown column '{}' in FOREIGN KEY constraint", unknown),
1305 )?;
1306
1307 let referenced_columns_vec: Vec<String> = referenced_columns
1308 .iter()
1309 .map(|ident| ident.value.clone())
1310 .collect();
1311 ensure_unique_case_insensitive(
1312 referenced_columns_vec.iter().map(|name| name.as_str()),
1313 |dup| {
1314 format!(
1315 "duplicate referenced column '{}' in FOREIGN KEY constraint",
1316 dup
1317 )
1318 },
1319 )?;
1320
1321 if !referenced_columns_vec.is_empty()
1322 && referenced_columns_vec.len() != referencing_columns.len()
1323 {
1324 return Err(Error::InvalidArgumentError(
1325 "FOREIGN KEY referencing and referenced column counts must match".into(),
1326 ));
1327 }
1328
1329 let (referenced_display, referenced_canonical) = canonical_object_name(foreign_table)?;
1330
1331 let catalog = self.engine.context().table_catalog();
1333 if let Some(table_id) = catalog.table_id(&referenced_canonical) {
1334 let context = self.engine.context();
1335 if context.is_view(table_id)? {
1336 return Err(Error::CatalogError(format!(
1337 "Binder Error: cannot reference a VIEW with a FOREIGN KEY: {}",
1338 referenced_display
1339 )));
1340 }
1341 }
1342
1343 if referenced_canonical == referencing_canonical {
1344 ensure_known_columns_case_insensitive(
1345 referenced_columns_vec.iter().map(|name| name.as_str()),
1346 known_columns_lower,
1347 |unknown| {
1348 format!(
1349 "Binder Error: table '{}' does not have a column named '{}'",
1350 referenced_display, unknown
1351 )
1352 },
1353 )?;
1354 } else {
1355 let known_columns =
1356 self.collect_known_columns(&referenced_display, &referenced_canonical)?;
1357 if !known_columns.is_empty() {
1358 ensure_known_columns_case_insensitive(
1359 referenced_columns_vec.iter().map(|name| name.as_str()),
1360 &known_columns,
1361 |unknown| {
1362 format!(
1363 "Binder Error: table '{}' does not have a column named '{}'",
1364 referenced_display, unknown
1365 )
1366 },
1367 )?;
1368 }
1369 }
1370
1371 let on_delete_action = Self::map_referential_action(on_delete, "DELETE")?;
1372 let on_update_action = Self::map_referential_action(on_update, "UPDATE")?;
1373
1374 Ok(ForeignKeySpec {
1375 name,
1376 columns: referencing_columns,
1377 referenced_table: referenced_display,
1378 referenced_columns: referenced_columns_vec,
1379 on_delete: on_delete_action,
1380 on_update: on_update_action,
1381 })
1382 }
1383
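    /// Handles `CREATE SCHEMA`, rejecting unsupported clauses (CLONE, WITH,
    /// options, DEFAULT COLLATE, authorization) and registering the schema
    /// name in the table catalog.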
1384 fn handle_create_schema(
1385 &self,
1386 schema_name: SchemaName,
        if_not_exists: bool,
1388 with: Option<Vec<SqlOption>>,
1389 options: Option<Vec<SqlOption>>,
1390 default_collate_spec: Option<SqlExpr>,
1391 clone: Option<ObjectName>,
1392 ) -> SqlResult<RuntimeStatementResult<P>> {
1393 if clone.is_some() {
1394 return Err(Error::InvalidArgumentError(
1395 "CREATE SCHEMA ... CLONE is not supported".into(),
1396 ));
1397 }
1398 if with.as_ref().is_some_and(|opts| !opts.is_empty()) {
1399 return Err(Error::InvalidArgumentError(
1400 "CREATE SCHEMA ... WITH options are not supported".into(),
1401 ));
1402 }
1403 if options.as_ref().is_some_and(|opts| !opts.is_empty()) {
1404 return Err(Error::InvalidArgumentError(
1405 "CREATE SCHEMA options are not supported".into(),
1406 ));
1407 }
1408 if default_collate_spec.is_some() {
1409 return Err(Error::InvalidArgumentError(
1410 "CREATE SCHEMA DEFAULT COLLATE is not supported".into(),
1411 ));
1412 }
1413
1414 let schema_name = match schema_name {
1415 SchemaName::Simple(name) => name,
1416 _ => {
1417 return Err(Error::InvalidArgumentError(
1418 "CREATE SCHEMA authorization is not supported".into(),
1419 ));
1420 }
1421 };
1422
1423 let (display_name, canonical) = canonical_object_name(&schema_name)?;
1424 if display_name.is_empty() {
1425 return Err(Error::InvalidArgumentError(
1426 "schema name must not be empty".into(),
1427 ));
1428 }
1429
1430 let catalog = self.engine.context().table_catalog();
1432
        if if_not_exists && catalog.schema_exists(&canonical) {
1434 return Ok(RuntimeStatementResult::NoOp);
1435 }
1436
1437 catalog.register_schema(&canonical).map_err(|err| {
1438 Error::CatalogError(format!(
1439 "Failed to create schema '{}': {}",
1440 display_name, err
1441 ))
1442 })?;
1443
1444 Ok(RuntimeStatementResult::NoOp)
1445 }
1446
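    /// Handles `CREATE VIEW` by validating the target name and storing the
    /// view's SQL text in the catalog; MATERIALIZED, OR REPLACE, OR ALTER, and
    /// TEMPORARY views are rejected.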
1447 #[allow(clippy::too_many_arguments)]
1448 fn handle_create_view(
1449 &self,
1450 name: ObjectName,
1451 _columns: Vec<sqlparser::ast::ViewColumnDef>,
1452 query: Box<sqlparser::ast::Query>,
1453 materialized: bool,
1454 or_replace: bool,
1455 or_alter: bool,
1456 _options: sqlparser::ast::CreateTableOptions,
1457 _cluster_by: Vec<sqlparser::ast::Ident>,
1458 _comment: Option<String>,
1459 _with_no_schema_binding: bool,
1460 if_not_exists: bool,
1461 temporary: bool,
1462 _to: Option<ObjectName>,
1463 _params: Option<sqlparser::ast::CreateViewParams>,
1464 _secure: bool,
1465 _name_before_not_exists: bool,
1466 ) -> SqlResult<RuntimeStatementResult<P>> {
1467 if materialized {
1469 return Err(Error::InvalidArgumentError(
1470 "MATERIALIZED VIEWS are not supported".into(),
1471 ));
1472 }
1473 if or_replace {
1474 return Err(Error::InvalidArgumentError(
1475 "CREATE OR REPLACE VIEW is not supported".into(),
1476 ));
1477 }
1478 if or_alter {
1479 return Err(Error::InvalidArgumentError(
1480 "CREATE OR ALTER VIEW is not supported".into(),
1481 ));
1482 }
1483 if temporary {
1484 return Err(Error::InvalidArgumentError(
1485 "TEMPORARY VIEWS are not supported".into(),
1486 ));
1487 }
1488
1489 let (schema_name, view_name) = parse_schema_qualified_name(&name)?;
1491
1492 if let Some(ref schema) = schema_name {
1494 let catalog = self.engine.context().table_catalog();
1495 if !catalog.schema_exists(schema) {
1496 return Err(Error::CatalogError(format!(
1497 "Schema '{}' does not exist",
1498 schema
1499 )));
1500 }
1501 }
1502
1503 let display_name = match &schema_name {
1505 Some(schema) => format!("{}.{}", schema, view_name),
1506 None => view_name.clone(),
1507 };
1508 let canonical_name = display_name.to_ascii_lowercase();
1509
1510 let catalog = self.engine.context().table_catalog();
1512 if catalog.table_exists(&canonical_name) {
1513 if if_not_exists {
1514 return Ok(RuntimeStatementResult::NoOp);
1515 }
1516 return Err(Error::CatalogError(format!(
1517 "Table or view '{}' already exists",
1518 display_name
1519 )));
1520 }
1521
1522 let view_definition = query.to_string();
1524
1525 let context = self.engine.context();
1527 context.create_view(&display_name, view_definition)?;
1528
1529 tracing::debug!("Created view: {}", display_name);
1530 Ok(RuntimeStatementResult::NoOp)
1531 }
1532
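    /// Registers a `CREATE DOMAIN` (also reached via the rewritten
    /// `CREATE TYPE` form) as a named type alias and persists its metadata in
    /// the system catalog.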
1533 fn handle_create_domain(
1534 &self,
1535 create_domain: sqlparser::ast::CreateDomain,
1536 ) -> SqlResult<RuntimeStatementResult<P>> {
1537 use llkv_table::CustomTypeMeta;
1538 use std::time::{SystemTime, UNIX_EPOCH};
1539
1540 let type_name = create_domain.name.to_string();
1542
1543 let base_type_sql = create_domain.data_type.to_string();
1545
1546 self.engine
1548 .context()
1549 .register_type(type_name.clone(), create_domain.data_type.clone());
1550
1551 let context = self.engine.context();
1553 let catalog = llkv_table::SysCatalog::new(context.store());
1554
1555 let created_at_micros = SystemTime::now()
1556 .duration_since(UNIX_EPOCH)
1557 .unwrap_or_default()
1558 .as_micros() as u64;
1559
1560 let meta = CustomTypeMeta {
1561 name: type_name.clone(),
1562 base_type_sql,
1563 created_at_micros,
1564 };
1565
1566 catalog.put_custom_type_meta(&meta)?;
1567
1568 tracing::debug!("Created and persisted type alias: {}", type_name);
1569 Ok(RuntimeStatementResult::NoOp)
1570 }
1571
1572 fn handle_drop_domain(
1573 &self,
1574 drop_domain: sqlparser::ast::DropDomain,
1575 ) -> SqlResult<RuntimeStatementResult<P>> {
1576 let if_exists = drop_domain.if_exists;
1577 let type_name = drop_domain.name.to_string();
1578
1579 let result = self.engine.context().drop_type(&type_name);
1581
1582 if let Err(err) = result {
1583 if !if_exists {
1584 return Err(err);
1585 }
1586 } else {
1588 let context = self.engine.context();
1590 let catalog = llkv_table::SysCatalog::new(context.store());
1591 catalog.delete_custom_type_meta(&type_name)?;
1592
            tracing::debug!("Dropped type alias and removed catalog entry: {}", type_name);
1594 }
1595
1596 Ok(RuntimeStatementResult::NoOp)
1597 }
1598
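    /// Fast path for `CREATE TABLE ... AS SELECT <literals> FROM range(N)`:
    /// creates the table from the projected literal columns and inserts `N`
    /// copies of the literal row, bypassing the general CTAS pipeline.
    /// Returns `Ok(None)` when the query does not match this shape.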
1599 fn try_handle_range_ctas(
1600 &self,
1601 display_name: &str,
1602 _canonical_name: &str,
1603 query: &Query,
1604 if_not_exists: bool,
1605 or_replace: bool,
1606 namespace: Option<String>,
1607 ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
1608 let select = match query.body.as_ref() {
1609 SetExpr::Select(select) => select,
1610 _ => return Ok(None),
1611 };
1612 if select.from.len() != 1 {
1613 return Ok(None);
1614 }
1615 let table_with_joins = &select.from[0];
1616 if !table_with_joins.joins.is_empty() {
1617 return Ok(None);
1618 }
1619 let (range_size, range_alias) = match &table_with_joins.relation {
1620 TableFactor::Table {
1621 name,
1622 args: Some(args),
1623 alias,
1624 ..
1625 } => {
1626 let func_name = name.to_string().to_ascii_lowercase();
1627 if func_name != "range" {
1628 return Ok(None);
1629 }
1630 if args.args.len() != 1 {
1631 return Err(Error::InvalidArgumentError(
1632 "range table function expects a single argument".into(),
1633 ));
1634 }
1635 let size_expr = &args.args[0];
1636 let range_size = match size_expr {
1637 FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
1638 match &value.value {
1639 Value::Number(raw, _) => raw.parse::<i64>().map_err(|e| {
1640 Error::InvalidArgumentError(format!(
1641 "invalid range size literal {}: {}",
1642 raw, e
1643 ))
1644 })?,
1645 other => {
1646 return Err(Error::InvalidArgumentError(format!(
1647 "unsupported range size value: {:?}",
1648 other
1649 )));
1650 }
1651 }
1652 }
1653 _ => {
1654 return Err(Error::InvalidArgumentError(
1655 "unsupported range argument".into(),
1656 ));
1657 }
1658 };
1659 (range_size, alias.as_ref().map(|a| a.name.value.clone()))
1660 }
1661 _ => return Ok(None),
1662 };
1663
1664 if range_size < 0 {
1665 return Err(Error::InvalidArgumentError(
1666 "range size must be non-negative".into(),
1667 ));
1668 }
1669
1670 if select.projection.is_empty() {
1671 return Err(Error::InvalidArgumentError(
1672 "CREATE TABLE AS SELECT requires at least one projected column".into(),
1673 ));
1674 }
1675
1676 let mut column_specs = Vec::with_capacity(select.projection.len());
1677 let mut column_names = Vec::with_capacity(select.projection.len());
1678 let mut row_template = Vec::with_capacity(select.projection.len());
1679 for item in &select.projection {
1680 match item {
1681 SelectItem::ExprWithAlias { expr, alias } => {
1682 let (value, data_type) = match expr {
1683 SqlExpr::Value(value_with_span) => match &value_with_span.value {
1684 Value::Number(raw, _) => {
1685 let parsed = raw.parse::<i64>().map_err(|e| {
1686 Error::InvalidArgumentError(format!(
1687 "invalid numeric literal {}: {}",
1688 raw, e
1689 ))
1690 })?;
1691 (
1692 PlanValue::Integer(parsed),
1693 arrow::datatypes::DataType::Int64,
1694 )
1695 }
1696 Value::SingleQuotedString(s) => (
1697 PlanValue::String(s.clone()),
1698 arrow::datatypes::DataType::Utf8,
1699 ),
1700 other => {
1701 return Err(Error::InvalidArgumentError(format!(
1702 "unsupported SELECT expression in range CTAS: {:?}",
1703 other
1704 )));
1705 }
1706 },
1707 SqlExpr::Identifier(ident) => {
1708 let ident_lower = ident.value.to_ascii_lowercase();
1709 if range_alias
1710 .as_ref()
1711 .map(|a| a.eq_ignore_ascii_case(&ident_lower))
1712 .unwrap_or(false)
1713 || ident_lower == "range"
1714 {
1715 return Err(Error::InvalidArgumentError(
1716 "range() table function columns are not supported yet".into(),
1717 ));
1718 }
1719 return Err(Error::InvalidArgumentError(format!(
1720 "unsupported identifier '{}' in range CTAS projection",
1721 ident.value
1722 )));
1723 }
1724 other => {
1725 return Err(Error::InvalidArgumentError(format!(
1726 "unsupported SELECT expression in range CTAS: {:?}",
1727 other
1728 )));
1729 }
1730 };
1731 let column_name = alias.value.clone();
1732 column_specs.push(PlanColumnSpec::new(column_name.clone(), data_type, true));
1733 column_names.push(column_name);
1734 row_template.push(value);
1735 }
1736 other => {
1737 return Err(Error::InvalidArgumentError(format!(
1738 "unsupported projection {:?} in range CTAS",
1739 other
1740 )));
1741 }
1742 }
1743 }
1744
1745 let plan = CreateTablePlan {
1746 name: display_name.to_string(),
1747 if_not_exists,
1748 or_replace,
1749 columns: column_specs,
1750 source: None,
1751 namespace,
1752 foreign_keys: Vec::new(),
1753 multi_column_uniques: Vec::new(),
1754 };
1755 let create_result = self.execute_plan_statement(PlanStatement::CreateTable(plan))?;
1756
1757 let row_count = range_size
1758 .try_into()
1759 .map_err(|_| Error::InvalidArgumentError("range size exceeds usize".into()))?;
1760 if row_count > 0 {
1761 let rows = vec![row_template; row_count];
1762 let insert_plan = InsertPlan {
1763 table: display_name.to_string(),
1764 columns: column_names,
1765 source: InsertSource::Rows(rows),
1766 };
1767 self.execute_plan_statement(PlanStatement::Insert(insert_plan))?;
1768 }
1769
1770 Ok(Some(create_result))
1771 }
1772
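    /// Answers `SELECT ... FROM pragma_table_info('table')` queries by
    /// synthesizing a single in-memory batch with SQLite-style columns
    /// (`cid`, `name`, `type`, `notnull`, `dflt_value`, `pk`), honoring simple
    /// column projections and ORDER BY over those columns. Returns `Ok(None)`
    /// when the query is not a `pragma_table_info` call.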
1773 fn try_handle_pragma_table_info(
1777 &self,
1778 query: &Query,
1779 ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
1780 let select = match query.body.as_ref() {
1781 SetExpr::Select(select) => select,
1782 _ => return Ok(None),
1783 };
1784
1785 if select.from.len() != 1 {
1786 return Ok(None);
1787 }
1788
1789 let table_with_joins = &select.from[0];
1790 if !table_with_joins.joins.is_empty() {
1791 return Ok(None);
1792 }
1793
1794 let table_name = match &table_with_joins.relation {
1796 TableFactor::Table {
1797 name,
1798 args: Some(args),
1799 ..
1800 } => {
1801 let func_name = name.to_string().to_ascii_lowercase();
1802 if func_name != "pragma_table_info" {
1803 return Ok(None);
1804 }
1805
1806 if args.args.len() != 1 {
1808 return Err(Error::InvalidArgumentError(
1809 "pragma_table_info expects exactly one argument".into(),
1810 ));
1811 }
1812
1813 match &args.args[0] {
1814 FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
1815 match &value.value {
1816 Value::SingleQuotedString(s) => s.clone(),
1817 Value::DoubleQuotedString(s) => s.clone(),
1818 _ => {
1819 return Err(Error::InvalidArgumentError(
1820 "pragma_table_info argument must be a string".into(),
1821 ));
1822 }
1823 }
1824 }
1825 _ => {
1826 return Err(Error::InvalidArgumentError(
1827 "pragma_table_info argument must be a string literal".into(),
1828 ));
1829 }
1830 }
1831 }
1832 _ => return Ok(None),
1833 };
1834
1835 let context = self.engine.context();
1837 let (_, canonical_name) = llkv_table::canonical_table_name(&table_name)?;
1838 let columns = context.catalog().table_column_specs(&canonical_name)?;
1839
1840 use arrow::array::{BooleanArray, Int32Array, StringArray};
1842 use arrow::datatypes::{DataType, Field, Schema};
1843
1844 let mut cid_values = Vec::new();
1845 let mut name_values = Vec::new();
1846 let mut type_values = Vec::new();
1847 let mut notnull_values = Vec::new();
1848 let mut dflt_value_values: Vec<Option<String>> = Vec::new();
1849 let mut pk_values = Vec::new();
1850
1851 for (idx, col) in columns.iter().enumerate() {
1852 cid_values.push(idx as i32);
1853 name_values.push(col.name.clone());
            type_values.push(format!("{:?}", col.data_type));
            notnull_values.push(!col.nullable);
            dflt_value_values.push(None);
            pk_values.push(col.primary_key);
1858 }
1859
1860 let schema = Arc::new(Schema::new(vec![
1861 Field::new("cid", DataType::Int32, false),
1862 Field::new("name", DataType::Utf8, false),
1863 Field::new("type", DataType::Utf8, false),
1864 Field::new("notnull", DataType::Boolean, false),
1865 Field::new("dflt_value", DataType::Utf8, true),
1866 Field::new("pk", DataType::Boolean, false),
1867 ]));
1868
1869 use arrow::array::ArrayRef;
1870 let mut batch = RecordBatch::try_new(
1871 Arc::clone(&schema),
1872 vec![
1873 Arc::new(Int32Array::from(cid_values)) as ArrayRef,
1874 Arc::new(StringArray::from(name_values)) as ArrayRef,
1875 Arc::new(StringArray::from(type_values)) as ArrayRef,
1876 Arc::new(BooleanArray::from(notnull_values)) as ArrayRef,
1877 Arc::new(StringArray::from(dflt_value_values)) as ArrayRef,
1878 Arc::new(BooleanArray::from(pk_values)) as ArrayRef,
1879 ],
1880 )
1881 .map_err(|e| Error::Internal(format!("failed to create pragma_table_info batch: {}", e)))?;
1882
1883 let projection_indices: Vec<usize> = select
1885 .projection
1886 .iter()
1887 .filter_map(|item| {
1888 match item {
1889 SelectItem::UnnamedExpr(SqlExpr::Identifier(ident)) => {
1890 schema.index_of(&ident.value).ok()
1891 }
1892 SelectItem::ExprWithAlias { expr, .. } => {
1893 if let SqlExpr::Identifier(ident) = expr {
1894 schema.index_of(&ident.value).ok()
1895 } else {
1896 None
1897 }
1898 }
                    SelectItem::Wildcard(_) => None,
                    _ => None,
1901 }
1902 })
1903 .collect();
1904
1905 let projected_schema;
1907 if !projection_indices.is_empty() {
1908 let projected_fields: Vec<Field> = projection_indices
1909 .iter()
1910 .map(|&idx| schema.field(idx).clone())
1911 .collect();
1912 projected_schema = Arc::new(Schema::new(projected_fields));
1913
1914 let projected_columns: Vec<ArrayRef> = projection_indices
1915 .iter()
1916 .map(|&idx| Arc::clone(batch.column(idx)))
1917 .collect();
1918
1919 batch = RecordBatch::try_new(Arc::clone(&projected_schema), projected_columns)
1920 .map_err(|e| Error::Internal(format!("failed to project columns: {}", e)))?;
1921 } else {
1922 projected_schema = schema;
1924 }
1925
1926 if let Some(order_by) = &query.order_by {
1928 use arrow::compute::SortColumn;
1929 use arrow::compute::lexsort_to_indices;
1930 use sqlparser::ast::OrderByKind;
1931
1932 let exprs = match &order_by.kind {
1933 OrderByKind::Expressions(exprs) => exprs,
1934 _ => {
1935 return Err(Error::InvalidArgumentError(
1936 "unsupported ORDER BY clause".into(),
1937 ));
1938 }
1939 };
1940
1941 let mut sort_columns = Vec::new();
1942 for order_expr in exprs {
1943 if let SqlExpr::Identifier(ident) = &order_expr.expr
1944 && let Ok(col_idx) = projected_schema.index_of(&ident.value)
1945 {
1946 let options = arrow::compute::SortOptions {
1947 descending: !order_expr.options.asc.unwrap_or(true),
1948 nulls_first: order_expr.options.nulls_first.unwrap_or(false),
1949 };
1950 sort_columns.push(SortColumn {
1951 values: Arc::clone(batch.column(col_idx)),
1952 options: Some(options),
1953 });
1954 }
1955 }
1956
1957 if !sort_columns.is_empty() {
1958 let indices = lexsort_to_indices(&sort_columns, None)
1959 .map_err(|e| Error::Internal(format!("failed to sort: {}", e)))?;
1960
1961 use arrow::compute::take;
1962 let sorted_columns: Result<Vec<ArrayRef>, _> = batch
1963 .columns()
1964 .iter()
1965 .map(|col| take(col.as_ref(), &indices, None))
1966 .collect();
1967
1968 batch = RecordBatch::try_new(
1969 Arc::clone(&projected_schema),
1970 sorted_columns
1971 .map_err(|e| Error::Internal(format!("failed to apply sort: {}", e)))?,
1972 )
1973 .map_err(|e| Error::Internal(format!("failed to create sorted batch: {}", e)))?;
1974 }
1975 }
1976
1977 let execution = SelectExecution::new_single_batch(
1978 table_name.clone(),
1979 Arc::clone(&projected_schema),
1980 batch,
1981 );
1982
1983 Ok(Some(RuntimeStatementResult::Select {
1984 table_name,
1985 schema: projected_schema,
1986 execution: Box::new(execution),
1987 }))
1988 }
1989
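    /// General `CREATE TABLE ... AS SELECT` path: materializes an inline
    /// VALUES derived table directly when possible, otherwise plans the SELECT
    /// and uses it as the table source.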
1990 fn handle_create_table_as(
1991 &self,
1992 display_name: String,
1993 _canonical_name: String,
1994 query: Query,
1995 if_not_exists: bool,
1996 or_replace: bool,
1997 namespace: Option<String>,
1998 ) -> SqlResult<RuntimeStatementResult<P>> {
1999 if let SetExpr::Select(select) = query.body.as_ref()
2002 && let Some((rows, column_names)) = extract_values_from_derived_table(&select.from)?
2003 {
2004 return self.handle_create_table_from_values(
2006 display_name,
2007 rows,
2008 column_names,
2009 if_not_exists,
2010 or_replace,
2011 namespace,
2012 );
2013 }
2014
2015 let select_plan = self.build_select_plan(query)?;
2017
2018 if select_plan.projections.is_empty() && select_plan.aggregates.is_empty() {
2019 return Err(Error::InvalidArgumentError(
2020 "CREATE TABLE AS SELECT requires at least one projected column".into(),
2021 ));
2022 }
2023
2024 let plan = CreateTablePlan {
2025 name: display_name,
2026 if_not_exists,
2027 or_replace,
2028 columns: Vec::new(),
2029 source: Some(CreateTableSource::Select {
2030 plan: Box::new(select_plan),
2031 }),
2032 namespace,
2033 foreign_keys: Vec::new(),
2034 multi_column_uniques: Vec::new(),
2035 };
2036 self.execute_plan_statement(PlanStatement::CreateTable(plan))
2037 }
2038
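    /// Materializes `CREATE TABLE ... AS` over an inline VALUES list by
    /// inferring Int64/Float64/Utf8 column types from the first row and
    /// building a single [`RecordBatch`] to seed the new table.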
2039 fn handle_create_table_from_values(
2040 &self,
2041 display_name: String,
2042 rows: Vec<Vec<PlanValue>>,
2043 column_names: Vec<String>,
2044 if_not_exists: bool,
2045 or_replace: bool,
2046 namespace: Option<String>,
2047 ) -> SqlResult<RuntimeStatementResult<P>> {
2048 use arrow::array::{ArrayRef, Float64Builder, Int64Builder, StringBuilder};
2049 use arrow::datatypes::{DataType, Field, Schema};
2050 use arrow::record_batch::RecordBatch;
2051 use std::sync::Arc;
2052
2053 if rows.is_empty() {
2054 return Err(Error::InvalidArgumentError(
2055 "VALUES must have at least one row".into(),
2056 ));
2057 }
2058
2059 let num_cols = column_names.len();
2060
2061 let first_row = &rows[0];
2063 if first_row.len() != num_cols {
2064 return Err(Error::InvalidArgumentError(
2065 "VALUES row column count mismatch".into(),
2066 ));
2067 }
2068
2069 let mut fields = Vec::with_capacity(num_cols);
2070 let mut column_types = Vec::with_capacity(num_cols);
2071
2072 for (idx, value) in first_row.iter().enumerate() {
2073 let (data_type, nullable) = match value {
2074 PlanValue::Integer(_) => (DataType::Int64, false),
2075 PlanValue::Float(_) => (DataType::Float64, false),
2076 PlanValue::String(_) => (DataType::Utf8, false),
                PlanValue::Null => (DataType::Utf8, true),
                _ => {
2079 return Err(Error::InvalidArgumentError(format!(
2080 "unsupported value type in VALUES for column '{}'",
2081 column_names.get(idx).unwrap_or(&format!("column{}", idx))
2082 )));
2083 }
2084 };
2085
2086 column_types.push(data_type.clone());
2087 fields.push(Field::new(&column_names[idx], data_type, nullable));
2088 }
2089
2090 let schema = Arc::new(Schema::new(fields));
2091
2092 let mut arrays: Vec<ArrayRef> = Vec::with_capacity(num_cols);
2094
2095 for col_idx in 0..num_cols {
2096 let col_type = &column_types[col_idx];
2097
2098 match col_type {
2099 DataType::Int64 => {
2100 let mut builder = Int64Builder::with_capacity(rows.len());
2101 for row in &rows {
2102 match &row[col_idx] {
2103 PlanValue::Integer(v) => builder.append_value(*v),
2104 PlanValue::Null => builder.append_null(),
2105 other => {
2106 return Err(Error::InvalidArgumentError(format!(
2107 "type mismatch in VALUES: expected Integer, got {:?}",
2108 other
2109 )));
2110 }
2111 }
2112 }
2113 arrays.push(Arc::new(builder.finish()) as ArrayRef);
2114 }
2115 DataType::Float64 => {
2116 let mut builder = Float64Builder::with_capacity(rows.len());
2117 for row in &rows {
2118 match &row[col_idx] {
2119 PlanValue::Float(v) => builder.append_value(*v),
2120 PlanValue::Null => builder.append_null(),
2121 other => {
2122 return Err(Error::InvalidArgumentError(format!(
2123 "type mismatch in VALUES: expected Float, got {:?}",
2124 other
2125 )));
2126 }
2127 }
2128 }
2129 arrays.push(Arc::new(builder.finish()) as ArrayRef);
2130 }
2131 DataType::Utf8 => {
2132 let mut builder = StringBuilder::with_capacity(rows.len(), 1024);
2133 for row in &rows {
2134 match &row[col_idx] {
2135 PlanValue::String(v) => builder.append_value(v),
2136 PlanValue::Null => builder.append_null(),
2137 other => {
2138 return Err(Error::InvalidArgumentError(format!(
2139 "type mismatch in VALUES: expected String, got {:?}",
2140 other
2141 )));
2142 }
2143 }
2144 }
2145 arrays.push(Arc::new(builder.finish()) as ArrayRef);
2146 }
2147 other => {
2148 return Err(Error::InvalidArgumentError(format!(
2149 "unsupported column type in VALUES: {:?}",
2150 other
2151 )));
2152 }
2153 }
2154 }
2155
2156 let batch = RecordBatch::try_new(Arc::clone(&schema), arrays).map_err(|e| {
2157 Error::Internal(format!("failed to create RecordBatch from VALUES: {}", e))
2158 })?;
2159
2160 let plan = CreateTablePlan {
2161 name: display_name.clone(),
2162 if_not_exists,
2163 or_replace,
2164 columns: Vec::new(),
2165 source: Some(CreateTableSource::Batches {
2166 schema: Arc::clone(&schema),
2167 batches: vec![batch],
2168 }),
2169 namespace,
2170 foreign_keys: Vec::new(),
2171 multi_column_uniques: Vec::new(),
2172 };
2173
2174 self.execute_plan_statement(PlanStatement::CreateTable(plan))
2175 }
2176
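    /// Handles `INSERT` statements. Non-standard forms (REPLACE/IGNORE,
    /// `INSERT OVERWRITE`, `INSERT ... SET`, partitioned inserts, RETURNING,
    /// FORMAT/SETTINGS) are rejected up front. The source may be a literal
    /// `VALUES` list, a constant `SELECT`, a row source recognized by
    /// `extract_rows_from_range`, or an arbitrary `SELECT` that is lowered into a
    /// nested `SelectPlan`.
    ///
    /// Illustrative input (names are examples only):
    ///
    /// ```text
    /// INSERT INTO t (id, name) VALUES (1, 'a'), (2, 'b');
    /// ```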
2177 fn handle_insert(&self, stmt: sqlparser::ast::Insert) -> SqlResult<RuntimeStatementResult<P>> {
2178 let table_name_debug =
2179 Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
2180 tracing::trace!(
2181 "DEBUG SQL handle_insert called for table={}",
2182 table_name_debug
2183 );
2184 if !self.engine.session().has_active_transaction()
2185 && self.is_table_marked_dropped(&table_name_debug)?
2186 {
2187 return Err(Error::TransactionContextError(
2188 DROPPED_TABLE_TRANSACTION_ERR.into(),
2189 ));
2190 }
2191 if stmt.replace_into || stmt.ignore || stmt.or.is_some() {
2192 return Err(Error::InvalidArgumentError(
2193 "non-standard INSERT forms are not supported".into(),
2194 ));
2195 }
2196 if stmt.overwrite {
2197 return Err(Error::InvalidArgumentError(
2198 "INSERT OVERWRITE is not supported".into(),
2199 ));
2200 }
2201 if !stmt.assignments.is_empty() {
2202 return Err(Error::InvalidArgumentError(
2203 "INSERT ... SET is not supported".into(),
2204 ));
2205 }
2206 if stmt.partitioned.is_some() || !stmt.after_columns.is_empty() {
2207 return Err(Error::InvalidArgumentError(
2208 "partitioned INSERT is not supported".into(),
2209 ));
2210 }
2211 if stmt.returning.is_some() {
2212 return Err(Error::InvalidArgumentError(
2213 "INSERT ... RETURNING is not supported".into(),
2214 ));
2215 }
2216 if stmt.format_clause.is_some() || stmt.settings.is_some() {
2217 return Err(Error::InvalidArgumentError(
2218 "INSERT with FORMAT or SETTINGS is not supported".into(),
2219 ));
2220 }
2221
2222 let (display_name, _canonical_name) = match &stmt.table {
2223 TableObject::TableName(name) => canonical_object_name(name)?,
2224 _ => {
2225 return Err(Error::InvalidArgumentError(
2226 "INSERT requires a plain table name".into(),
2227 ));
2228 }
2229 };
2230
2231 let columns: Vec<String> = stmt
2232 .columns
2233 .iter()
2234 .map(|ident| ident.value.clone())
2235 .collect();
2236 let source_expr = stmt
2237 .source
2238 .as_ref()
2239 .ok_or_else(|| Error::InvalidArgumentError("INSERT requires a VALUES clause".into()))?;
2240 validate_simple_query(source_expr)?;
2241
2242 let insert_source = match source_expr.body.as_ref() {
2243 SetExpr::Values(values) => {
2244 if values.rows.is_empty() {
2245 return Err(Error::InvalidArgumentError(
2246 "INSERT VALUES list must contain at least one row".into(),
2247 ));
2248 }
2249 let mut rows: Vec<Vec<SqlValue>> = Vec::with_capacity(values.rows.len());
2250 for row in &values.rows {
2251 let mut converted = Vec::with_capacity(row.len());
2252 for expr in row {
2253 converted.push(SqlValue::try_from_expr(expr)?);
2254 }
2255 rows.push(converted);
2256 }
2257 InsertSource::Rows(
2258 rows.into_iter()
2259 .map(|row| row.into_iter().map(PlanValue::from).collect())
2260 .collect(),
2261 )
2262 }
2263 SetExpr::Select(select) => {
2264 if let Some(rows) = extract_constant_select_rows(select.as_ref())? {
2265 InsertSource::Rows(rows)
2266 } else if let Some(range_rows) = extract_rows_from_range(select.as_ref())? {
2267 InsertSource::Rows(range_rows.into_rows())
2268 } else {
2269 let select_plan = self.build_select_plan((**source_expr).clone())?;
2270 InsertSource::Select {
2271 plan: Box::new(select_plan),
2272 }
2273 }
2274 }
2275 _ => {
2276 return Err(Error::InvalidArgumentError(
2277 "unsupported INSERT source".into(),
2278 ));
2279 }
2280 };
2281
2282 let plan = InsertPlan {
2283 table: display_name.clone(),
2284 columns,
2285 source: insert_source,
2286 };
2287 tracing::trace!(
2288 "DEBUG SQL handle_insert: about to execute insert for table={}",
2289 display_name
2290 );
2291 self.execute_plan_statement(PlanStatement::Insert(plan))
2292 }
2293
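    /// Handles single-table `UPDATE`. Each assignment becomes either a literal
    /// value or, when it is not a plain literal, a scalar expression resolved
    /// against the target table's columns. Duplicate assignment targets are
    /// rejected, and `IN (<subquery>)` predicates in the WHERE clause are
    /// materialized into literal `IN` lists before translation.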
2294 fn handle_update(
2295 &self,
2296 table: TableWithJoins,
2297 assignments: Vec<Assignment>,
2298 from: Option<UpdateTableFromKind>,
2299 selection: Option<SqlExpr>,
2300 returning: Option<Vec<SelectItem>>,
2301 ) -> SqlResult<RuntimeStatementResult<P>> {
2302 if from.is_some() {
2303 return Err(Error::InvalidArgumentError(
2304 "UPDATE ... FROM is not supported yet".into(),
2305 ));
2306 }
2307 if returning.is_some() {
2308 return Err(Error::InvalidArgumentError(
2309 "UPDATE ... RETURNING is not supported".into(),
2310 ));
2311 }
2312 if assignments.is_empty() {
2313 return Err(Error::InvalidArgumentError(
2314 "UPDATE requires at least one assignment".into(),
2315 ));
2316 }
2317
2318 let (display_name, canonical_name) = extract_single_table(std::slice::from_ref(&table))?;
2319
2320 if !self.engine.session().has_active_transaction()
2321 && self
2322 .engine
2323 .context()
2324 .is_table_marked_dropped(&canonical_name)
2325 {
2326 return Err(Error::TransactionContextError(
2327 DROPPED_TABLE_TRANSACTION_ERR.into(),
2328 ));
2329 }
2330
2331 let catalog = self.engine.context().table_catalog();
2332 let resolver = catalog.identifier_resolver();
2333 let table_id = catalog.table_id(&canonical_name);
2334
2335 let mut column_assignments = Vec::with_capacity(assignments.len());
2336        let mut seen: HashSet<String> = HashSet::new();
2337 for assignment in assignments {
2338 let column_name = resolve_assignment_column_name(&assignment.target)?;
2339 let normalized = column_name.to_ascii_lowercase();
2340            if !seen.insert(normalized) {
2341 return Err(Error::InvalidArgumentError(format!(
2342 "duplicate column '{}' in UPDATE assignments",
2343 column_name
2344 )));
2345 }
2346 let value = match SqlValue::try_from_expr(&assignment.value) {
2347 Ok(literal) => AssignmentValue::Literal(PlanValue::from(literal)),
2348 Err(Error::InvalidArgumentError(msg))
2349 if msg.contains("unsupported literal expression") =>
2350 {
2351 let translated = translate_scalar_with_context(
2352 &resolver,
2353 IdentifierContext::new(table_id),
2354 &assignment.value,
2355 )?;
2356 AssignmentValue::Expression(translated)
2357 }
2358 Err(err) => return Err(err),
2359 };
2360 column_assignments.push(ColumnAssignment {
2361 column: column_name,
2362 value,
2363 });
2364 }
2365
2366 let filter = match selection {
2367 Some(expr) => {
2368 let materialized_expr = self.materialize_in_subquery(expr)?;
2369 Some(translate_condition_with_context(
2370 &resolver,
2371 IdentifierContext::new(table_id),
2372 &materialized_expr,
2373 )?)
2374 }
2375 None => None,
2376 };
2377
2378 let plan = UpdatePlan {
2379 table: display_name.clone(),
2380 assignments: column_assignments,
2381 filter,
2382 };
2383 self.execute_plan_statement(PlanStatement::Update(plan))
2384 }
2385
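    /// Handles single-table `DELETE`. Multi-table deletes, `USING`, `RETURNING`,
    /// `ORDER BY`, and `LIMIT` are rejected; the optional WHERE clause goes
    /// through the same subquery materialization and predicate translation as
    /// `UPDATE`.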
2386 #[allow(clippy::collapsible_if)]
2387 fn handle_delete(&self, delete: Delete) -> SqlResult<RuntimeStatementResult<P>> {
2388 let Delete {
2389 tables,
2390 from,
2391 using,
2392 selection,
2393 returning,
2394 order_by,
2395 limit,
2396 } = delete;
2397
2398 if !tables.is_empty() {
2399 return Err(Error::InvalidArgumentError(
2400 "multi-table DELETE is not supported yet".into(),
2401 ));
2402 }
2403 if let Some(using_tables) = using {
2404 if !using_tables.is_empty() {
2405 return Err(Error::InvalidArgumentError(
2406 "DELETE ... USING is not supported yet".into(),
2407 ));
2408 }
2409 }
2410 if returning.is_some() {
2411 return Err(Error::InvalidArgumentError(
2412 "DELETE ... RETURNING is not supported".into(),
2413 ));
2414 }
2415 if !order_by.is_empty() {
2416 return Err(Error::InvalidArgumentError(
2417 "DELETE ... ORDER BY is not supported yet".into(),
2418 ));
2419 }
2420 if limit.is_some() {
2421 return Err(Error::InvalidArgumentError(
2422 "DELETE ... LIMIT is not supported yet".into(),
2423 ));
2424 }
2425
2426 let from_tables = match from {
2427 FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
2428 };
2429 let (display_name, canonical_name) = extract_single_table(&from_tables)?;
2430
2431 if !self.engine.session().has_active_transaction()
2432 && self
2433 .engine
2434 .context()
2435 .is_table_marked_dropped(&canonical_name)
2436 {
2437 return Err(Error::TransactionContextError(
2438 DROPPED_TABLE_TRANSACTION_ERR.into(),
2439 ));
2440 }
2441
2442 let catalog = self.engine.context().table_catalog();
2443 let resolver = catalog.identifier_resolver();
2444 let table_id = catalog.table_id(&canonical_name);
2445
2446 let filter = selection
2447 .map(|expr| {
2448 let materialized_expr = self.materialize_in_subquery(expr)?;
2449 translate_condition_with_context(
2450 &resolver,
2451 IdentifierContext::new(table_id),
2452 &materialized_expr,
2453 )
2454 })
2455 .transpose()?;
2456
2457 let plan = DeletePlan {
2458 table: display_name.clone(),
2459 filter,
2460 };
2461 self.execute_plan_statement(PlanStatement::Delete(plan))
2462 }
2463
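    /// Handles `TRUNCATE` for a single table. Multiple targets, `PARTITION`,
    /// `RESTART`/`CONTINUE IDENTITY`, `CASCADE`, and `ON CLUSTER` are rejected.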
2464 fn handle_truncate(
2465 &self,
2466 table_names: &[sqlparser::ast::TruncateTableTarget],
2467 partitions: &Option<Vec<SqlExpr>>,
2468        _table: bool,
2469        identity: &Option<sqlparser::ast::TruncateIdentityOption>,
2470 cascade: Option<sqlparser::ast::CascadeOption>,
2471 on_cluster: &Option<Ident>,
2472 ) -> SqlResult<RuntimeStatementResult<P>> {
2473 if table_names.len() > 1 {
2475 return Err(Error::InvalidArgumentError(
2476 "TRUNCATE with multiple tables is not supported yet".into(),
2477 ));
2478 }
2479 if partitions.is_some() {
2480 return Err(Error::InvalidArgumentError(
2481 "TRUNCATE ... PARTITION is not supported".into(),
2482 ));
2483 }
2484 if identity.is_some() {
2485 return Err(Error::InvalidArgumentError(
2486 "TRUNCATE ... RESTART/CONTINUE IDENTITY is not supported".into(),
2487 ));
2488 }
2489 use sqlparser::ast::CascadeOption;
2490 if matches!(cascade, Some(CascadeOption::Cascade)) {
2491 return Err(Error::InvalidArgumentError(
2492 "TRUNCATE ... CASCADE is not supported".into(),
2493 ));
2494 }
2495 if on_cluster.is_some() {
2496 return Err(Error::InvalidArgumentError(
2497 "TRUNCATE ... ON CLUSTER is not supported".into(),
2498 ));
2499 }
2500
2501 let table_name = if let Some(target) = table_names.first() {
2503 let table_obj = &target.name;
2505 let display_name = table_obj.to_string();
2506 let canonical_name = display_name.to_ascii_lowercase();
2507
2508 if !self.engine.session().has_active_transaction()
2510 && self
2511 .engine
2512 .context()
2513 .is_table_marked_dropped(&canonical_name)
2514 {
2515 return Err(Error::TransactionContextError(
2516 DROPPED_TABLE_TRANSACTION_ERR.into(),
2517 ));
2518 }
2519
2520 display_name
2521 } else {
2522 return Err(Error::InvalidArgumentError(
2523 "TRUNCATE requires a table name".into(),
2524 ));
2525 };
2526
2527 let plan = TruncatePlan {
2528 table: table_name.clone(),
2529 };
2530 self.execute_plan_statement(PlanStatement::Truncate(plan))
2531 }
2532
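    /// Dispatches `DROP TABLE`, `DROP INDEX`, and `DROP SCHEMA`. A schema drop
    /// with `CASCADE` walks the catalog and drops every table whose canonical
    /// name starts with `<schema>.`; without `CASCADE`, a non-empty schema is an
    /// error.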
2533    #[allow(clippy::too_many_arguments)]
2534    fn handle_drop(
2535 &self,
2536 object_type: ObjectType,
2537 if_exists: bool,
2538 names: Vec<ObjectName>,
2539 cascade: bool,
2540 restrict: bool,
2541 purge: bool,
2542 temporary: bool,
2543 ) -> SqlResult<RuntimeStatementResult<P>> {
2544 if purge || temporary {
2545 return Err(Error::InvalidArgumentError(
2546 "DROP purge/temporary options are not supported".into(),
2547 ));
2548 }
2549
2550 match object_type {
2551 ObjectType::Table => {
2552 if cascade || restrict {
2553 return Err(Error::InvalidArgumentError(
2554 "DROP TABLE CASCADE/RESTRICT is not supported".into(),
2555 ));
2556 }
2557
2558 for name in names {
2559 let table_name = Self::object_name_to_string(&name)?;
2560 let mut plan = llkv_plan::DropTablePlan::new(table_name.clone());
2561 plan.if_exists = if_exists;
2562
2563 self.execute_plan_statement(llkv_plan::PlanStatement::DropTable(plan))
2564 .map_err(|err| Self::map_table_error(&table_name, err))?;
2565 }
2566
2567 Ok(RuntimeStatementResult::NoOp)
2568 }
2569 ObjectType::Index => {
2570 if cascade || restrict {
2571 return Err(Error::InvalidArgumentError(
2572 "DROP INDEX CASCADE/RESTRICT is not supported".into(),
2573 ));
2574 }
2575
2576 for name in names {
2577 let index_name = Self::object_name_to_string(&name)?;
2578 let plan = llkv_plan::DropIndexPlan::new(index_name).if_exists(if_exists);
2579 self.execute_plan_statement(llkv_plan::PlanStatement::DropIndex(plan))?;
2580 }
2581
2582 Ok(RuntimeStatementResult::NoOp)
2583 }
2584 ObjectType::Schema => {
2585 if restrict {
2586 return Err(Error::InvalidArgumentError(
2587 "DROP SCHEMA RESTRICT is not supported".into(),
2588 ));
2589 }
2590
2591 let catalog = self.engine.context().table_catalog();
2592
2593 for name in names {
2594 let (display_name, canonical_name) = canonical_object_name(&name)?;
2595
2596 if !catalog.schema_exists(&canonical_name) {
2597 if if_exists {
2598 continue;
2599 }
2600 return Err(Error::CatalogError(format!(
2601 "Schema '{}' does not exist",
2602 display_name
2603 )));
2604 }
2605
2606 if cascade {
2607 let all_tables = catalog.table_names();
2609 let schema_prefix = format!("{}.", canonical_name);
2610
2611 for table in all_tables {
2612 if table.to_ascii_lowercase().starts_with(&schema_prefix) {
2613 let mut plan = llkv_plan::DropTablePlan::new(table.clone());
2614 plan.if_exists = false;
2615 self.execute_plan_statement(llkv_plan::PlanStatement::DropTable(
2616 plan,
2617 ))?;
2618 }
2619 }
2620 } else {
2621 let all_tables = catalog.table_names();
2623 let schema_prefix = format!("{}.", canonical_name);
2624 let has_tables = all_tables
2625 .iter()
2626 .any(|t| t.to_ascii_lowercase().starts_with(&schema_prefix));
2627
2628 if has_tables {
2629 return Err(Error::CatalogError(format!(
2630 "Schema '{}' is not empty. Use CASCADE to drop schema and all its tables",
2631 display_name
2632 )));
2633 }
2634 }
2635
2636 if !catalog.unregister_schema(&canonical_name) && !if_exists {
2638 return Err(Error::CatalogError(format!(
2639 "Schema '{}' does not exist",
2640 display_name
2641 )));
2642 }
2643 }
2644
2645 Ok(RuntimeStatementResult::NoOp)
2646 }
2647 _ => Err(Error::InvalidArgumentError(format!(
2648 "DROP {} is not supported",
2649 object_type
2650 ))),
2651 }
2652 }
2653
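    /// Handles `ALTER TABLE` with exactly one operation: `RENAME TO`,
    /// `RENAME COLUMN`, `ALTER COLUMN ... SET DATA TYPE` (without `USING`), or
    /// `DROP COLUMN` for a single column. An empty operation list is a no-op.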
2654 fn handle_alter_table(
2655 &self,
2656 name: ObjectName,
2657 if_exists: bool,
2658 only: bool,
2659 operations: Vec<AlterTableOperation>,
2660 ) -> SqlResult<RuntimeStatementResult<P>> {
2661 if only {
2662 return Err(Error::InvalidArgumentError(
2663 "ALTER TABLE ONLY is not supported yet".into(),
2664 ));
2665 }
2666
2667 if operations.is_empty() {
2668 return Ok(RuntimeStatementResult::NoOp);
2669 }
2670
2671 if operations.len() != 1 {
2672 return Err(Error::InvalidArgumentError(
2673 "ALTER TABLE currently supports exactly one operation".into(),
2674 ));
2675 }
2676
2677 let operation = operations.into_iter().next().expect("checked length");
2678 match operation {
2679 AlterTableOperation::RenameTable { table_name } => {
2680 let new_name = table_name.to_string();
2681 self.handle_alter_table_rename(name, new_name, if_exists)
2682 }
2683 AlterTableOperation::RenameColumn {
2684 old_column_name,
2685 new_column_name,
2686 } => {
2687 let plan = llkv_plan::AlterTablePlan {
2688 table_name: name.to_string(),
2689 if_exists,
2690 operation: llkv_plan::AlterTableOperation::RenameColumn {
2691 old_column_name: old_column_name.to_string(),
2692 new_column_name: new_column_name.to_string(),
2693 },
2694 };
2695 self.execute_plan_statement(PlanStatement::AlterTable(plan))
2696 }
2697 AlterTableOperation::AlterColumn { column_name, op } => {
2698 if let AlterColumnOperation::SetDataType {
2700 data_type,
2701 using,
2702 had_set: _,
2703 } = op
2704 {
2705 if using.is_some() {
2706 return Err(Error::InvalidArgumentError(
2707 "ALTER COLUMN SET DATA TYPE USING clause is not yet supported".into(),
2708 ));
2709 }
2710
2711 let plan = llkv_plan::AlterTablePlan {
2712 table_name: name.to_string(),
2713 if_exists,
2714 operation: llkv_plan::AlterTableOperation::SetColumnDataType {
2715 column_name: column_name.to_string(),
2716 new_data_type: data_type.to_string(),
2717 },
2718 };
2719 self.execute_plan_statement(PlanStatement::AlterTable(plan))
2720 } else {
2721 Err(Error::InvalidArgumentError(format!(
2722 "unsupported ALTER COLUMN operation: {:?}",
2723 op
2724 )))
2725 }
2726 }
2727 AlterTableOperation::DropColumn {
2728 has_column_keyword: _,
2729 column_names,
2730 if_exists: column_if_exists,
2731 drop_behavior,
2732 } => {
2733 if column_names.len() != 1 {
2734 return Err(Error::InvalidArgumentError(
2735 "DROP COLUMN currently supports dropping one column at a time".into(),
2736 ));
2737 }
2738
2739 let column_name = column_names.into_iter().next().unwrap().to_string();
2740 let cascade = matches!(drop_behavior, Some(sqlparser::ast::DropBehavior::Cascade));
2741
2742 let plan = llkv_plan::AlterTablePlan {
2743 table_name: name.to_string(),
2744 if_exists,
2745 operation: llkv_plan::AlterTableOperation::DropColumn {
2746 column_name,
2747 if_exists: column_if_exists,
2748 cascade,
2749 },
2750 };
2751 self.execute_plan_statement(PlanStatement::AlterTable(plan))
2752 }
2753 other => Err(Error::InvalidArgumentError(format!(
2754 "unsupported ALTER TABLE operation: {:?}",
2755 other
2756 ))),
2757 }
2758 }
2759
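    /// Implements `ALTER TABLE ... RENAME TO ...`. The new name may repeat the
    /// existing schema qualifier but may not add or change it; the actual catalog
    /// update is delegated to `CatalogDdl::rename_table`.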
2760 fn handle_alter_table_rename(
2761 &self,
2762 original_name: ObjectName,
2763 new_table_name: String,
2764 if_exists: bool,
2765 ) -> SqlResult<RuntimeStatementResult<P>> {
2766 let (schema_opt, table_name) = parse_schema_qualified_name(&original_name)?;
2767
2768 let new_table_name_clean = new_table_name.trim();
2769
2770 if new_table_name_clean.is_empty() {
2771 return Err(Error::InvalidArgumentError(
2772 "ALTER TABLE RENAME requires a non-empty table name".into(),
2773 ));
2774 }
2775
2776 let (raw_new_schema_opt, raw_new_table) =
2777 if let Some((schema_part, table_part)) = new_table_name_clean.split_once('.') {
2778 (
2779 Some(schema_part.trim().to_string()),
2780 table_part.trim().to_string(),
2781 )
2782 } else {
2783 (None, new_table_name_clean.to_string())
2784 };
2785
2786 if schema_opt.is_none() && raw_new_schema_opt.is_some() {
2787 return Err(Error::InvalidArgumentError(
2788 "ALTER TABLE RENAME cannot add a schema qualifier".into(),
2789 ));
2790 }
2791
2792 let new_table_trimmed = raw_new_table.trim_matches('"');
2793 if new_table_trimmed.is_empty() {
2794 return Err(Error::InvalidArgumentError(
2795 "ALTER TABLE RENAME requires a non-empty table name".into(),
2796 ));
2797 }
2798
2799 if let (Some(existing_schema), Some(new_schema_raw)) =
2800 (schema_opt.as_ref(), raw_new_schema_opt.as_ref())
2801 {
2802 let new_schema_trimmed = new_schema_raw.trim_matches('"');
2803 if !existing_schema.eq_ignore_ascii_case(new_schema_trimmed) {
2804 return Err(Error::InvalidArgumentError(
2805 "ALTER TABLE RENAME cannot change table schema".into(),
2806 ));
2807 }
2808 }
2809
2810 let new_table_display = raw_new_table;
2811 let new_schema_opt = raw_new_schema_opt;
2812
2813 fn join_schema_table(schema: &str, table: &str) -> String {
2814 let mut qualified = String::with_capacity(schema.len() + table.len() + 1);
2815 qualified.push_str(schema);
2816 qualified.push('.');
2817 qualified.push_str(table);
2818 qualified
2819 }
2820
2821 let current_display = schema_opt
2822 .as_ref()
2823 .map(|schema| join_schema_table(schema, &table_name))
2824 .unwrap_or_else(|| table_name.clone());
2825
2826 let new_display = if let Some(new_schema_raw) = new_schema_opt.clone() {
2827 join_schema_table(&new_schema_raw, &new_table_display)
2828 } else if let Some(schema) = schema_opt.as_ref() {
2829 join_schema_table(schema, &new_table_display)
2830 } else {
2831 new_table_display.clone()
2832 };
2833
2834        let plan = RenameTablePlan::new(&current_display, &new_display).if_exists(if_exists);
2835
2836 match CatalogDdl::rename_table(self.engine.session(), plan) {
2837 Ok(()) => Ok(RuntimeStatementResult::NoOp),
2838            Err(err) => Err(Self::map_table_error(&current_display, err)),
2839 }
2840 }
2841
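    /// Rewrites every `IN (<subquery>)` / `NOT IN (<subquery>)` in the expression
    /// into a literal `IN` list by eagerly executing the subquery and collecting
    /// the first column of its result. The traversal uses an explicit work stack
    /// rather than recursion so deeply nested expressions cannot overflow the
    /// call stack.
    ///
    /// Illustrative rewrite (names and values are examples only):
    ///
    /// ```text
    /// a IN (SELECT id FROM t)   becomes   a IN (1, 2, 3)
    /// ```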
2842 fn materialize_in_subquery(&self, root_expr: SqlExpr) -> SqlResult<SqlExpr> {
2850 enum WorkItem {
2852 Process(Box<SqlExpr>),
2853 BuildBinaryOp {
2854 op: BinaryOperator,
2855 left: Box<SqlExpr>,
2856 right_done: bool,
2857 },
2858 BuildUnaryOp {
2859 op: UnaryOperator,
2860 },
2861 BuildNested,
2862 BuildIsNull,
2863 BuildIsNotNull,
2864 FinishBetween {
2865 negated: bool,
2866 },
2867 }
2868
2869 let mut work_stack: Vec<WorkItem> = vec![WorkItem::Process(Box::new(root_expr))];
2870 let mut result_stack: Vec<SqlExpr> = Vec::new();
2871
2872 while let Some(item) = work_stack.pop() {
2873 match item {
2874 WorkItem::Process(expr) => {
2875 match *expr {
2876 SqlExpr::InSubquery {
2877 expr: left_expr,
2878 subquery,
2879 negated,
2880 } => {
2881 let result = self.handle_query(*subquery)?;
2883
2884 let values = match result {
2886 RuntimeStatementResult::Select { execution, .. } => {
2887 let batches = execution.collect()?;
2888 let mut collected_values = Vec::new();
2889
2890 for batch in batches {
2891 if batch.num_columns() == 0 {
2892 continue;
2893 }
2894 let column = batch.column(0);
2895
2896 for row_idx in 0..column.len() {
2897 use arrow::datatypes::DataType;
2898 let value = if column.is_null(row_idx) {
2899 Value::Null
2900 } else {
2901 match column.data_type() {
2902 DataType::Int64 => {
2903 let arr = column
2904 .as_any()
2905 .downcast_ref::<arrow::array::Int64Array>()
2906 .unwrap();
2907 Value::Number(
2908 arr.value(row_idx).to_string(),
2909 false,
2910 )
2911 }
2912 DataType::Float64 => {
2913 let arr = column
2914 .as_any()
2915 .downcast_ref::<arrow::array::Float64Array>()
2916 .unwrap();
2917 Value::Number(
2918 arr.value(row_idx).to_string(),
2919 false,
2920 )
2921 }
2922 DataType::Utf8 => {
2923 let arr = column
2924 .as_any()
2925 .downcast_ref::<arrow::array::StringArray>()
2926 .unwrap();
2927 Value::SingleQuotedString(
2928 arr.value(row_idx).to_string(),
2929 )
2930 }
2931 DataType::Boolean => {
2932 let arr = column
2933 .as_any()
2934 .downcast_ref::<arrow::array::BooleanArray>()
2935 .unwrap();
2936 Value::Boolean(arr.value(row_idx))
2937 }
2938 other => {
2939 return Err(Error::InvalidArgumentError(
2940 format!(
2941 "unsupported data type in IN subquery: {other:?}"
2942 ),
2943 ));
2944 }
2945 }
2946 };
2947 collected_values.push(ValueWithSpan {
2948 value,
2949 span: Span::empty(),
2950 });
2951 }
2952 }
2953
2954 collected_values
2955 }
2956 _ => {
2957 return Err(Error::InvalidArgumentError(
2958 "IN subquery must be a SELECT statement".into(),
2959 ));
2960 }
2961 };
2962
2963 result_stack.push(SqlExpr::InList {
2965 expr: left_expr,
2966 list: values.into_iter().map(SqlExpr::Value).collect(),
2967 negated,
2968 });
2969 }
2970 SqlExpr::BinaryOp { left, op, right } => {
2971 work_stack.push(WorkItem::BuildBinaryOp {
2976 op,
2977 left: left.clone(),
2978 right_done: false,
2979 });
2980 work_stack.push(WorkItem::Process(left));
2984 work_stack.push(WorkItem::Process(right));
2985 }
2986 SqlExpr::UnaryOp { op, expr } => {
2987 work_stack.push(WorkItem::BuildUnaryOp { op });
2988 work_stack.push(WorkItem::Process(expr));
2989 }
2990 SqlExpr::Nested(inner) => {
2991 work_stack.push(WorkItem::BuildNested);
2992 work_stack.push(WorkItem::Process(inner));
2993 }
2994 SqlExpr::IsNull(inner) => {
2995 work_stack.push(WorkItem::BuildIsNull);
2996 work_stack.push(WorkItem::Process(inner));
2997 }
2998 SqlExpr::IsNotNull(inner) => {
2999 work_stack.push(WorkItem::BuildIsNotNull);
3000 work_stack.push(WorkItem::Process(inner));
3001 }
3002 SqlExpr::Between {
3003 expr,
3004 negated,
3005 low,
3006 high,
3007 } => {
3008 work_stack.push(WorkItem::FinishBetween { negated });
3009 work_stack.push(WorkItem::Process(high));
3010 work_stack.push(WorkItem::Process(low));
3011 work_stack.push(WorkItem::Process(expr));
3012 }
3013 other => {
3015 result_stack.push(other);
3016 }
3017 }
3018 }
3019 WorkItem::BuildBinaryOp {
3020 op,
3021 left,
3022 right_done,
3023 } => {
3024 if !right_done {
3025 let left_result = result_stack.pop().unwrap();
3027 work_stack.push(WorkItem::BuildBinaryOp {
3028 op,
3029 left: Box::new(left_result),
3030 right_done: true,
3031 });
3032 } else {
3033 let right_result = result_stack.pop().unwrap();
3035 let left_result = *left;
3036 result_stack.push(SqlExpr::BinaryOp {
3037 left: Box::new(left_result),
3038 op,
3039 right: Box::new(right_result),
3040 });
3041 }
3042 }
3043 WorkItem::BuildUnaryOp { op } => {
3044 let inner = result_stack.pop().unwrap();
3045 result_stack.push(SqlExpr::UnaryOp {
3046 op,
3047 expr: Box::new(inner),
3048 });
3049 }
3050 WorkItem::BuildNested => {
3051 let inner = result_stack.pop().unwrap();
3052 result_stack.push(SqlExpr::Nested(Box::new(inner)));
3053 }
3054 WorkItem::BuildIsNull => {
3055 let inner = result_stack.pop().unwrap();
3056 result_stack.push(SqlExpr::IsNull(Box::new(inner)));
3057 }
3058 WorkItem::BuildIsNotNull => {
3059 let inner = result_stack.pop().unwrap();
3060 result_stack.push(SqlExpr::IsNotNull(Box::new(inner)));
3061 }
3062 WorkItem::FinishBetween { negated } => {
3063 let high_result = result_stack.pop().unwrap();
3064 let low_result = result_stack.pop().unwrap();
3065 let expr_result = result_stack.pop().unwrap();
3066 result_stack.push(SqlExpr::Between {
3067 expr: Box::new(expr_result),
3068 negated,
3069 low: Box::new(low_result),
3070 high: Box::new(high_result),
3071 });
3072 }
3073 }
3074 }
3075
3076 Ok(result_stack
3078 .pop()
3079 .expect("result stack should have exactly one item"))
3080 }
3081
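    /// Executes a standalone query statement: queries recognized by
    /// `try_handle_pragma_table_info` are answered directly, everything else is
    /// planned and executed as a SELECT.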
3082 fn handle_query(&self, query: Query) -> SqlResult<RuntimeStatementResult<P>> {
3083 if let Some(result) = self.try_handle_pragma_table_info(&query)? {
3085 return Ok(result);
3086 }
3087
3088 let select_plan = self.build_select_plan(query)?;
3089 self.execute_plan_statement(PlanStatement::Select(select_plan))
3090 }
3091
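    /// Validates the query shell via `validate_simple_query`, lowers its SELECT
    /// body into a `SelectPlan`, and attaches ORDER BY when present (ORDER BY is
    /// rejected for aggregate queries). Fails fast if the current transaction has
    /// been aborted.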
3092 fn build_select_plan(&self, query: Query) -> SqlResult<SelectPlan> {
3093 if self.engine.session().has_active_transaction() && self.engine.session().is_aborted() {
3094 return Err(Error::TransactionContextError(
3095 "TransactionContext Error: transaction is aborted".into(),
3096 ));
3097 }
3098
3099 validate_simple_query(&query)?;
3100 let catalog = self.engine.context().table_catalog();
3101 let resolver = catalog.identifier_resolver();
3102
3103 let (mut select_plan, select_context) = match query.body.as_ref() {
3104 SetExpr::Select(select) => self.translate_select(select.as_ref(), &resolver)?,
3105 other => {
3106 return Err(Error::InvalidArgumentError(format!(
3107 "unsupported query expression: {other:?}"
3108 )));
3109 }
3110 };
3111 if let Some(order_by) = &query.order_by {
3112 if !select_plan.aggregates.is_empty() {
3113 return Err(Error::InvalidArgumentError(
3114 "ORDER BY is not supported for aggregate queries".into(),
3115 ));
3116 }
3117 let order_plan = self.translate_order_by(&resolver, select_context, order_by)?;
3118 select_plan = select_plan.with_order_by(order_plan);
3119 }
3120 Ok(select_plan)
3121 }
3122
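    /// Lowers a single SELECT body into a `SelectPlan` plus the identifier
    /// context used later for WHERE and ORDER BY translation. Unsupported clauses
    /// (DISTINCT ON, TOP, EXCLUDE, INTO, LATERAL VIEW, PREWHERE, GROUP BY,
    /// HAVING, window clauses, ...) are rejected; the plan is then built for a
    /// FROM-less select, a single table (with simple-aggregate detection), or a
    /// multi-table FROM list.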
3123 fn translate_select(
3124 &self,
3125 select: &Select,
3126 resolver: &IdentifierResolver<'_>,
3127 ) -> SqlResult<(SelectPlan, IdentifierContext)> {
3128 let distinct = match &select.distinct {
3129 None => false,
3130 Some(Distinct::Distinct) => true,
3131 Some(Distinct::On(_)) => {
3132 return Err(Error::InvalidArgumentError(
3133 "SELECT DISTINCT ON is not supported".into(),
3134 ));
3135 }
3136 };
3137 if select.top.is_some() {
3138 return Err(Error::InvalidArgumentError(
3139 "SELECT TOP is not supported".into(),
3140 ));
3141 }
3142 if select.exclude.is_some() {
3143 return Err(Error::InvalidArgumentError(
3144 "SELECT EXCLUDE is not supported".into(),
3145 ));
3146 }
3147 if select.into.is_some() {
3148 return Err(Error::InvalidArgumentError(
3149 "SELECT INTO is not supported".into(),
3150 ));
3151 }
3152 if !select.lateral_views.is_empty() {
3153 return Err(Error::InvalidArgumentError(
3154 "LATERAL VIEW is not supported".into(),
3155 ));
3156 }
3157 if select.prewhere.is_some() {
3158 return Err(Error::InvalidArgumentError(
3159 "PREWHERE is not supported".into(),
3160 ));
3161 }
3162 if !group_by_is_empty(&select.group_by) || select.value_table_mode.is_some() {
3163 return Err(Error::InvalidArgumentError(
3164 "GROUP BY and SELECT AS VALUE/STRUCT are not supported".into(),
3165 ));
3166 }
3167 if !select.cluster_by.is_empty()
3168 || !select.distribute_by.is_empty()
3169 || !select.sort_by.is_empty()
3170 {
3171 return Err(Error::InvalidArgumentError(
3172 "CLUSTER/DISTRIBUTE/SORT BY clauses are not supported".into(),
3173 ));
3174 }
3175 if select.having.is_some()
3176 || !select.named_window.is_empty()
3177 || select.qualify.is_some()
3178 || select.connect_by.is_some()
3179 {
3180 return Err(Error::InvalidArgumentError(
3181 "advanced SELECT clauses are not supported".into(),
3182 ));
3183 }
3184
3185 let table_alias = select
3186 .from
3187 .first()
3188 .and_then(|table_with_joins| match &table_with_joins.relation {
3189 TableFactor::Table { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
3190 _ => None,
3191 });
3192
3193 let has_joins = select
3194 .from
3195 .iter()
3196 .any(|table_with_joins| !table_with_joins.joins.is_empty());
3197 let catalog = self.engine.context().table_catalog();
3199 let (mut plan, id_context) = if select.from.is_empty() {
3200 let mut p = SelectPlan::new("");
3202 let projections = self.build_projection_list(
3203 resolver,
3204 IdentifierContext::new(None),
3205 &select.projection,
3206 )?;
3207 p = p.with_projections(projections);
3208 (p, IdentifierContext::new(None))
3209 } else if select.from.len() == 1 && !has_joins {
3210 let (display_name, canonical_name) = extract_single_table(&select.from)?;
3212 let table_id = catalog.table_id(&canonical_name);
3213 let mut p = SelectPlan::new(display_name.clone());
3214 let single_table_context =
3215 IdentifierContext::new(table_id).with_table_alias(table_alias.clone());
3216 if let Some(alias) = table_alias.as_ref() {
3217 validate_projection_alias_qualifiers(&select.projection, alias)?;
3218 }
3219 if let Some(aggregates) = self.detect_simple_aggregates(&select.projection)? {
3220 p = p.with_aggregates(aggregates);
3221 } else {
3222 let projections = self.build_projection_list(
3223 resolver,
3224 single_table_context.clone(),
3225 &select.projection,
3226 )?;
3227 p = p.with_projections(projections);
3228 }
3229 (p, single_table_context)
3230 } else {
3231 let tables = extract_tables(&select.from)?;
3233 let mut p = SelectPlan::with_tables(tables);
3234 let projections = self.build_projection_list(
3237 resolver,
3238 IdentifierContext::new(None),
3239 &select.projection,
3240 )?;
3241 p = p.with_projections(projections);
3242 (p, IdentifierContext::new(None))
3243 };
3244
3245 let filter_expr = match &select.selection {
3246 Some(expr) => {
3247 let materialized_expr = self.materialize_in_subquery(expr.clone())?;
3248 Some(translate_condition_with_context(
3249 resolver,
3250 id_context.clone(),
3251 &materialized_expr,
3252 )?)
3253 }
3254 None => None,
3255 };
3256 plan = plan.with_filter(filter_expr);
3257 plan = plan.with_distinct(distinct);
3258 Ok((plan, id_context))
3259 }
3260
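    /// Translates an ORDER BY clause into `OrderByPlan`s. A bare `ALL` identifier
    /// sorts across all projected columns, numeric literals are 1-based output
    /// positions, and `CAST(col AS <integer type>)` requests text-to-integer
    /// sorting. The default NULLS placement follows the session's
    /// `default_null_order` setting and is mirrored for descending sorts; for
    /// example, with NULLS FIRST as the session default, `ORDER BY x DESC` places
    /// NULLs last unless NULLS FIRST/LAST is written explicitly.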
3261 fn translate_order_by(
3262 &self,
3263 resolver: &IdentifierResolver<'_>,
3264 id_context: IdentifierContext,
3265 order_by: &OrderBy,
3266 ) -> SqlResult<Vec<OrderByPlan>> {
3267 let exprs = match &order_by.kind {
3268 OrderByKind::Expressions(exprs) => exprs,
3269 _ => {
3270 return Err(Error::InvalidArgumentError(
3271 "unsupported ORDER BY clause".into(),
3272 ));
3273 }
3274 };
3275
3276 let base_nulls_first = self.default_nulls_first.load(AtomicOrdering::Relaxed);
3277
3278 let mut plans = Vec::with_capacity(exprs.len());
3279 for order_expr in exprs {
3280 let ascending = order_expr.options.asc.unwrap_or(true);
3281 let default_nulls_first_for_direction = if ascending {
3282 base_nulls_first
3283 } else {
3284 !base_nulls_first
3285 };
3286 let nulls_first = order_expr
3287 .options
3288 .nulls_first
3289 .unwrap_or(default_nulls_first_for_direction);
3290
3291 if let SqlExpr::Identifier(ident) = &order_expr.expr
3292 && ident.value.eq_ignore_ascii_case("ALL")
3293 && ident.quote_style.is_none()
3294 {
3295 plans.push(OrderByPlan {
3296 target: OrderTarget::All,
3297 sort_type: OrderSortType::Native,
3298 ascending,
3299 nulls_first,
3300 });
3301 continue;
3302 }
3303
3304 let (target, sort_type) = match &order_expr.expr {
3305 SqlExpr::Identifier(_) | SqlExpr::CompoundIdentifier(_) => (
3306 OrderTarget::Column(Self::resolve_simple_column_expr(
3307 resolver,
3308 id_context.clone(),
3309 &order_expr.expr,
3310 )?),
3311 OrderSortType::Native,
3312 ),
3313 SqlExpr::Cast {
3314 expr,
3315 data_type:
3316 SqlDataType::Int(_)
3317 | SqlDataType::Integer(_)
3318 | SqlDataType::BigInt(_)
3319 | SqlDataType::SmallInt(_)
3320 | SqlDataType::TinyInt(_),
3321 ..
3322 } => (
3323 OrderTarget::Column(Self::resolve_simple_column_expr(
3324 resolver,
3325 id_context.clone(),
3326 expr,
3327 )?),
3328 OrderSortType::CastTextToInteger,
3329 ),
3330 SqlExpr::Cast { data_type, .. } => {
3331 return Err(Error::InvalidArgumentError(format!(
3332 "ORDER BY CAST target type {:?} is not supported",
3333 data_type
3334 )));
3335 }
3336 SqlExpr::Value(value_with_span) => match &value_with_span.value {
3337 Value::Number(raw, _) => {
3338 let position: usize = raw.parse().map_err(|_| {
3339 Error::InvalidArgumentError(format!(
3340 "ORDER BY position '{}' is not a valid positive integer",
3341 raw
3342 ))
3343 })?;
3344 if position == 0 {
3345 return Err(Error::InvalidArgumentError(
3346 "ORDER BY position must be at least 1".into(),
3347 ));
3348 }
3349 (OrderTarget::Index(position - 1), OrderSortType::Native)
3350 }
3351 other => {
3352 return Err(Error::InvalidArgumentError(format!(
3353 "unsupported ORDER BY literal expression: {other:?}"
3354 )));
3355 }
3356 },
3357 other => {
3358 return Err(Error::InvalidArgumentError(format!(
3359 "unsupported ORDER BY expression: {other:?}"
3360 )));
3361 }
3362 };
3363
3364 plans.push(OrderByPlan {
3365 target,
3366 sort_type,
3367 ascending,
3368 nulls_first,
3369 });
3370 }
3371
3372 Ok(plans)
3373 }
3374
3375 fn resolve_simple_column_expr(
3376 resolver: &IdentifierResolver<'_>,
3377 context: IdentifierContext,
3378 expr: &SqlExpr,
3379 ) -> SqlResult<String> {
3380 let scalar = translate_scalar_with_context(resolver, context, expr)?;
3381 match scalar {
3382 llkv_expr::expr::ScalarExpr::Column(column) => Ok(column),
3383 other => Err(Error::InvalidArgumentError(format!(
3384 "ORDER BY expression must reference a simple column, found {other:?}"
3385 ))),
3386 }
3387 }
3388
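    /// Detects projection lists made up entirely of simple aggregate calls
    /// (COUNT, COUNT(DISTINCT col), SUM, MIN, MAX) and converts them into
    /// `AggregateExpr`s. Returns `Ok(None)` when any item is not such an
    /// aggregate so the caller can fall back to ordinary projection handling;
    /// unsupported aggregate clauses (FILTER, OVER, WITHIN GROUP, ...) are hard
    /// errors. SUM arguments matching the pattern recognized by
    /// `parse_count_nulls_case` are turned into a null-counting aggregate.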
3389 fn detect_simple_aggregates(
3390 &self,
3391 projection_items: &[SelectItem],
3392 ) -> SqlResult<Option<Vec<AggregateExpr>>> {
3393 if projection_items.is_empty() {
3394 return Ok(None);
3395 }
3396
3397 let mut specs: Vec<AggregateExpr> = Vec::with_capacity(projection_items.len());
3398 for (idx, item) in projection_items.iter().enumerate() {
3399 let (expr, alias_opt) = match item {
3400 SelectItem::UnnamedExpr(expr) => (expr, None),
3401 SelectItem::ExprWithAlias { expr, alias } => (expr, Some(alias.value.clone())),
3402 _ => return Ok(None),
3403 };
3404
3405 let alias = alias_opt.unwrap_or_else(|| format!("col{}", idx + 1));
3406 let SqlExpr::Function(func) = expr else {
3407 return Ok(None);
3408 };
3409
3410 if func.uses_odbc_syntax {
3411 return Err(Error::InvalidArgumentError(
3412 "ODBC function syntax is not supported in aggregate queries".into(),
3413 ));
3414 }
3415 if !matches!(func.parameters, FunctionArguments::None) {
3416 return Err(Error::InvalidArgumentError(
3417 "parameterized aggregate functions are not supported".into(),
3418 ));
3419 }
3420 if func.filter.is_some()
3421 || func.null_treatment.is_some()
3422 || func.over.is_some()
3423 || !func.within_group.is_empty()
3424 {
3425 return Err(Error::InvalidArgumentError(
3426 "advanced aggregate clauses are not supported".into(),
3427 ));
3428 }
3429
3430 let mut is_distinct = false;
3431 let args_slice: &[FunctionArg] = match &func.args {
3432 FunctionArguments::List(list) => {
3433 if let Some(dup) = &list.duplicate_treatment {
3434 use sqlparser::ast::DuplicateTreatment;
3435 match dup {
3436 DuplicateTreatment::All => {}
3437 DuplicateTreatment::Distinct => is_distinct = true,
3438 }
3439 }
3440 if !list.clauses.is_empty() {
3441 return Err(Error::InvalidArgumentError(
3442 "aggregate argument clauses are not supported".into(),
3443 ));
3444 }
3445 &list.args
3446 }
3447 FunctionArguments::None => &[],
3448 FunctionArguments::Subquery(_) => {
3449 return Err(Error::InvalidArgumentError(
3450 "aggregate subquery arguments are not supported".into(),
3451 ));
3452 }
3453 };
3454
3455 let func_name = if func.name.0.len() == 1 {
3456 match &func.name.0[0] {
3457 ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
3458 _ => {
3459 return Err(Error::InvalidArgumentError(
3460 "unsupported aggregate function name".into(),
3461 ));
3462 }
3463 }
3464 } else {
3465 return Err(Error::InvalidArgumentError(
3466 "qualified aggregate function names are not supported".into(),
3467 ));
3468 };
3469
3470 let aggregate = match func_name.as_str() {
3471 "count" => {
3472 if args_slice.len() != 1 {
3473 return Err(Error::InvalidArgumentError(
3474 "COUNT accepts exactly one argument".into(),
3475 ));
3476 }
3477 match &args_slice[0] {
3478 FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
3479 if is_distinct {
3480 return Err(Error::InvalidArgumentError(
3481 "COUNT(DISTINCT *) is not supported".into(),
3482 ));
3483 }
3484 AggregateExpr::count_star(alias)
3485 }
3486 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
3487 let column = resolve_column_name(arg_expr)?;
3488 if is_distinct {
3489 AggregateExpr::count_distinct_column(column, alias)
3490 } else {
3491 AggregateExpr::count_column(column, alias)
3492 }
3493 }
3494 FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
3495 return Err(Error::InvalidArgumentError(
3496 "named COUNT arguments are not supported".into(),
3497 ));
3498 }
3499 FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
3500 return Err(Error::InvalidArgumentError(
3501 "COUNT does not support qualified wildcards".into(),
3502 ));
3503 }
3504 }
3505 }
3506 "sum" | "min" | "max" => {
3507 if is_distinct {
3508 return Err(Error::InvalidArgumentError(
3509 "DISTINCT is not supported for this aggregate".into(),
3510 ));
3511 }
3512 if args_slice.len() != 1 {
3513 return Err(Error::InvalidArgumentError(format!(
3514 "{} accepts exactly one argument",
3515 func_name.to_uppercase()
3516 )));
3517 }
3518 let arg_expr = match &args_slice[0] {
3519 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => arg_expr,
3520 FunctionArg::Unnamed(FunctionArgExpr::Wildcard)
3521 | FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
3522 return Err(Error::InvalidArgumentError(format!(
3523 "{} does not support wildcard arguments",
3524 func_name.to_uppercase()
3525 )));
3526 }
3527 FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
3528 return Err(Error::InvalidArgumentError(format!(
3529 "{} arguments must be column references",
3530 func_name.to_uppercase()
3531 )));
3532 }
3533 };
3534
3535 if func_name == "sum" {
3536 if let Some(column) = parse_count_nulls_case(arg_expr)? {
3537 AggregateExpr::count_nulls(column, alias)
3538 } else {
3539 let column = resolve_column_name(arg_expr)?;
3540 AggregateExpr::sum_int64(column, alias)
3541 }
3542 } else {
3543 let column = resolve_column_name(arg_expr)?;
3544 if func_name == "min" {
3545 AggregateExpr::min_int64(column, alias)
3546 } else {
3547 AggregateExpr::max_int64(column, alias)
3548 }
3549 }
3550 }
3551 _ => return Ok(None),
3552 };
3553
3554 specs.push(aggregate);
3555 }
3556
3557 if specs.is_empty() {
3558 return Ok(None);
3559 }
3560 Ok(Some(specs))
3561 }
3562
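    /// Builds the projection list for a SELECT. Wildcards (optionally with
    /// EXCLUDE) map to whole-row projections, qualified wildcards are forwarded
    /// as a column projection carrying the qualifier's name, plain and compound
    /// identifiers resolve to named columns, and any other expression becomes a
    /// computed projection with a positional `colN` alias unless an explicit
    /// alias is given.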
3563 fn build_projection_list(
3564 &self,
3565 resolver: &IdentifierResolver<'_>,
3566 id_context: IdentifierContext,
3567 projection_items: &[SelectItem],
3568 ) -> SqlResult<Vec<SelectProjection>> {
3569 if projection_items.is_empty() {
3570 return Err(Error::InvalidArgumentError(
3571 "SELECT projection must include at least one column".into(),
3572 ));
3573 }
3574
3575 let mut projections = Vec::with_capacity(projection_items.len());
3576 for (idx, item) in projection_items.iter().enumerate() {
3577 match item {
3578 SelectItem::Wildcard(options) => {
3579 if let Some(exclude) = &options.opt_exclude {
3580 use sqlparser::ast::ExcludeSelectItem;
3581 let exclude_cols = match exclude {
3582 ExcludeSelectItem::Single(ident) => vec![ident.value.clone()],
3583 ExcludeSelectItem::Multiple(idents) => {
3584 idents.iter().map(|id| id.value.clone()).collect()
3585 }
3586 };
3587 projections.push(SelectProjection::AllColumnsExcept {
3588 exclude: exclude_cols,
3589 });
3590 } else {
3591 projections.push(SelectProjection::AllColumns);
3592 }
3593 }
3594 SelectItem::QualifiedWildcard(kind, _) => match kind {
3595 SelectItemQualifiedWildcardKind::ObjectName(name) => {
3596 projections.push(SelectProjection::Column {
3597 name: name.to_string(),
3598 alias: None,
3599 });
3600 }
3601 SelectItemQualifiedWildcardKind::Expr(_) => {
3602 return Err(Error::InvalidArgumentError(
3603 "expression-qualified wildcards are not supported".into(),
3604 ));
3605 }
3606 },
3607 SelectItem::UnnamedExpr(expr) => match expr {
3608 SqlExpr::Identifier(ident) => {
3609 let parts = vec![ident.value.clone()];
3610 let resolution = resolver.resolve(&parts, id_context.clone())?;
3611 if resolution.is_simple() {
3612 projections.push(SelectProjection::Column {
3613 name: resolution.column().to_string(),
3614 alias: None,
3615 });
3616 } else {
3617 let alias = format!("col{}", idx + 1);
3618 projections.push(SelectProjection::Computed {
3619 expr: resolution.into_scalar_expr(),
3620 alias,
3621 });
3622 }
3623 }
3624 SqlExpr::CompoundIdentifier(parts) => {
3625 let name_parts: Vec<String> =
3626 parts.iter().map(|part| part.value.clone()).collect();
3627 let resolution = resolver.resolve(&name_parts, id_context.clone())?;
3628 if resolution.is_simple() {
3629 projections.push(SelectProjection::Column {
3630 name: resolution.column().to_string(),
3631 alias: None,
3632 });
3633 } else {
3634 let alias = format!("col{}", idx + 1);
3635 projections.push(SelectProjection::Computed {
3636 expr: resolution.into_scalar_expr(),
3637 alias,
3638 });
3639 }
3640 }
3641 _ => {
3642 let alias = format!("col{}", idx + 1);
3643 let scalar =
3644 translate_scalar_with_context(resolver, id_context.clone(), expr)?;
3645 projections.push(SelectProjection::Computed {
3646 expr: scalar,
3647 alias,
3648 });
3649 }
3650 },
3651 SelectItem::ExprWithAlias { expr, alias } => match expr {
3652 SqlExpr::Identifier(ident) => {
3653 let parts = vec![ident.value.clone()];
3654 let resolution = resolver.resolve(&parts, id_context.clone())?;
3655 if resolution.is_simple() {
3656 projections.push(SelectProjection::Column {
3657 name: resolution.column().to_string(),
3658 alias: Some(alias.value.clone()),
3659 });
3660 } else {
3661 projections.push(SelectProjection::Computed {
3662 expr: resolution.into_scalar_expr(),
3663 alias: alias.value.clone(),
3664 });
3665 }
3666 }
3667 SqlExpr::CompoundIdentifier(parts) => {
3668 let name_parts: Vec<String> =
3669 parts.iter().map(|part| part.value.clone()).collect();
3670 let resolution = resolver.resolve(&name_parts, id_context.clone())?;
3671 if resolution.is_simple() {
3672 projections.push(SelectProjection::Column {
3673 name: resolution.column().to_string(),
3674 alias: Some(alias.value.clone()),
3675 });
3676 } else {
3677 projections.push(SelectProjection::Computed {
3678 expr: resolution.into_scalar_expr(),
3679 alias: alias.value.clone(),
3680 });
3681 }
3682 }
3683 _ => {
3684 let scalar =
3685 translate_scalar_with_context(resolver, id_context.clone(), expr)?;
3686 projections.push(SelectProjection::Computed {
3687 expr: scalar,
3688 alias: alias.value.clone(),
3689 });
3690 }
3691 },
3692 }
3693 }
3694 Ok(projections)
3695 }
3696
3697    #[allow(clippy::too_many_arguments)]
3698    fn handle_start_transaction(
3699 &self,
3700 modes: Vec<TransactionMode>,
3701 begin: bool,
3702 transaction: Option<BeginTransactionKind>,
3703 modifier: Option<TransactionModifier>,
3704 statements: Vec<Statement>,
3705 exception: Option<Vec<ExceptionWhen>>,
3706 has_end_keyword: bool,
3707 ) -> SqlResult<RuntimeStatementResult<P>> {
3708 if !modes.is_empty() {
3709 return Err(Error::InvalidArgumentError(
3710 "transaction modes are not supported".into(),
3711 ));
3712 }
3713 if modifier.is_some() {
3714 return Err(Error::InvalidArgumentError(
3715 "transaction modifiers are not supported".into(),
3716 ));
3717 }
3718 if !statements.is_empty() || exception.is_some() || has_end_keyword {
3719 return Err(Error::InvalidArgumentError(
3720 "BEGIN blocks with inline statements or exceptions are not supported".into(),
3721 ));
3722 }
3723 if let Some(kind) = transaction {
3724 match kind {
3725 BeginTransactionKind::Transaction | BeginTransactionKind::Work => {}
3726 }
3727 }
3728 if !begin {
3729            tracing::warn!("`START TRANSACTION` is currently treated the same as `BEGIN`");
3731 }
3732
3733 self.execute_plan_statement(PlanStatement::BeginTransaction)
3734 }
3735
3736 fn handle_commit(
3737 &self,
3738 chain: bool,
3739 end: bool,
3740 modifier: Option<TransactionModifier>,
3741 ) -> SqlResult<RuntimeStatementResult<P>> {
3742 if chain {
3743 return Err(Error::InvalidArgumentError(
3744 "COMMIT AND [NO] CHAIN is not supported".into(),
3745 ));
3746 }
3747 if end {
3748 return Err(Error::InvalidArgumentError(
3749 "END blocks are not supported".into(),
3750 ));
3751 }
3752 if modifier.is_some() {
3753 return Err(Error::InvalidArgumentError(
3754 "transaction modifiers are not supported".into(),
3755 ));
3756 }
3757
3758 self.execute_plan_statement(PlanStatement::CommitTransaction)
3759 }
3760
3761 fn handle_rollback(
3762 &self,
3763 chain: bool,
3764 savepoint: Option<Ident>,
3765 ) -> SqlResult<RuntimeStatementResult<P>> {
3766 if chain {
3767 return Err(Error::InvalidArgumentError(
3768 "ROLLBACK AND [NO] CHAIN is not supported".into(),
3769 ));
3770 }
3771 if savepoint.is_some() {
3772 return Err(Error::InvalidArgumentError(
3773 "ROLLBACK TO SAVEPOINT is not supported".into(),
3774 ));
3775 }
3776
3777 self.execute_plan_statement(PlanStatement::RollbackTransaction)
3778 }
3779
3780 fn handle_set(&self, set_stmt: Set) -> SqlResult<RuntimeStatementResult<P>> {
3781 match set_stmt {
3782 Set::SingleAssignment {
3783 scope,
3784 hivevar,
3785 variable,
3786 values,
3787 } => {
3788 if scope.is_some() || hivevar {
3789 return Err(Error::InvalidArgumentError(
3790 "SET modifiers are not supported".into(),
3791 ));
3792 }
3793
3794 let variable_name_raw = variable.to_string();
3795 let variable_name = variable_name_raw.to_ascii_lowercase();
3796
3797 match variable_name.as_str() {
3798 "default_null_order" => {
3799 if values.len() != 1 {
3800 return Err(Error::InvalidArgumentError(
3801 "SET default_null_order expects exactly one value".into(),
3802 ));
3803 }
3804
3805 let value_expr = &values[0];
3806 let normalized = match value_expr {
3807 SqlExpr::Value(value_with_span) => value_with_span
3808 .value
3809 .clone()
3810 .into_string()
3811 .map(|s| s.to_ascii_lowercase()),
3812 SqlExpr::Identifier(ident) => Some(ident.value.to_ascii_lowercase()),
3813 _ => None,
3814 };
3815
3816 if !matches!(normalized.as_deref(), Some("nulls_first" | "nulls_last")) {
3817 return Err(Error::InvalidArgumentError(format!(
3818 "unsupported value for SET default_null_order: {value_expr:?}"
3819 )));
3820 }
3821
3822 let use_nulls_first = matches!(normalized.as_deref(), Some("nulls_first"));
3823 self.default_nulls_first
3824 .store(use_nulls_first, AtomicOrdering::Relaxed);
3825
3826 Ok(RuntimeStatementResult::NoOp)
3827 }
3828 "immediate_transaction_mode" => {
3829 if values.len() != 1 {
3830 return Err(Error::InvalidArgumentError(
3831 "SET immediate_transaction_mode expects exactly one value".into(),
3832 ));
3833 }
3834 let normalized = values[0].to_string().to_ascii_lowercase();
3835 let enabled = match normalized.as_str() {
3836 "true" | "on" | "1" => true,
3837 "false" | "off" | "0" => false,
3838 _ => {
3839 return Err(Error::InvalidArgumentError(format!(
3840 "unsupported value for SET immediate_transaction_mode: {}",
3841 values[0]
3842 )));
3843 }
3844 };
3845 if !enabled {
3846 tracing::warn!(
3847 "SET immediate_transaction_mode=false has no effect; continuing with auto mode"
3848 );
3849 }
3850 Ok(RuntimeStatementResult::NoOp)
3851 }
3852 _ => Err(Error::InvalidArgumentError(format!(
3853 "unsupported SET variable: {variable_name_raw}"
3854 ))),
3855 }
3856 }
3857 other => Err(Error::InvalidArgumentError(format!(
3858 "unsupported SQL SET statement: {other:?}",
3859 ))),
3860 }
3861 }
3862
3863 fn handle_pragma(
3864 &self,
3865 name: ObjectName,
3866 value: Option<Value>,
3867 is_eq: bool,
3868 ) -> SqlResult<RuntimeStatementResult<P>> {
3869 let (display, canonical) = canonical_object_name(&name)?;
3870 if value.is_some() || is_eq {
3871 return Err(Error::InvalidArgumentError(format!(
3872 "PRAGMA '{display}' does not accept a value"
3873 )));
3874 }
3875
3876 match canonical.as_str() {
3877 "enable_verification" | "disable_verification" => Ok(RuntimeStatementResult::NoOp),
3878 _ => Err(Error::InvalidArgumentError(format!(
3879 "unsupported PRAGMA '{}'",
3880 display
3881 ))),
3882 }
3883 }
3884}
3885
3886fn canonical_object_name(name: &ObjectName) -> SqlResult<(String, String)> {
3887 if name.0.is_empty() {
3888 return Err(Error::InvalidArgumentError(
3889 "object name must not be empty".into(),
3890 ));
3891 }
3892 let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
3893 for part in &name.0 {
3894 let ident = match part {
3895 ObjectNamePart::Identifier(ident) => ident,
3896 _ => {
3897 return Err(Error::InvalidArgumentError(
3898 "object names using functions are not supported".into(),
3899 ));
3900 }
3901 };
3902 parts.push(ident.value.clone());
3903 }
3904 let display = parts.join(".");
3905 let canonical = display.to_ascii_lowercase();
3906 Ok((display, canonical))
3907}
3908
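/// Splits an object name into an optional schema qualifier and a table name.
/// One-part names yield `(None, table)`, two-part names yield
/// `(Some(schema), table)`, and longer names are rejected.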
3909fn parse_schema_qualified_name(name: &ObjectName) -> SqlResult<(Option<String>, String)> {
3918 if name.0.is_empty() {
3919 return Err(Error::InvalidArgumentError(
3920 "object name must not be empty".into(),
3921 ));
3922 }
3923
3924 let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
3925 for part in &name.0 {
3926 let ident = match part {
3927 ObjectNamePart::Identifier(ident) => ident,
3928 _ => {
3929 return Err(Error::InvalidArgumentError(
3930 "object names using functions are not supported".into(),
3931 ));
3932 }
3933 };
3934 parts.push(ident.value.clone());
3935 }
3936
3937 match parts.len() {
3938 1 => Ok((None, parts[0].clone())),
3939 2 => Ok((Some(parts[0].clone()), parts[1].clone())),
3940 _ => Err(Error::InvalidArgumentError(format!(
3941 "table name has too many parts: {}",
3942 name
3943 ))),
3944 }
3945}
3946
3947fn extract_index_column_name(
3961 index_col: &sqlparser::ast::IndexColumn,
3962 context: &str,
3963 allow_sort_options: bool,
3964 allow_compound: bool,
3965) -> SqlResult<String> {
3966 use sqlparser::ast::Expr as SqlExpr;
3967
3968 if index_col.operator_class.is_some() {
3970 return Err(Error::InvalidArgumentError(format!(
3971 "{} operator classes are not supported",
3972 context
3973 )));
3974 }
3975
3976 let order_expr = &index_col.column;
3977
3978 if allow_sort_options {
3980 let _ascending = order_expr.options.asc.unwrap_or(true);
3982 let _nulls_first = order_expr.options.nulls_first.unwrap_or(false);
3983 } else {
3985 if order_expr.options.asc.is_some()
3987 || order_expr.options.nulls_first.is_some()
3988 || order_expr.with_fill.is_some()
3989 {
3990 return Err(Error::InvalidArgumentError(format!(
3991 "{} columns must be simple identifiers",
3992 context
3993 )));
3994 }
3995 }
3996
3997 let column_name = match &order_expr.expr {
3999 SqlExpr::Identifier(ident) => ident.value.clone(),
4000 SqlExpr::CompoundIdentifier(parts) => {
4001 if allow_compound {
4002 parts
4004 .last()
4005 .map(|ident| ident.value.clone())
4006 .ok_or_else(|| {
4007 Error::InvalidArgumentError(format!(
4008 "invalid column reference in {}",
4009 context
4010 ))
4011 })?
4012 } else if parts.len() == 1 {
4013 parts[0].value.clone()
4015 } else {
4016 return Err(Error::InvalidArgumentError(format!(
4017 "{} columns must be column identifiers",
4018 context
4019 )));
4020 }
4021 }
4022 other => {
4023 return Err(Error::InvalidArgumentError(format!(
4024 "{} only supports column references, found {:?}",
4025 context, other
4026 )));
4027 }
4028 };
4029
4030 Ok(column_name)
4031}
4032
4033fn validate_create_table_common(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
4034 if stmt.clone.is_some() || stmt.like.is_some() {
4035 return Err(Error::InvalidArgumentError(
4036 "CREATE TABLE LIKE/CLONE is not supported".into(),
4037 ));
4038 }
4039 if stmt.or_replace && stmt.if_not_exists {
4040 return Err(Error::InvalidArgumentError(
4041 "CREATE TABLE cannot combine OR REPLACE with IF NOT EXISTS".into(),
4042 ));
4043 }
4044 use sqlparser::ast::TableConstraint;
4045
4046 let mut seen_primary_key = false;
4047 for constraint in &stmt.constraints {
4048 match constraint {
4049 TableConstraint::PrimaryKey { .. } => {
4050 if seen_primary_key {
4051 return Err(Error::InvalidArgumentError(
4052 "multiple PRIMARY KEY constraints are not supported".into(),
4053 ));
4054 }
4055 seen_primary_key = true;
4056 }
4057 TableConstraint::Unique { .. } => {
4058 }
4060 TableConstraint::ForeignKey { .. } => {
4061 }
4063 other => {
4064 return Err(Error::InvalidArgumentError(format!(
4065 "table-level constraint {:?} is not supported",
4066 other
4067 )));
4068 }
4069 }
4070 }
4071 Ok(())
4072}
4073
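/// Validates a CHECK constraint expression against the table being defined:
/// subqueries and aggregate functions are rejected, and every referenced column
/// (bare, `table.column`, or `schema.table.column`) must belong to that table.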
4074fn validate_check_constraint(
4075 check_expr: &sqlparser::ast::Expr,
4076 table_name: &str,
4077 column_names: &[&str],
4078) -> SqlResult<()> {
4079 use sqlparser::ast::Expr as SqlExpr;
4080
4081 let column_names_lower: HashSet<String> = column_names
4082 .iter()
4083 .map(|name| name.to_ascii_lowercase())
4084 .collect();
4085
4086 let mut stack: Vec<&SqlExpr> = vec![check_expr];
4087
4088 while let Some(expr) = stack.pop() {
4089 match expr {
4090 SqlExpr::Subquery(_) => {
4091 return Err(Error::InvalidArgumentError(
4092 "Subqueries are not allowed in CHECK constraints".into(),
4093 ));
4094 }
4095 SqlExpr::Function(func) => {
4096 let func_name = func.name.to_string().to_uppercase();
4097 if matches!(func_name.as_str(), "SUM" | "AVG" | "COUNT" | "MIN" | "MAX") {
4098 return Err(Error::InvalidArgumentError(
4099 "Aggregate functions are not allowed in CHECK constraints".into(),
4100 ));
4101 }
4102
4103 if let sqlparser::ast::FunctionArguments::List(list) = &func.args {
4104 for arg in &list.args {
4105 if let sqlparser::ast::FunctionArg::Unnamed(
4106 sqlparser::ast::FunctionArgExpr::Expr(expr),
4107 ) = arg
4108 {
4109 stack.push(expr);
4110 }
4111 }
4112 }
4113 }
4114 SqlExpr::Identifier(ident) => {
4115 if !column_names_lower.contains(&ident.value.to_ascii_lowercase()) {
4116 return Err(Error::InvalidArgumentError(format!(
4117 "Column '{}' referenced in CHECK constraint does not exist",
4118 ident.value
4119 )));
4120 }
4121 }
4122 SqlExpr::CompoundIdentifier(idents) => {
4123 if idents.len() == 2 {
4124 let first = idents[0].value.as_str();
4125 let second = &idents[1].value;
4126
4127 if column_names_lower.contains(&first.to_ascii_lowercase()) {
4128 continue;
4129 }
4130
4131 if !first.eq_ignore_ascii_case(table_name) {
4132 return Err(Error::InvalidArgumentError(format!(
4133 "CHECK constraint references column from different table '{}'",
4134 first
4135 )));
4136 }
4137
4138 if !column_names_lower.contains(&second.to_ascii_lowercase()) {
4139 return Err(Error::InvalidArgumentError(format!(
4140 "Column '{}' referenced in CHECK constraint does not exist",
4141 second
4142 )));
4143 }
4144 } else if idents.len() == 3 {
4145 let first = &idents[0].value;
4146 let second = &idents[1].value;
4147 let third = &idents[2].value;
4148
4149 if first.eq_ignore_ascii_case(table_name) {
4150 if !column_names_lower.contains(&second.to_ascii_lowercase()) {
4151 return Err(Error::InvalidArgumentError(format!(
4152 "Column '{}' referenced in CHECK constraint does not exist",
4153 second
4154 )));
4155 }
4156 } else if second.eq_ignore_ascii_case(table_name) {
4157 if !column_names_lower.contains(&third.to_ascii_lowercase()) {
4158 return Err(Error::InvalidArgumentError(format!(
4159 "Column '{}' referenced in CHECK constraint does not exist",
4160 third
4161 )));
4162 }
4163 } else {
4164 return Err(Error::InvalidArgumentError(format!(
4165 "CHECK constraint references column from different table '{}'",
4166 second
4167 )));
4168 }
4169 }
4170 }
4171 SqlExpr::BinaryOp { left, right, .. } => {
4172 stack.push(left);
4173 stack.push(right);
4174 }
4175 SqlExpr::UnaryOp { expr, .. } | SqlExpr::Nested(expr) => {
4176 stack.push(expr);
4177 }
4178 SqlExpr::Value(_) | SqlExpr::TypedString { .. } => {}
4179 _ => {}
4180 }
4181 }
4182
4183 Ok(())
4184}
4185
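/// Restricts per-column options to NULL, NOT NULL, UNIQUE, CHECK, and
/// FOREIGN KEY; DEFAULT values and any other column option are rejected.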
4186fn validate_create_table_definition(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
4187 for column in &stmt.columns {
4188 for ColumnOptionDef { option, .. } in &column.options {
4189 match option {
4190 ColumnOption::Null
4191 | ColumnOption::NotNull
4192 | ColumnOption::Unique { .. }
4193 | ColumnOption::Check(_)
4194 | ColumnOption::ForeignKey { .. } => {}
4195 ColumnOption::Default(_) => {
4196 return Err(Error::InvalidArgumentError(format!(
4197 "DEFAULT values are not supported for column '{}'",
4198 column.name
4199 )));
4200 }
4201 other => {
4202 return Err(Error::InvalidArgumentError(format!(
4203 "unsupported column option {:?} on '{}'",
4204 other, column.name
4205 )));
4206 }
4207 }
4208 }
4209 }
4210 Ok(())
4211}
4212
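/// CREATE TABLE AS SELECT derives its schema from the query, so an explicit
/// column list is rejected.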
4213fn validate_create_table_as(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
4214 if !stmt.columns.is_empty() {
4215 return Err(Error::InvalidArgumentError(
4216 "CREATE TABLE AS SELECT does not support column definitions yet".into(),
4217 ));
4218 }
4219 Ok(())
4220}
4221
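/// Rejects query features the simple execution path does not handle: WITH
/// clauses, OFFSET, LIMIT BY, and FETCH. A plain LIMIT is allowed.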
4222fn validate_simple_query(query: &Query) -> SqlResult<()> {
4223 if query.with.is_some() {
4224 return Err(Error::InvalidArgumentError(
4225 "WITH clauses are not supported".into(),
4226 ));
4227 }
4228 if let Some(limit_clause) = &query.limit_clause {
4229 match limit_clause {
4230 LimitClause::LimitOffset {
4231 offset: Some(_), ..
4232 }
4233 | LimitClause::OffsetCommaLimit { .. } => {
4234 return Err(Error::InvalidArgumentError(
4235 "OFFSET clauses are not supported".into(),
4236 ));
4237 }
4238 LimitClause::LimitOffset { limit_by, .. } if !limit_by.is_empty() => {
4239 return Err(Error::InvalidArgumentError(
4240 "LIMIT BY clauses are not supported".into(),
4241 ));
4242 }
4243 _ => {}
4244 }
4245 }
4246 if query.fetch.is_some() {
4247 return Err(Error::InvalidArgumentError(
4248 "FETCH clauses are not supported".into(),
4249 ));
4250 }
4251 Ok(())
4252}
4253
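/// Extracts a plain column name from an identifier or compound identifier,
/// keeping only the final path segment; any other expression is an error.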
4254fn resolve_column_name(expr: &SqlExpr) -> SqlResult<String> {
4255 match expr {
4256 SqlExpr::Identifier(ident) => Ok(ident.value.clone()),
4257 SqlExpr::CompoundIdentifier(parts) => {
4258 if let Some(last) = parts.last() {
4259 Ok(last.value.clone())
4260 } else {
4261 Err(Error::InvalidArgumentError(
4262 "empty column identifier".into(),
4263 ))
4264 }
4265 }
4266 _ => Err(Error::InvalidArgumentError(
4267 "aggregate arguments must be plain column identifiers".into(),
4268 )),
4269 }
4270}
4271
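/// When the FROM item is aliased, every qualified projection
/// (`qualifier.column`) must use that alias; any other qualifier is reported
/// as an unknown table.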
4272fn validate_projection_alias_qualifiers(
4273 projection_items: &[SelectItem],
4274 alias: &str,
4275) -> SqlResult<()> {
4276 let alias_lower = alias.to_ascii_lowercase();
4277 for item in projection_items {
4278 match item {
4279 SelectItem::UnnamedExpr(expr) | SelectItem::ExprWithAlias { expr, .. } => {
4280 if let SqlExpr::CompoundIdentifier(parts) = expr
4281 && parts.len() >= 2
4282 && let Some(first) = parts.first()
4283 && !first.value.eq_ignore_ascii_case(&alias_lower)
4284 {
4285 return Err(Error::InvalidArgumentError(format!(
4286 "Binder Error: table '{}' not found",
4287 first.value
4288 )));
4289 }
4290 }
4291 _ => {}
4292 }
4293 }
4294 Ok(())
4295}
4296
4297 #[allow(dead_code)]
fn expr_contains_aggregate(expr: &llkv_expr::expr::ScalarExpr<String>) -> bool {
4301 match expr {
4302 llkv_expr::expr::ScalarExpr::Aggregate(_) => true,
4303 llkv_expr::expr::ScalarExpr::Binary { left, right, .. } => {
4304 expr_contains_aggregate(left) || expr_contains_aggregate(right)
4305 }
4306 llkv_expr::expr::ScalarExpr::GetField { base, .. } => expr_contains_aggregate(base),
4307 llkv_expr::expr::ScalarExpr::Column(_) | llkv_expr::expr::ScalarExpr::Literal(_) => false,
4308 }
4309}
4310
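/// Attempts to interpret a function call as one of the supported aggregates
/// (COUNT, SUM, MIN, MAX). Returns `Ok(None)` for anything that is not a
/// plain aggregate call (ODBC syntax, DISTINCT, FILTER, OVER, WITHIN GROUP,
/// multi-part names, or subquery arguments); a malformed argument list for a
/// recognized aggregate produces an error instead.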
4311fn try_parse_aggregate_function(
4312 func: &sqlparser::ast::Function,
4313) -> SqlResult<Option<llkv_expr::expr::AggregateCall<String>>> {
4314 use sqlparser::ast::{FunctionArg, FunctionArgExpr, FunctionArguments, ObjectNamePart};
4315
4316 if func.uses_odbc_syntax {
4317 return Ok(None);
4318 }
4319 if !matches!(func.parameters, FunctionArguments::None) {
4320 return Ok(None);
4321 }
4322 if func.filter.is_some()
4323 || func.null_treatment.is_some()
4324 || func.over.is_some()
4325 || !func.within_group.is_empty()
4326 {
4327 return Ok(None);
4328 }
4329
4330 let func_name = if func.name.0.len() == 1 {
4331 match &func.name.0[0] {
4332 ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
4333 _ => return Ok(None),
4334 }
4335 } else {
4336 return Ok(None);
4337 };
4338
4339 let args_slice: &[FunctionArg] = match &func.args {
4340 FunctionArguments::List(list) => {
4341 if list.duplicate_treatment.is_some() || !list.clauses.is_empty() {
4342 return Ok(None);
4343 }
4344 &list.args
4345 }
4346 FunctionArguments::None => &[],
4347 FunctionArguments::Subquery(_) => return Ok(None),
4348 };
4349
4350 let agg_call = match func_name.as_str() {
4351 "count" => {
4352 if args_slice.len() != 1 {
4353 return Err(Error::InvalidArgumentError(
4354 "COUNT accepts exactly one argument".into(),
4355 ));
4356 }
4357 match &args_slice[0] {
4358 FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
4359 llkv_expr::expr::AggregateCall::CountStar
4360 }
4361 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
4362 let column = resolve_column_name(arg_expr)?;
4363 llkv_expr::expr::AggregateCall::Count(column)
4364 }
4365 _ => {
4366 return Err(Error::InvalidArgumentError(
4367 "unsupported COUNT argument".into(),
4368 ));
4369 }
4370 }
4371 }
4372 "sum" => {
4373 if args_slice.len() != 1 {
4374 return Err(Error::InvalidArgumentError(
4375 "SUM accepts exactly one argument".into(),
4376 ));
4377 }
4378 let arg_expr = match &args_slice[0] {
4379 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
4380 _ => {
4381 return Err(Error::InvalidArgumentError(
4382 "SUM requires a column argument".into(),
4383 ));
4384 }
4385 };
4386
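            // SUM over `CASE WHEN col IS NULL THEN 1 ELSE 0 END` is really a null count.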
4387 if let Some(column) = parse_count_nulls_case(arg_expr)? {
4389 llkv_expr::expr::AggregateCall::CountNulls(column)
4390 } else {
4391 let column = resolve_column_name(arg_expr)?;
4392 llkv_expr::expr::AggregateCall::Sum(column)
4393 }
4394 }
4395 "min" => {
4396 if args_slice.len() != 1 {
4397 return Err(Error::InvalidArgumentError(
4398 "MIN accepts exactly one argument".into(),
4399 ));
4400 }
4401 let arg_expr = match &args_slice[0] {
4402 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
4403 _ => {
4404 return Err(Error::InvalidArgumentError(
4405 "MIN requires a column argument".into(),
4406 ));
4407 }
4408 };
4409 let column = resolve_column_name(arg_expr)?;
4410 llkv_expr::expr::AggregateCall::Min(column)
4411 }
4412 "max" => {
4413 if args_slice.len() != 1 {
4414 return Err(Error::InvalidArgumentError(
4415 "MAX accepts exactly one argument".into(),
4416 ));
4417 }
4418 let arg_expr = match &args_slice[0] {
4419 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
4420 _ => {
4421 return Err(Error::InvalidArgumentError(
4422 "MAX requires a column argument".into(),
4423 ));
4424 }
4425 };
4426 let column = resolve_column_name(arg_expr)?;
4427 llkv_expr::expr::AggregateCall::Max(column)
4428 }
4429 _ => return Ok(None),
4430 };
4431
4432 Ok(Some(agg_call))
4433}
4434
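/// Detects the `CASE WHEN <col> IS NULL THEN 1 ELSE 0 END` pattern (as used
/// in `SUM(CASE WHEN col IS NULL THEN 1 ELSE 0 END)`) and returns the column
/// name so the surrounding SUM can be treated as a null-count aggregate.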
4435fn parse_count_nulls_case(expr: &SqlExpr) -> SqlResult<Option<String>> {
4436 let SqlExpr::Case {
4437 operand,
4438 conditions,
4439 else_result,
4440 ..
4441 } = expr
4442 else {
4443 return Ok(None);
4444 };
4445
4446 if operand.is_some() || conditions.len() != 1 {
4447 return Ok(None);
4448 }
4449
4450 let case_when = &conditions[0];
4451 if !is_integer_literal(&case_when.result, 1) {
4452 return Ok(None);
4453 }
4454
4455 let else_expr = match else_result {
4456 Some(expr) => expr.as_ref(),
4457 None => return Ok(None),
4458 };
4459 if !is_integer_literal(else_expr, 0) {
4460 return Ok(None);
4461 }
4462
4463 let inner = match &case_when.condition {
4464 SqlExpr::IsNull(inner) => inner.as_ref(),
4465 _ => return Ok(None),
4466 };
4467
4468 resolve_column_name(inner).map(Some)
4469}
4470
4471fn is_integer_literal(expr: &SqlExpr, expected: i64) -> bool {
4472 match expr {
4473 SqlExpr::Value(ValueWithSpan {
4474 value: Value::Number(text, _),
4475 ..
4476 }) => text.parse::<i64>() == Ok(expected),
4477 _ => false,
4478 }
4479}
4480
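/// Translates a SQL boolean predicate (e.g. a WHERE clause) into an
/// `llkv_expr` expression tree. The walk is iterative (enter/exit frames on
/// an explicit stack) so deeply nested predicates cannot overflow the call
/// stack. Supported forms: AND/OR/NOT, the six comparison operators,
/// IS [NOT] NULL, IN lists (an empty list folds to a constant), and BETWEEN
/// (rewritten as two comparisons). IN subqueries must be materialized by the
/// caller first.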
4481fn translate_condition_with_context(
4482 resolver: &IdentifierResolver<'_>,
4483 context: IdentifierContext,
4484 expr: &SqlExpr,
4485) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
4486 enum ConditionExitContext {
4493 And,
4494 Or,
4495 Not,
4496 Nested,
4497 }
4498
4499 type ConditionFrame<'a> = llkv_plan::TransformFrame<
4500 'a,
4501 SqlExpr,
4502 llkv_expr::expr::Expr<'static, String>,
4503 ConditionExitContext,
4504 >;
4505
4506 let mut work_stack: Vec<ConditionFrame> = vec![ConditionFrame::Enter(expr)];
4507 let mut result_stack: Vec<llkv_expr::expr::Expr<'static, String>> = Vec::new();
4508
4509 while let Some(frame) = work_stack.pop() {
4510 match frame {
4511 ConditionFrame::Enter(node) => match node {
4512 SqlExpr::BinaryOp { left, op, right } => match op {
4513 BinaryOperator::And => {
4514 work_stack.push(ConditionFrame::Exit(ConditionExitContext::And));
4515 work_stack.push(ConditionFrame::Enter(right));
4516 work_stack.push(ConditionFrame::Enter(left));
4517 }
4518 BinaryOperator::Or => {
4519 work_stack.push(ConditionFrame::Exit(ConditionExitContext::Or));
4520 work_stack.push(ConditionFrame::Enter(right));
4521 work_stack.push(ConditionFrame::Enter(left));
4522 }
4523 BinaryOperator::Eq
4524 | BinaryOperator::NotEq
4525 | BinaryOperator::Lt
4526 | BinaryOperator::LtEq
4527 | BinaryOperator::Gt
4528 | BinaryOperator::GtEq => {
4529 let result = translate_comparison_with_context(
4530 resolver,
4531 context.clone(),
4532 left,
4533 op.clone(),
4534 right,
4535 )?;
4536 work_stack.push(ConditionFrame::Leaf(result));
4537 }
4538 other => {
4539 return Err(Error::InvalidArgumentError(format!(
4540 "unsupported binary operator in WHERE clause: {other:?}"
4541 )));
4542 }
4543 },
4544 SqlExpr::UnaryOp {
4545 op: UnaryOperator::Not,
4546 expr: inner,
4547 } => {
4548 work_stack.push(ConditionFrame::Exit(ConditionExitContext::Not));
4549 work_stack.push(ConditionFrame::Enter(inner));
4550 }
4551 SqlExpr::Nested(inner) => {
4552 work_stack.push(ConditionFrame::Exit(ConditionExitContext::Nested));
4553 work_stack.push(ConditionFrame::Enter(inner));
4554 }
4555 SqlExpr::IsNull(inner) => {
4556 let scalar = translate_scalar_with_context(resolver, context.clone(), inner)?;
4557 match scalar {
4558 llkv_expr::expr::ScalarExpr::Column(column) => {
4559 work_stack.push(ConditionFrame::Leaf(llkv_expr::expr::Expr::Pred(
4560 llkv_expr::expr::Filter {
4561 field_id: column,
4562 op: llkv_expr::expr::Operator::IsNull,
4563 },
4564 )));
4565 }
4566 _ => {
4567 return Err(Error::InvalidArgumentError(
4568 "IS NULL predicates currently support column references only"
4569 .into(),
4570 ));
4571 }
4572 }
4573 }
4574 SqlExpr::IsNotNull(inner) => {
4575 let scalar = translate_scalar_with_context(resolver, context.clone(), inner)?;
4576 match scalar {
4577 llkv_expr::expr::ScalarExpr::Column(column) => {
4578 work_stack.push(ConditionFrame::Leaf(llkv_expr::expr::Expr::Pred(
4579 llkv_expr::expr::Filter {
4580 field_id: column,
4581 op: llkv_expr::expr::Operator::IsNotNull,
4582 },
4583 )));
4584 }
4585 _ => {
4586 return Err(Error::InvalidArgumentError(
4587 "IS NOT NULL predicates currently support column references only"
4588 .into(),
4589 ));
4590 }
4591 }
4592 }
4593 SqlExpr::InList {
4594 expr: in_expr,
4595 list,
4596 negated,
4597 } => {
4598 if list.is_empty() {
4599 let result = if *negated {
4600 llkv_expr::expr::Expr::Literal(true)
4601 } else {
4602 llkv_expr::expr::Expr::Literal(false)
4603 };
4604 work_stack.push(ConditionFrame::Leaf(result));
4605 } else {
4606 let target =
4607 translate_scalar_with_context(resolver, context.clone(), in_expr)?;
4608 let mut values = Vec::with_capacity(list.len());
4609 for value_expr in list {
4610 let scalar = translate_scalar_with_context(
4611 resolver,
4612 context.clone(),
4613 value_expr,
4614 )?;
4615 values.push(scalar);
4616 }
4617
4618 work_stack.push(ConditionFrame::Leaf(llkv_expr::expr::Expr::InList {
4619 expr: target,
4620 list: values,
4621 negated: *negated,
4622 }));
4623 }
4624 }
4625 SqlExpr::InSubquery { .. } => {
4626 return Err(Error::InvalidArgumentError(
4627 "IN (SELECT ...) subqueries must be materialized before translation".into(),
4628 ));
4629 }
4630 SqlExpr::Between {
4631 expr: between_expr,
4632 negated,
4633 low,
4634 high,
4635 } => {
4636 let lower_bound = translate_comparison_with_context(
4637 resolver,
4638 context.clone(),
4639 between_expr,
4640 BinaryOperator::GtEq,
4641 low,
4642 )?;
4643 let upper_bound = translate_comparison_with_context(
4644 resolver,
4645 context.clone(),
4646 between_expr,
4647 BinaryOperator::LtEq,
4648 high,
4649 )?;
4650
4651 let between_expr_result =
4652 llkv_expr::expr::Expr::And(vec![lower_bound, upper_bound]);
4653
4654 let result = if *negated {
4655 llkv_expr::expr::Expr::not(between_expr_result)
4656 } else {
4657 between_expr_result
4658 };
4659 work_stack.push(ConditionFrame::Leaf(result));
4660 }
4661 other => {
4662 return Err(Error::InvalidArgumentError(format!(
4663 "unsupported WHERE clause: {other:?}"
4664 )));
4665 }
4666 },
4667 ConditionFrame::Leaf(translated) => {
4668 result_stack.push(translated);
4669 }
4670 ConditionFrame::Exit(exit_context) => match exit_context {
4671 ConditionExitContext::And => {
4672 let right = result_stack.pop().ok_or_else(|| {
4673 Error::Internal(
4674 "translate_condition: result stack underflow for And right".into(),
4675 )
4676 })?;
4677 let left = result_stack.pop().ok_or_else(|| {
4678 Error::Internal(
4679 "translate_condition: result stack underflow for And left".into(),
4680 )
4681 })?;
4682 result_stack.push(flatten_and(left, right));
4683 }
4684 ConditionExitContext::Or => {
4685 let right = result_stack.pop().ok_or_else(|| {
4686 Error::Internal(
4687 "translate_condition: result stack underflow for Or right".into(),
4688 )
4689 })?;
4690 let left = result_stack.pop().ok_or_else(|| {
4691 Error::Internal(
4692 "translate_condition: result stack underflow for Or left".into(),
4693 )
4694 })?;
4695 result_stack.push(flatten_or(left, right));
4696 }
4697 ConditionExitContext::Not => {
4698 let inner = result_stack.pop().ok_or_else(|| {
4699 Error::Internal(
4700 "translate_condition: result stack underflow for Not".into(),
4701 )
4702 })?;
4703 result_stack.push(llkv_expr::expr::Expr::not(inner));
4704 }
4705 ConditionExitContext::Nested => {
 // Parentheses are a no-op: the inner result is already on the stack.
 }
4708 },
4709 }
4710 }
4711
4712 result_stack.pop().ok_or_else(|| {
4713 Error::Internal("translate_condition_with_context: empty result stack".into())
4714 })
4715}
4716
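/// Combines two expressions with AND, splicing in the children of any
/// existing AND node so the result stays flat instead of nesting.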
4717fn flatten_and(
4718 left: llkv_expr::expr::Expr<'static, String>,
4719 right: llkv_expr::expr::Expr<'static, String>,
4720) -> llkv_expr::expr::Expr<'static, String> {
4721 let mut children: Vec<llkv_expr::expr::Expr<'static, String>> = Vec::new();
4722 match left {
4723 llkv_expr::expr::Expr::And(mut left_children) => children.append(&mut left_children),
4724 other => children.push(other),
4725 }
4726 match right {
4727 llkv_expr::expr::Expr::And(mut right_children) => children.append(&mut right_children),
4728 other => children.push(other),
4729 }
4730 if children.len() == 1 {
4731 children.into_iter().next().unwrap()
4732 } else {
4733 llkv_expr::expr::Expr::And(children)
4734 }
4735}
4736
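/// Combines two expressions with OR, splicing in the children of any
/// existing OR node so the result stays flat instead of nesting.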
4737fn flatten_or(
4738 left: llkv_expr::expr::Expr<'static, String>,
4739 right: llkv_expr::expr::Expr<'static, String>,
4740) -> llkv_expr::expr::Expr<'static, String> {
4741 let mut children: Vec<llkv_expr::expr::Expr<'static, String>> = Vec::new();
4742 match left {
4743 llkv_expr::expr::Expr::Or(mut left_children) => children.append(&mut left_children),
4744 other => children.push(other),
4745 }
4746 match right {
4747 llkv_expr::expr::Expr::Or(mut right_children) => children.append(&mut right_children),
4748 other => children.push(other),
4749 }
4750 if children.len() == 1 {
4751 children.into_iter().next().unwrap()
4752 } else {
4753 llkv_expr::expr::Expr::Or(children)
4754 }
4755}
4756
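/// Translates a binary comparison. When one side is a column and the other a
/// non-NULL literal, the comparison is lowered to a direct filter predicate
/// (flipping the operator when the literal is on the left); otherwise a
/// general `Expr::Compare` node is produced.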
4757fn translate_comparison_with_context(
4758 resolver: &IdentifierResolver<'_>,
4759 context: IdentifierContext,
4760 left: &SqlExpr,
4761 op: BinaryOperator,
4762 right: &SqlExpr,
4763) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
4764 let left_scalar = translate_scalar_with_context(resolver, context.clone(), left)?;
4765 let right_scalar = translate_scalar_with_context(resolver, context, right)?;
4766 let compare_op = match op {
4767 BinaryOperator::Eq => llkv_expr::expr::CompareOp::Eq,
4768 BinaryOperator::NotEq => llkv_expr::expr::CompareOp::NotEq,
4769 BinaryOperator::Lt => llkv_expr::expr::CompareOp::Lt,
4770 BinaryOperator::LtEq => llkv_expr::expr::CompareOp::LtEq,
4771 BinaryOperator::Gt => llkv_expr::expr::CompareOp::Gt,
4772 BinaryOperator::GtEq => llkv_expr::expr::CompareOp::GtEq,
4773 other => {
4774 return Err(Error::InvalidArgumentError(format!(
4775 "unsupported comparison operator: {other:?}"
4776 )));
4777 }
4778 };
4779
4780 if let (
4781 llkv_expr::expr::ScalarExpr::Column(column),
4782 llkv_expr::expr::ScalarExpr::Literal(literal),
4783 ) = (&left_scalar, &right_scalar)
4784 && let Some(op) = compare_op_to_filter_operator(compare_op, literal)
4785 {
4786 tracing::debug!(
4787 column = ?column,
4788 literal = ?literal,
4789 ?compare_op,
4790 "translate_comparison direct"
4791 );
4792 return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
4793 field_id: column.clone(),
4794 op,
4795 }));
4796 }
4797
4798 if let (
4799 llkv_expr::expr::ScalarExpr::Literal(literal),
4800 llkv_expr::expr::ScalarExpr::Column(column),
4801 ) = (&left_scalar, &right_scalar)
4802 && let Some(flipped) = flip_compare_op(compare_op)
4803 && let Some(op) = compare_op_to_filter_operator(flipped, literal)
4804 {
4805 tracing::debug!(
4806 column = ?column,
4807 literal = ?literal,
4808 original_op = ?compare_op,
4809 flipped_op = ?flipped,
4810 "translate_comparison flipped"
4811 );
4812 return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
4813 field_id: column.clone(),
4814 op,
4815 }));
4816 }
4817
4818 Ok(llkv_expr::expr::Expr::Compare {
4819 left: left_scalar,
4820 op: compare_op,
4821 right: right_scalar,
4822 })
4823}
4824
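/// Maps a comparison operator plus literal to a pushdown filter operator.
/// NULL literals and `!=` have no direct filter form and yield `None`.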
4825fn compare_op_to_filter_operator(
4826 op: llkv_expr::expr::CompareOp,
4827 literal: &Literal,
4828) -> Option<llkv_expr::expr::Operator<'static>> {
4829 if matches!(literal, Literal::Null) {
4830 return None;
4831 }
4832 let lit = literal.clone();
4833 tracing::debug!(?op, literal = ?literal, "compare_op_to_filter_operator input");
4834 match op {
4835 llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::Operator::Equals(lit)),
4836 llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::Operator::LessThan(lit)),
4837 llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::Operator::LessThanOrEquals(lit)),
4838 llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::Operator::GreaterThan(lit)),
4839 llkv_expr::expr::CompareOp::GtEq => {
4840 Some(llkv_expr::expr::Operator::GreaterThanOrEquals(lit))
4841 }
4842 llkv_expr::expr::CompareOp::NotEq => None,
4843 }
4844}
4845
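/// Mirrors a comparison operator so `literal <op> column` can be rewritten as
/// `column <flipped op> literal`; `!=` has no mirrored form here.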
4846fn flip_compare_op(op: llkv_expr::expr::CompareOp) -> Option<llkv_expr::expr::CompareOp> {
4847 match op {
4848 llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::CompareOp::Eq),
4849 llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::CompareOp::Gt),
4850 llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::CompareOp::GtEq),
4851 llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::CompareOp::Lt),
4852 llkv_expr::expr::CompareOp::GtEq => Some(llkv_expr::expr::CompareOp::LtEq),
4853 llkv_expr::expr::CompareOp::NotEq => None,
4854 }
4855}
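
/// Translates a scalar expression, resolving identifiers against the catalog
/// through the provided resolver and context.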
4856fn translate_scalar_with_context(
4859 resolver: &IdentifierResolver<'_>,
4860 context: IdentifierContext,
4861 expr: &SqlExpr,
4862) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
4863 translate_scalar_internal(expr, Some(resolver), Some(&context))
4864}
4865
4866#[allow(dead_code)]
4867fn translate_scalar(expr: &SqlExpr) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
4868 translate_scalar_internal(expr, None, None)
4869}
4870
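/// Core scalar translation shared by the resolver-aware and resolver-free
/// entry points. Identifiers go through the resolver when one is supplied;
/// otherwise they become raw column names, with dotted paths turning into
/// struct field accesses. The walk is iterative, unary plus and parentheses
/// are no-ops, and CAST is currently passed through to its inner expression.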
4871fn translate_scalar_internal(
4872 expr: &SqlExpr,
4873 resolver: Option<&IdentifierResolver<'_>>,
4874 context: Option<&IdentifierContext>,
4875) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
4876 enum ScalarExitContext {
4884 BinaryOp { op: BinaryOperator },
4885 UnaryMinus,
4886 UnaryPlus,
4887 Nested,
4888 }
4889
4890 type ScalarFrame<'a> =
4891 TransformFrame<'a, SqlExpr, llkv_expr::expr::ScalarExpr<String>, ScalarExitContext>;
4892
4893 let mut work_stack: Vec<ScalarFrame> = vec![ScalarFrame::Enter(expr)];
4894 let mut result_stack: Vec<llkv_expr::expr::ScalarExpr<String>> = Vec::new();
4895
4896 while let Some(frame) = work_stack.pop() {
4897 match frame {
4898 ScalarFrame::Enter(node) => match node {
4899 SqlExpr::Identifier(ident) => {
4900 if let (Some(resolver), Some(ctx)) = (resolver, context) {
4901 let parts = vec![ident.value.clone()];
4902 let resolution = resolver.resolve(&parts, (*ctx).clone())?;
4903 work_stack.push(ScalarFrame::Leaf(resolution.into_scalar_expr()));
4904 } else {
4905 work_stack.push(ScalarFrame::Leaf(llkv_expr::expr::ScalarExpr::column(
4906 ident.value.clone(),
4907 )));
4908 }
4909 }
4910 SqlExpr::CompoundIdentifier(idents) => {
4911 if idents.is_empty() {
4912 return Err(Error::InvalidArgumentError(
4913 "invalid compound identifier".into(),
4914 ));
4915 }
4916
4917 if let (Some(resolver), Some(ctx)) = (resolver, context) {
4918 let parts: Vec<String> =
4919 idents.iter().map(|ident| ident.value.clone()).collect();
4920 let resolution = resolver.resolve(&parts, (*ctx).clone())?;
4921 work_stack.push(ScalarFrame::Leaf(resolution.into_scalar_expr()));
4922 } else {
4923 let column_name = idents[0].value.clone();
4924 let mut result = llkv_expr::expr::ScalarExpr::column(column_name);
4925
4926 for part in &idents[1..] {
4927 let field_name = part.value.clone();
4928 result = llkv_expr::expr::ScalarExpr::get_field(result, field_name);
4929 }
4930
4931 work_stack.push(ScalarFrame::Leaf(result));
4932 }
4933 }
4934 SqlExpr::Value(value) => {
4935 let result = literal_from_value(value)?;
4936 work_stack.push(ScalarFrame::Leaf(result));
4937 }
4938 SqlExpr::BinaryOp { left, op, right } => {
4939 work_stack.push(ScalarFrame::Exit(ScalarExitContext::BinaryOp {
4940 op: op.clone(),
4941 }));
4942 work_stack.push(ScalarFrame::Enter(right));
4943 work_stack.push(ScalarFrame::Enter(left));
4944 }
4945 SqlExpr::UnaryOp {
4946 op: UnaryOperator::Minus,
4947 expr: inner,
4948 } => {
4949 work_stack.push(ScalarFrame::Exit(ScalarExitContext::UnaryMinus));
4950 work_stack.push(ScalarFrame::Enter(inner));
4951 }
4952 SqlExpr::UnaryOp {
4953 op: UnaryOperator::Plus,
4954 expr: inner,
4955 } => {
4956 work_stack.push(ScalarFrame::Exit(ScalarExitContext::UnaryPlus));
4957 work_stack.push(ScalarFrame::Enter(inner));
4958 }
4959 SqlExpr::Nested(inner) => {
4960 work_stack.push(ScalarFrame::Exit(ScalarExitContext::Nested));
4961 work_stack.push(ScalarFrame::Enter(inner));
4962 }
4963 SqlExpr::Cast { expr: inner, .. } => {
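                    // The target type is ignored at this layer; the inner expression is translated as-is.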
4964 work_stack.push(ScalarFrame::Enter(inner));
4967 }
4968 SqlExpr::Function(func) => {
4969 if let Some(agg_call) = try_parse_aggregate_function(func)? {
4970 work_stack.push(ScalarFrame::Leaf(llkv_expr::expr::ScalarExpr::aggregate(
4971 agg_call,
4972 )));
4973 } else {
4974 return Err(Error::InvalidArgumentError(format!(
4975 "unsupported function in scalar expression: {:?}",
4976 func.name
4977 )));
4978 }
4979 }
4980 SqlExpr::Dictionary(fields) => {
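                    // Dictionary syntax builds a struct literal; every field value must itself
                    // reduce to a literal.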
4981 let mut struct_fields = Vec::new();
4983 for entry in fields {
4984 let key = entry.key.value.clone();
4985 let value_expr =
4988 translate_scalar_internal(&entry.value, resolver, context)?;
4989 match value_expr {
4990 llkv_expr::expr::ScalarExpr::Literal(lit) => {
4991 struct_fields.push((key, Box::new(lit)));
4992 }
4993 _ => {
4994 return Err(Error::InvalidArgumentError(
4995 "Dictionary values must be literals".to_string(),
4996 ));
4997 }
4998 }
4999 }
5000 work_stack.push(ScalarFrame::Leaf(llkv_expr::expr::ScalarExpr::literal(
5001 Literal::Struct(struct_fields),
5002 )));
5003 }
5004 other => {
5005 return Err(Error::InvalidArgumentError(format!(
5006 "unsupported scalar expression: {other:?}"
5007 )));
5008 }
5009 },
5010 ScalarFrame::Leaf(translated) => {
5011 result_stack.push(translated);
5012 }
5013 ScalarFrame::Exit(exit_context) => match exit_context {
5014 ScalarExitContext::BinaryOp { op } => {
5015 let right_expr = result_stack.pop().ok_or_else(|| {
5016 Error::Internal(
5017 "translate_scalar: result stack underflow for BinaryOp right".into(),
5018 )
5019 })?;
5020 let left_expr = result_stack.pop().ok_or_else(|| {
5021 Error::Internal(
5022 "translate_scalar: result stack underflow for BinaryOp left".into(),
5023 )
5024 })?;
5025 let binary_op = match op {
5026 BinaryOperator::Plus => llkv_expr::expr::BinaryOp::Add,
5027 BinaryOperator::Minus => llkv_expr::expr::BinaryOp::Subtract,
5028 BinaryOperator::Multiply => llkv_expr::expr::BinaryOp::Multiply,
5029 BinaryOperator::Divide => llkv_expr::expr::BinaryOp::Divide,
5030 BinaryOperator::Modulo => llkv_expr::expr::BinaryOp::Modulo,
5031 other => {
5032 return Err(Error::InvalidArgumentError(format!(
5033 "unsupported scalar binary operator: {other:?}"
5034 )));
5035 }
5036 };
5037 result_stack.push(llkv_expr::expr::ScalarExpr::binary(
5038 left_expr, binary_op, right_expr,
5039 ));
5040 }
5041 ScalarExitContext::UnaryMinus => {
5042 let inner = result_stack.pop().ok_or_else(|| {
5043 Error::Internal(
5044 "translate_scalar: result stack underflow for UnaryMinus".into(),
5045 )
5046 })?;
5047 match inner {
5048 llkv_expr::expr::ScalarExpr::Literal(lit) => match lit {
5049 Literal::Integer(v) => {
5050 result_stack.push(llkv_expr::expr::ScalarExpr::literal(
5051 Literal::Integer(-v),
5052 ));
5053 }
5054 Literal::Float(v) => {
5055 result_stack
5056 .push(llkv_expr::expr::ScalarExpr::literal(Literal::Float(-v)));
5057 }
5058 Literal::Boolean(_) => {
5059 return Err(Error::InvalidArgumentError(
5060 "cannot negate boolean literal".into(),
5061 ));
5062 }
5063 Literal::String(_) => {
5064 return Err(Error::InvalidArgumentError(
5065 "cannot negate string literal".into(),
5066 ));
5067 }
5068 Literal::Struct(_) => {
5069 return Err(Error::InvalidArgumentError(
5070 "cannot negate struct literal".into(),
5071 ));
5072 }
5073 Literal::Null => {
5074 result_stack
5075 .push(llkv_expr::expr::ScalarExpr::literal(Literal::Null));
5076 }
5077 },
5078 other => {
5079 let zero = llkv_expr::expr::ScalarExpr::literal(Literal::Integer(0));
5080 result_stack.push(llkv_expr::expr::ScalarExpr::binary(
5081 zero,
5082 llkv_expr::expr::BinaryOp::Subtract,
5083 other,
5084 ));
5085 }
5086 }
5087 }
5088 ScalarExitContext::UnaryPlus => {
 // Unary plus is a no-op: the operand's result is already on the stack.
 }
 ScalarExitContext::Nested => {
 // Parentheses are a no-op: the inner result is already on the stack.
 }
5094 },
5095 }
5096 }
5097
5098 result_stack
5099 .pop()
5100 .ok_or_else(|| Error::Internal("translate_scalar: empty result stack".into()))
5101}
5102
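/// Converts a parsed SQL literal into a `ScalarExpr` literal. Numeric text
/// containing `.`, `e`, or `E` parses as a float, other numbers as integers;
/// booleans and NULL map directly, and anything else with a string
/// representation becomes a string literal.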
5103fn literal_from_value(value: &ValueWithSpan) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
5104 match &value.value {
5105 Value::Number(text, _) => {
5106 if text.contains(['.', 'e', 'E']) {
5107 let parsed = text.parse::<f64>().map_err(|err| {
5108 Error::InvalidArgumentError(format!("invalid float literal: {err}"))
5109 })?;
5110 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(parsed)))
5111 } else {
5112 let parsed = text.parse::<i128>().map_err(|err| {
5113 Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
5114 })?;
5115 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(
5116 parsed,
5117 )))
5118 }
5119 }
5120 Value::Boolean(value) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Boolean(
5121 *value,
5122 ))),
5123 Value::Null => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Null)),
5124 other => {
5125 if let Some(text) = other.clone().into_string() {
5126 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::String(text)))
5127 } else {
5128 Err(Error::InvalidArgumentError(format!(
5129 "unsupported literal: {other:?}"
5130 )))
5131 }
5132 }
5133 }
5134}
5135
5136fn resolve_assignment_column_name(target: &AssignmentTarget) -> SqlResult<String> {
5137 match target {
5138 AssignmentTarget::ColumnName(name) => {
5139 if name.0.len() != 1 {
5140 return Err(Error::InvalidArgumentError(
5141 "qualified column names in UPDATE assignments are not supported yet".into(),
5142 ));
5143 }
5144 match &name.0[0] {
5145 ObjectNamePart::Identifier(ident) => Ok(ident.value.clone()),
5146 other => Err(Error::InvalidArgumentError(format!(
5147 "unsupported column reference in UPDATE assignment: {other:?}"
5148 ))),
5149 }
5150 }
5151 AssignmentTarget::Tuple(_) => Err(Error::InvalidArgumentError(
5152 "tuple assignments are not supported yet".into(),
5153 )),
5154 }
5155}
5156
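/// Maps a SQL column type to the Arrow type used for storage: integer types
/// widen to Int64, floating point and DECIMAL/NUMERIC map to Float64, text
/// types (including UUID) to Utf8, DATE to Date32, BOOLEAN to Boolean, and a
/// custom `ROW(...)` type to an Arrow struct.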
5157fn arrow_type_from_sql(data_type: &SqlDataType) -> SqlResult<arrow::datatypes::DataType> {
5158 use arrow::datatypes::DataType;
5159 match data_type {
5160 SqlDataType::Int(_)
5161 | SqlDataType::Integer(_)
5162 | SqlDataType::BigInt(_)
5163 | SqlDataType::SmallInt(_)
5164 | SqlDataType::TinyInt(_) => Ok(DataType::Int64),
5165 SqlDataType::Float(_)
5166 | SqlDataType::Real
5167 | SqlDataType::Double(_)
5168 | SqlDataType::DoublePrecision => Ok(DataType::Float64),
5169 SqlDataType::Text
5170 | SqlDataType::String(_)
5171 | SqlDataType::Varchar(_)
5172 | SqlDataType::Char(_)
5173 | SqlDataType::Uuid => Ok(DataType::Utf8),
5174 SqlDataType::Date => Ok(DataType::Date32),
5175 SqlDataType::Decimal(_) | SqlDataType::Numeric(_) => Ok(DataType::Float64),
5176 SqlDataType::Boolean => Ok(DataType::Boolean),
5177 SqlDataType::Custom(name, args) => {
5178 if name.0.len() == 1
5179 && let ObjectNamePart::Identifier(ident) = &name.0[0]
5180 && ident.value.eq_ignore_ascii_case("row")
5181 {
5182 return row_type_to_arrow(data_type, args);
5183 }
5184 Err(Error::InvalidArgumentError(format!(
5185 "unsupported SQL data type: {data_type:?}"
5186 )))
5187 }
5188 other => Err(Error::InvalidArgumentError(format!(
5189 "unsupported SQL data type: {other:?}"
5190 ))),
5191 }
5192}
5193
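/// Converts the tokenized field list of a SQL `ROW(...)` declaration into an
/// Arrow struct type with nullable fields.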
5194fn row_type_to_arrow(
5195 data_type: &SqlDataType,
5196 tokens: &[String],
5197) -> SqlResult<arrow::datatypes::DataType> {
5198 use arrow::datatypes::{DataType, Field, FieldRef, Fields};
5199
5200 let row_str = data_type.to_string();
5201 if tokens.is_empty() {
5202 return Err(Error::InvalidArgumentError(
5203 "ROW type must define at least one field".into(),
5204 ));
5205 }
5206
5207 let dialect = GenericDialect {};
5208 let field_definitions = resolve_row_field_types(tokens, &dialect).map_err(|err| {
5209 Error::InvalidArgumentError(format!("unable to parse ROW type '{row_str}': {err}"))
5210 })?;
5211
5212 let mut fields: Vec<FieldRef> = Vec::with_capacity(field_definitions.len());
5213 for (field_name, field_type) in field_definitions {
5214 let arrow_field_type = arrow_type_from_sql(&field_type)?;
5215 fields.push(Arc::new(Field::new(field_name, arrow_field_type, true)));
5216 }
5217
5218 let struct_fields: Fields = fields.into();
5219 Ok(DataType::Struct(struct_fields))
5220}
5221
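/// Parses the token stream of a `ROW(name type, ...)` declaration into
/// `(field name, SQL type)` pairs. Because the tokens arrive pre-split, each
/// field type is recovered by taking the longest run of tokens (up to the
/// next comma) that still parses as a data type.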
5222fn resolve_row_field_types(
5223 tokens: &[String],
5224 dialect: &GenericDialect,
5225) -> SqlResult<Vec<(String, SqlDataType)>> {
5226 if tokens.is_empty() {
5227 return Err(Error::InvalidArgumentError(
5228 "ROW type must define at least one field".into(),
5229 ));
5230 }
5231
5232 let mut start = 0;
5233 let mut end = tokens.len();
5234 if tokens[start] == "(" {
5235 if end == 0 || tokens[end - 1] != ")" {
5236 return Err(Error::InvalidArgumentError(
5237 "ROW type is missing closing ')'".into(),
5238 ));
5239 }
5240 start += 1;
5241 end -= 1;
5242 } else if tokens[end - 1] == ")" {
5243 return Err(Error::InvalidArgumentError(
5244 "ROW type contains unmatched ')'".into(),
5245 ));
5246 }
5247
5248 let slice = &tokens[start..end];
5249 if slice.is_empty() {
5250 return Err(Error::InvalidArgumentError(
5251 "ROW type did not provide any field definitions".into(),
5252 ));
5253 }
5254
5255 let mut fields = Vec::new();
5256 let mut index = 0;
5257
5258 while index < slice.len() {
5259 if slice[index] == "," {
5260 index += 1;
5261 continue;
5262 }
5263
5264 let field_name = normalize_row_field_name(&slice[index])?;
5265 index += 1;
5266
5267 if index >= slice.len() {
5268 return Err(Error::InvalidArgumentError(format!(
5269 "ROW field '{field_name}' is missing a type specification"
5270 )));
5271 }
5272
5273 let mut last_success: Option<(usize, SqlDataType)> = None;
5274 let mut type_end = index;
5275
5276 while type_end <= slice.len() {
5277 let candidate = slice[index..type_end].join(" ");
5278 if candidate.trim().is_empty() {
5279 type_end += 1;
5280 continue;
5281 }
5282
5283 if let Ok(parsed_type) = parse_sql_data_type(&candidate, dialect) {
5284 last_success = Some((type_end, parsed_type));
5285 }
5286
5287 if type_end == slice.len() {
5288 break;
5289 }
5290
5291 if slice[type_end] == "," && last_success.is_some() {
5292 break;
5293 }
5294
5295 type_end += 1;
5296 }
5297
5298 let Some((next_index, data_type)) = last_success else {
5299 return Err(Error::InvalidArgumentError(format!(
5300 "failed to parse ROW field type for '{field_name}'"
5301 )));
5302 };
5303
5304 fields.push((field_name, data_type));
5305 index = next_index;
5306
5307 if index < slice.len() && slice[index] == "," {
5308 index += 1;
5309 }
5310 }
5311
5312 if fields.is_empty() {
5313 return Err(Error::InvalidArgumentError(
5314 "ROW type did not provide any field definitions".into(),
5315 ));
5316 }
5317
5318 Ok(fields)
5319}
5320
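/// Parses SQL with an explicit recursion limit so pathologically nested
/// statements fail with a parser error instead of overflowing the stack.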
5321fn parse_sql_with_recursion_limit(
5335 dialect: &GenericDialect,
5336 sql: &str,
5337) -> Result<Vec<Statement>, sqlparser::parser::ParserError> {
5338 Parser::new(dialect)
5339 .with_recursion_limit(PARSER_RECURSION_LIMIT)
5340 .try_with_sql(sql)?
5341 .parse_statements()
5342}
5343
5344fn normalize_row_field_name(raw: &str) -> SqlResult<String> {
5345 let trimmed = raw.trim();
5346 if trimmed.is_empty() {
5347 return Err(Error::InvalidArgumentError(
5348 "ROW field name must not be empty".into(),
5349 ));
5350 }
5351
5352 if let Some(stripped) = trimmed.strip_prefix('"') {
5353 let without_end = stripped.strip_suffix('"').ok_or_else(|| {
5354 Error::InvalidArgumentError(format!("unterminated quoted ROW field name: {trimmed}"))
5355 })?;
5356 let name = without_end.replace("\"\"", "\"");
5357 return Ok(name);
5358 }
5359
5360 Ok(trimmed.to_string())
5361}
5362
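/// Parses a standalone type string by wrapping it in a throwaway
/// `CREATE TABLE` statement and extracting the column's declared type.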
5363fn parse_sql_data_type(type_str: &str, dialect: &GenericDialect) -> SqlResult<SqlDataType> {
5364 let trimmed = type_str.trim();
5365 let sql = format!("CREATE TABLE __row(__field {trimmed});");
5366 let statements = parse_sql_with_recursion_limit(dialect, &sql).map_err(|err| {
5367 Error::InvalidArgumentError(format!("failed to parse ROW field type '{trimmed}': {err}"))
5368 })?;
5369
5370 let stmt = statements.into_iter().next().ok_or_else(|| {
5371 Error::InvalidArgumentError(format!(
5372 "ROW field type '{trimmed}' did not produce a statement"
5373 ))
5374 })?;
5375
5376 match stmt {
5377 Statement::CreateTable(table) => table
5378 .columns
5379 .first()
5380 .map(|col| col.data_type.clone())
5381 .ok_or_else(|| {
5382 Error::InvalidArgumentError(format!(
5383 "ROW field type '{trimmed}' missing column definition"
5384 ))
5385 }),
5386 other => Err(Error::InvalidArgumentError(format!(
5387 "unexpected statement while parsing ROW field type: {other:?}"
5388 ))),
5389 }
5390}
5391
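/// Rows and column names extracted from a literal `(VALUES ...)` derived
/// table in a FROM clause, or `None` when the query does not have that shape.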
5392type ExtractValuesResult = Option<(Vec<Vec<PlanValue>>, Vec<String>)>;
5395
5396#[allow(clippy::type_complexity)]
5397fn extract_values_from_derived_table(from: &[TableWithJoins]) -> SqlResult<ExtractValuesResult> {
5398 if from.len() != 1 {
5399 return Ok(None);
5400 }
5401
5402 let table_with_joins = &from[0];
5403 if !table_with_joins.joins.is_empty() {
5404 return Ok(None);
5405 }
5406
5407 match &table_with_joins.relation {
5408 TableFactor::Derived {
5409 subquery, alias, ..
5410 } => {
5411 let values = match subquery.body.as_ref() {
5413 SetExpr::Values(v) => v,
5414 _ => return Ok(None),
5415 };
5416
5417 let column_names = if let Some(alias) = alias {
5419 alias
5420 .columns
5421 .iter()
5422 .map(|col_def| col_def.name.value.clone())
5423 .collect::<Vec<_>>()
5424 } else {
5425 if values.rows.is_empty() {
5427 return Err(Error::InvalidArgumentError(
5428 "VALUES expression must have at least one row".into(),
5429 ));
5430 }
5431 let first_row = &values.rows[0];
5432 (0..first_row.len())
5433 .map(|i| format!("column{}", i))
5434 .collect()
5435 };
5436
5437 if values.rows.is_empty() {
5439 return Err(Error::InvalidArgumentError(
5440 "VALUES expression must have at least one row".into(),
5441 ));
5442 }
5443
5444 let mut rows = Vec::with_capacity(values.rows.len());
5445 for row in &values.rows {
5446 if row.len() != column_names.len() {
5447 return Err(Error::InvalidArgumentError(format!(
5448 "VALUES row has {} columns but {} columns were expected",
5449 row.len(),
5450 column_names.len()
5451 )));
5452 }
5453
5454 let mut converted_row = Vec::with_capacity(row.len());
5455 for expr in row {
5456 let value = SqlValue::try_from_expr(expr)?;
5457 converted_row.push(PlanValue::from(value));
5458 }
5459 rows.push(converted_row);
5460 }
5461
5462 Ok(Some((rows, column_names)))
5463 }
5464 _ => Ok(None),
5465 }
5466}
5467
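/// Handles `SELECT <literals>` with no FROM clause by evaluating each
/// projection into a single constant row; WHERE, GROUP BY, DISTINCT, and
/// other advanced clauses are rejected.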
5468fn extract_constant_select_rows(select: &Select) -> SqlResult<Option<Vec<Vec<PlanValue>>>> {
5469 if !select.from.is_empty() {
5470 return Ok(None);
5471 }
5472
5473 if select.selection.is_some()
5474 || select.having.is_some()
5475 || !select.named_window.is_empty()
5476 || select.qualify.is_some()
5477 || select.distinct.is_some()
5478 || select.top.is_some()
5479 || select.into.is_some()
5480 || select.prewhere.is_some()
5481 || !select.lateral_views.is_empty()
5482 || select.value_table_mode.is_some()
5483 || !group_by_is_empty(&select.group_by)
5484 {
5485 return Err(Error::InvalidArgumentError(
5486 "constant SELECT statements do not support advanced clauses".into(),
5487 ));
5488 }
5489
5490 if select.projection.is_empty() {
5491 return Err(Error::InvalidArgumentError(
5492 "constant SELECT requires at least one projection".into(),
5493 ));
5494 }
5495
5496 let mut row: Vec<PlanValue> = Vec::with_capacity(select.projection.len());
5497 for item in &select.projection {
5498 let expr = match item {
5499 SelectItem::UnnamedExpr(expr) => expr,
5500 SelectItem::ExprWithAlias { expr, .. } => expr,
5501 other => {
5502 return Err(Error::InvalidArgumentError(format!(
5503 "unsupported projection in constant SELECT: {other:?}"
5504 )));
5505 }
5506 };
5507
5508 let value = SqlValue::try_from_expr(expr)?;
5509 row.push(PlanValue::from(value));
5510 }
5511
5512 Ok(Some(vec![row]))
5513}
5514
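/// Returns the `(display, canonical)` name pair for the single table
/// referenced in a FROM clause, or the alias of a derived table; joins and
/// multiple tables are rejected here.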
5515fn extract_single_table(from: &[TableWithJoins]) -> SqlResult<(String, String)> {
5516 if from.len() != 1 {
5517 return Err(Error::InvalidArgumentError(
5518 "queries over multiple tables are not supported yet".into(),
5519 ));
5520 }
5521 let item = &from[0];
5522 if !item.joins.is_empty() {
5523 return Err(Error::InvalidArgumentError(
5524 "JOIN clauses are not supported yet".into(),
5525 ));
5526 }
5527 match &item.relation {
5528 TableFactor::Table { name, .. } => canonical_object_name(name),
5529 TableFactor::Derived { alias, .. } => {
5530 let table_name = alias
5533 .as_ref()
5534 .map(|a| a.name.value.clone())
5535 .unwrap_or_else(|| "derived".to_string());
5536 let canonical = table_name.to_ascii_lowercase();
5537 Ok((table_name, canonical))
5538 }
5539 _ => Err(Error::InvalidArgumentError(
5540 "queries require a plain table name or derived table".into(),
5541 )),
5542 }
5543}
5544
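/// Collects base-table references from a FROM clause. Only joins without an
/// ON/USING constraint (CROSS JOIN or comma-separated tables) are accepted;
/// anything else is rejected.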
5545fn extract_tables(from: &[TableWithJoins]) -> SqlResult<Vec<llkv_plan::TableRef>> {
5547 let mut tables = Vec::new();
5548
5549 for item in from {
5550 push_table_factor(&item.relation, &mut tables)?;
5551
5552 for join in &item.joins {
5553 match &join.join_operator {
5554 JoinOperator::CrossJoin(JoinConstraint::None)
5555 | JoinOperator::Inner(JoinConstraint::None) => {
5556 push_table_factor(&join.relation, &mut tables)?;
5557 }
5558 JoinOperator::CrossJoin(_) => {
5559 return Err(Error::InvalidArgumentError(
5560 "CROSS JOIN with constraints is not supported".into(),
5561 ));
5562 }
5563 _ => {
5564 return Err(Error::InvalidArgumentError(
5565 "only CROSS JOIN without constraints is supported".into(),
5566 ));
5567 }
5568 }
5569 }
5570 }
5571
5572 Ok(tables)
5573}
5574
5575fn push_table_factor(factor: &TableFactor, tables: &mut Vec<llkv_plan::TableRef>) -> SqlResult<()> {
5576 match factor {
5577 TableFactor::Table { name, alias, .. } => {
5578 let (schema_opt, table) = parse_schema_qualified_name(name)?;
5579 let schema = schema_opt.unwrap_or_default();
5580 let alias_name = alias.as_ref().map(|a| a.name.value.clone());
5581 tables.push(llkv_plan::TableRef::with_alias(schema, table, alias_name));
5582 Ok(())
5583 }
5584 TableFactor::Derived { .. } => Err(Error::InvalidArgumentError(
5585 "JOIN clauses require base tables; derived tables are not supported".into(),
5586 )),
5587 _ => Err(Error::InvalidArgumentError(
5588 "queries require a plain table name".into(),
5589 )),
5590 }
5591}
5592
5593fn group_by_is_empty(expr: &GroupByExpr) -> bool {
5594 matches!(
5595 expr,
5596 GroupByExpr::Expressions(exprs, modifiers)
5597 if exprs.is_empty() && modifiers.is_empty()
5598 )
5599}
5600
5601#[cfg(test)]
5602mod tests {
5603 use super::*;
5604 use arrow::array::{Array, Int64Array, StringArray};
5605 use arrow::record_batch::RecordBatch;
5606 use llkv_storage::pager::MemPager;
5607
5608 fn extract_string_options(batches: &[RecordBatch]) -> Vec<Option<String>> {
5609 let mut values: Vec<Option<String>> = Vec::new();
5610 for batch in batches {
5611 let column = batch
5612 .column(0)
5613 .as_any()
5614 .downcast_ref::<StringArray>()
5615 .expect("string column");
5616 for idx in 0..column.len() {
5617 if column.is_null(idx) {
5618 values.push(None);
5619 } else {
5620 values.push(Some(column.value(idx).to_string()));
5621 }
5622 }
5623 }
5624 values
5625 }
5626
5627 #[test]
5628 fn create_insert_select_roundtrip() {
5629 let pager = Arc::new(MemPager::default());
5630 let engine = SqlEngine::new(pager);
5631
5632 let result = engine
5633 .execute("CREATE TABLE people (id INT NOT NULL, name TEXT NOT NULL)")
5634 .expect("create table");
5635 assert!(matches!(
5636 result[0],
5637 RuntimeStatementResult::CreateTable { .. }
5638 ));
5639
5640 let result = engine
5641 .execute("INSERT INTO people (id, name) VALUES (1, 'alice'), (2, 'bob')")
5642 .expect("insert rows");
5643 assert!(matches!(
5644 result[0],
5645 RuntimeStatementResult::Insert {
5646 rows_inserted: 2,
5647 ..
5648 }
5649 ));
5650
5651 let mut result = engine
5652 .execute("SELECT name FROM people WHERE id = 2")
5653 .expect("select rows");
5654 let select_result = result.remove(0);
5655 let batches = match select_result {
5656 RuntimeStatementResult::Select { execution, .. } => {
5657 execution.collect().expect("collect batches")
5658 }
5659 _ => panic!("expected select result"),
5660 };
5661 assert_eq!(batches.len(), 1);
5662 let column = batches[0]
5663 .column(0)
5664 .as_any()
5665 .downcast_ref::<StringArray>()
5666 .expect("string column");
5667 assert_eq!(column.len(), 1);
5668 assert_eq!(column.value(0), "bob");
5669 }
5670
5671 #[test]
5672 fn insert_select_constant_including_null() {
5673 let pager = Arc::new(MemPager::default());
5674 let engine = SqlEngine::new(pager);
5675
5676 engine
5677 .execute("CREATE TABLE integers(i INTEGER)")
5678 .expect("create table");
5679
5680 let result = engine
5681 .execute("INSERT INTO integers SELECT 42")
5682 .expect("insert literal");
5683 assert!(matches!(
5684 result[0],
5685 RuntimeStatementResult::Insert {
5686 rows_inserted: 1,
5687 ..
5688 }
5689 ));
5690
5691 let result = engine
5692 .execute("INSERT INTO integers SELECT CAST(NULL AS VARCHAR)")
5693 .expect("insert null literal");
5694 assert!(matches!(
5695 result[0],
5696 RuntimeStatementResult::Insert {
5697 rows_inserted: 1,
5698 ..
5699 }
5700 ));
5701
5702 let mut result = engine
5703 .execute("SELECT * FROM integers")
5704 .expect("select rows");
5705 let select_result = result.remove(0);
5706 let batches = match select_result {
5707 RuntimeStatementResult::Select { execution, .. } => {
5708 execution.collect().expect("collect batches")
5709 }
5710 _ => panic!("expected select result"),
5711 };
5712
5713 let mut values: Vec<Option<i64>> = Vec::new();
5714 for batch in &batches {
5715 let column = batch
5716 .column(0)
5717 .as_any()
5718 .downcast_ref::<Int64Array>()
5719 .expect("int column");
5720 for idx in 0..column.len() {
5721 if column.is_null(idx) {
5722 values.push(None);
5723 } else {
5724 values.push(Some(column.value(idx)));
5725 }
5726 }
5727 }
5728
5729 assert_eq!(values, vec![Some(42), None]);
5730 }
5731
5732 #[test]
5733 fn not_null_comparison_filters_all_rows() {
5734 let pager = Arc::new(MemPager::default());
5735 let engine = SqlEngine::new(pager);
5736
5737 engine
5738 .execute("CREATE TABLE single(col INTEGER)")
5739 .expect("create table");
5740 engine
5741 .execute("INSERT INTO single VALUES (1)")
5742 .expect("insert row");
5743
5744 let batches = engine
5745 .sql("SELECT * FROM single WHERE NOT ( NULL ) >= NULL")
5746 .expect("run constant null comparison");
5747
5748 let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
5749 assert_eq!(total_rows, 0, "expected filter to remove all rows");
5750 }
5751
5752 #[test]
5753 fn not_null_in_list_filters_all_rows() {
5754 let pager = Arc::new(MemPager::default());
5755 let engine = SqlEngine::new(pager);
5756
5757 engine
5758 .execute("CREATE TABLE tab0(col0 INTEGER, col1 INTEGER, col2 INTEGER)")
5759 .expect("create table");
5760 engine
5761 .execute("INSERT INTO tab0 VALUES (1, 2, 3)")
5762 .expect("insert row");
5763
5764 let batches = engine
5765 .sql("SELECT * FROM tab0 WHERE NOT ( NULL ) IN ( - col2 * + col2 )")
5766 .expect("run IN list null comparison");
5767
5768 let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
5769 assert_eq!(total_rows, 0, "expected IN list filter to remove all rows");
5770 }
5771
5772 #[test]
5773 fn cross_join_not_null_comparison_filters_all_rows() {
5774 let pager = Arc::new(MemPager::default());
5775 let engine = SqlEngine::new(pager);
5776
5777 engine
5778 .execute("CREATE TABLE left_side(col INTEGER)")
5779 .expect("create left table");
5780 engine
5781 .execute("CREATE TABLE right_side(col INTEGER)")
5782 .expect("create right table");
5783 engine
5784 .execute("INSERT INTO left_side VALUES (1)")
5785 .expect("insert left row");
5786 engine
5787 .execute("INSERT INTO right_side VALUES (2)")
5788 .expect("insert right row");
5789
5790 let batches = engine
5791 .sql("SELECT * FROM left_side CROSS JOIN right_side WHERE NOT ( NULL ) >= NULL")
5792 .expect("run cross join null comparison");
5793
5794 let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
5795 assert_eq!(
5796 total_rows, 0,
5797 "expected cross join filter to remove all rows"
5798 );
5799 }
5800
5801 #[test]
5802 fn cross_join_duplicate_table_name_resolves_columns() {
5803 let pager = Arc::new(MemPager::default());
5804 let engine = SqlEngine::new(pager);
5805
5806 use sqlparser::ast::{SetExpr, Statement};
5807 use sqlparser::dialect::SQLiteDialect;
5808 use sqlparser::parser::Parser;
5809
5810 engine
5811 .execute("CREATE TABLE tab1(col0 INTEGER, col1 INTEGER, col2 INTEGER)")
5812 .expect("create tab1");
5813 engine
5814 .execute("INSERT INTO tab1 VALUES (7, 8, 9)")
5815 .expect("insert tab1 row");
5816
5817 let dialect = SQLiteDialect {};
5818 let ast = Parser::parse_sql(
5819 &dialect,
5820 "SELECT tab1.col2 FROM tab1 AS cor0 CROSS JOIN tab1",
5821 )
5822 .expect("parse cross join query");
5823 let Statement::Query(query) = &ast[0] else {
5824 panic!("expected SELECT query");
5825 };
5826 let select = match query.body.as_ref() {
5827 SetExpr::Select(select) => select.as_ref(),
5828 other => panic!("unexpected query body: {other:?}"),
5829 };
5830 assert_eq!(select.from.len(), 1);
5831 assert!(!select.from[0].joins.is_empty());
5832
5833 let batches = engine
5834 .sql("SELECT tab1.col2 FROM tab1 AS cor0 CROSS JOIN tab1")
5835 .expect("run cross join with alias and base table");
5836
5837 let mut values = Vec::new();
5838 for batch in batches {
5839 let column = batch
5840 .column(0)
5841 .as_any()
5842 .downcast_ref::<Int64Array>()
5843 .expect("int column");
5844 for idx in 0..column.len() {
5845 if column.is_null(idx) {
5846 values.push(None);
5847 } else {
5848 values.push(Some(column.value(idx)));
5849 }
5850 }
5851 }
5852
5853 assert_eq!(values, vec![Some(9)]);
5854 }
5855
5856 #[test]
5857 fn update_with_where_clause_filters_rows() {
5858 let pager = Arc::new(MemPager::default());
5859 let engine = SqlEngine::new(pager);
5860
5861 engine
5862 .execute("SET default_null_order='nulls_first'")
5863 .expect("set default null order");
5864
5865 engine
5866 .execute("CREATE TABLE strings(a VARCHAR)")
5867 .expect("create table");
5868
5869 engine
5870 .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
5871 .expect("insert seed rows");
5872
5873 let result = engine
5874 .execute("UPDATE strings SET a = 13 WHERE a = '3'")
5875 .expect("update rows");
5876 assert!(matches!(
5877 result[0],
5878 RuntimeStatementResult::Update {
5879 rows_updated: 1,
5880 ..
5881 }
5882 ));
5883
5884 let mut result = engine
5885 .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
5886 .expect("select rows");
5887 let select_result = result.remove(0);
5888 let batches = match select_result {
5889 RuntimeStatementResult::Select { execution, .. } => {
5890 execution.collect().expect("collect batches")
5891 }
5892 _ => panic!("expected select result"),
5893 };
5894
5895 let mut values: Vec<Option<String>> = Vec::new();
5896 for batch in &batches {
5897 let column = batch
5898 .column(0)
5899 .as_any()
5900 .downcast_ref::<StringArray>()
5901 .expect("string column");
5902 for idx in 0..column.len() {
5903 if column.is_null(idx) {
5904 values.push(None);
5905 } else {
5906 values.push(Some(column.value(idx).to_string()));
5907 }
5908 }
5909 }
5910
5911 values.sort_by(|a, b| match (a, b) {
5912 (None, None) => std::cmp::Ordering::Equal,
5913 (None, Some(_)) => std::cmp::Ordering::Less,
5914 (Some(_), None) => std::cmp::Ordering::Greater,
5915 (Some(av), Some(bv)) => {
5916 let a_val = av.parse::<i64>().unwrap_or_default();
5917 let b_val = bv.parse::<i64>().unwrap_or_default();
5918 a_val.cmp(&b_val)
5919 }
5920 });
5921
5922 assert_eq!(
5923 values,
5924 vec![None, Some("4".to_string()), Some("13".to_string())]
5925 );
5926 }
5927
5928 #[test]
5929 fn order_by_honors_configured_default_null_order() {
5930 let pager = Arc::new(MemPager::default());
5931 let engine = SqlEngine::new(pager);
5932
5933 engine
5934 .execute("CREATE TABLE strings(a VARCHAR)")
5935 .expect("create table");
5936 engine
5937 .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
5938 .expect("insert values");
5939 engine
5940 .execute("UPDATE strings SET a = 13 WHERE a = '3'")
5941 .expect("update value");
5942
5943 let mut result = engine
5944 .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
5945 .expect("select rows");
5946 let select_result = result.remove(0);
5947 let batches = match select_result {
5948 RuntimeStatementResult::Select { execution, .. } => {
5949 execution.collect().expect("collect batches")
5950 }
5951 _ => panic!("expected select result"),
5952 };
5953
5954 let values = extract_string_options(&batches);
5955 assert_eq!(
5956 values,
5957 vec![Some("4".to_string()), Some("13".to_string()), None]
5958 );
5959
5960 assert!(!engine.default_nulls_first_for_tests());
5961
5962 engine
5963 .execute("SET default_null_order='nulls_first'")
5964 .expect("set default null order");
5965
5966 assert!(engine.default_nulls_first_for_tests());
5967
5968 let mut result = engine
5969 .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
5970 .expect("select rows");
5971 let select_result = result.remove(0);
5972 let batches = match select_result {
5973 RuntimeStatementResult::Select { execution, .. } => {
5974 execution.collect().expect("collect batches")
5975 }
5976 _ => panic!("expected select result"),
5977 };
5978
5979 let values = extract_string_options(&batches);
5980 assert_eq!(
5981 values,
5982 vec![None, Some("4".to_string()), Some("13".to_string())]
5983 );
5984 }
5985
5986 #[test]
5987 fn arrow_type_from_row_returns_struct_fields() {
5988 let dialect = GenericDialect {};
5989 let statements = parse_sql_with_recursion_limit(
5990 &dialect,
5991 "CREATE TABLE row_types(payload ROW(a INTEGER, b VARCHAR));",
5992 )
5993 .expect("parse ROW type definition");
5994
5995 let data_type = match &statements[0] {
5996 Statement::CreateTable(stmt) => stmt.columns[0].data_type.clone(),
5997 other => panic!("unexpected statement: {other:?}"),
5998 };
5999
6000 let arrow_type = arrow_type_from_sql(&data_type).expect("convert ROW type");
6001 match arrow_type {
6002 arrow::datatypes::DataType::Struct(fields) => {
6003 assert_eq!(fields.len(), 2, "unexpected field count");
6004 assert_eq!(fields[0].name(), "a");
6005 assert_eq!(fields[1].name(), "b");
6006 assert_eq!(fields[0].data_type(), &arrow::datatypes::DataType::Int64);
6007 assert_eq!(fields[1].data_type(), &arrow::datatypes::DataType::Utf8);
6008 }
6009 other => panic!("expected struct type, got {other:?}"),
6010 }
6011 }
6012}