1use std::collections::{HashMap, HashSet};
2use std::sync::{
3 Arc, OnceLock,
4 atomic::{AtomicBool, Ordering as AtomicOrdering},
5};
6
7use crate::SqlResult;
8use crate::SqlValue;
9use arrow::record_batch::RecordBatch;
10
11use llkv_executor::SelectExecution;
12use llkv_expr::literal::Literal;
13use llkv_plan::validation::{
14 ensure_known_columns_case_insensitive, ensure_non_empty, ensure_unique_case_insensitive,
15};
16use llkv_result::Error;
17use llkv_runtime::storage_namespace::TEMPORARY_NAMESPACE_ID;
18use llkv_runtime::{
19 AggregateExpr, AssignmentValue, ColumnAssignment, ColumnSpec, CreateIndexPlan, CreateTablePlan,
20 CreateTableSource, DeletePlan, ForeignKeyAction, ForeignKeySpec, IndexColumnPlan, InsertPlan,
21 InsertSource, OrderByPlan, OrderSortType, OrderTarget, PlanStatement, PlanValue,
22 RuntimeContext, RuntimeEngine, RuntimeSession, RuntimeStatementResult, SelectPlan,
23 SelectProjection, UpdatePlan, extract_rows_from_range,
24};
25use llkv_storage::pager::Pager;
26use llkv_table::catalog::{IdentifierContext, IdentifierResolver};
27use regex::Regex;
28use simd_r_drive_entry_handle::EntryHandle;
29use sqlparser::ast::{
30 Assignment, AssignmentTarget, BeginTransactionKind, BinaryOperator, ColumnOption,
31 ColumnOptionDef, ConstraintCharacteristics, DataType as SqlDataType, Delete, ExceptionWhen,
32 Expr as SqlExpr, FromTable, FunctionArg, FunctionArgExpr, FunctionArguments, GroupByExpr,
33 Ident, LimitClause, NullsDistinctOption, ObjectName, ObjectNamePart, ObjectType, OrderBy,
34 OrderByKind, Query, ReferentialAction, SchemaName, Select, SelectItem,
35 SelectItemQualifiedWildcardKind, Set, SetExpr, SqlOption, Statement, TableConstraint,
36 TableFactor, TableObject, TableWithJoins, TransactionMode, TransactionModifier, UnaryOperator,
37 UpdateTableFromKind, Value, ValueWithSpan,
38};
39use sqlparser::dialect::GenericDialect;
40use sqlparser::parser::Parser;
41
42pub struct SqlEngine<P>
43where
44 P: Pager<Blob = EntryHandle> + Send + Sync,
45{
46 engine: RuntimeEngine<P>,
47 default_nulls_first: AtomicBool,
48}
49
/// Message text describing a table that another transaction has dropped.
/// NOTE(review): no use of this constant is visible in this section; presumably it is
/// attached to, or matched against, errors elsewhere in the file — confirm before changing.
const DROPPED_TABLE_TRANSACTION_ERR: &str = "another transaction has dropped this table";
51
52impl<P> Clone for SqlEngine<P>
53where
54 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
55{
56 fn clone(&self) -> Self {
57 tracing::warn!(
58 "[SQL_ENGINE] SqlEngine::clone() called - will create new Engine with new session!"
59 );
60 Self {
62 engine: self.engine.clone(),
63 default_nulls_first: AtomicBool::new(
64 self.default_nulls_first.load(AtomicOrdering::Relaxed),
65 ),
66 }
67 }
68}
69
70#[allow(dead_code)]
71impl<P> SqlEngine<P>
72where
73 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
74{
75 fn map_table_error(table_name: &str, err: Error) -> Error {
76 match err {
77 Error::NotFound => Self::table_not_found_error(table_name),
78 Error::InvalidArgumentError(msg) if msg.contains("unknown table") => {
79 Self::table_not_found_error(table_name)
80 }
81 other => other,
82 }
83 }
84
85 fn table_not_found_error(table_name: &str) -> Error {
86 Error::CatalogError(format!(
87 "Catalog Error: Table '{table_name}' does not exist"
88 ))
89 }
90
91 fn is_table_missing_error(err: &Error) -> bool {
92 match err {
93 Error::NotFound => true,
94 Error::CatalogError(msg) => {
95 msg.contains("Catalog Error: Table") || msg.contains("unknown table")
96 }
97 Error::InvalidArgumentError(msg) => {
98 msg.contains("Catalog Error: Table") || msg.contains("unknown table")
99 }
100 _ => false,
101 }
102 }
103
104 fn execute_plan_statement(
105 &self,
106 statement: PlanStatement,
107 ) -> SqlResult<RuntimeStatementResult<P>> {
108 let table = llkv_runtime::statement_table_name(&statement).map(str::to_string);
109 self.engine.execute_statement(statement).map_err(|err| {
110 if let Some(table_name) = table {
111 Self::map_table_error(&table_name, err)
112 } else {
113 err
114 }
115 })
116 }
117
118 pub fn new(pager: Arc<P>) -> Self {
119 let engine = RuntimeEngine::new(pager);
120 Self {
121 engine,
122 default_nulls_first: AtomicBool::new(false),
123 }
124 }
125
126 fn preprocess_exclude_syntax(sql: &str) -> String {
129 static EXCLUDE_REGEX: OnceLock<Regex> = OnceLock::new();
130
131 let re = EXCLUDE_REGEX.get_or_init(|| {
134 Regex::new(
135 r"(?i)EXCLUDE\s*\(\s*([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)+)\s*\)",
136 )
137 .expect("valid EXCLUDE qualifier regex")
138 });
139
140 re.replace_all(sql, |caps: ®ex::Captures| {
141 let qualified_name = &caps[1];
142 format!("EXCLUDE (\"{}\")", qualified_name)
143 })
144 .to_string()
145 }
146
147 pub(crate) fn context_arc(&self) -> Arc<RuntimeContext<P>> {
148 self.engine.context()
149 }
150
151 pub fn with_context(context: Arc<RuntimeContext<P>>, default_nulls_first: bool) -> Self {
152 Self {
153 engine: RuntimeEngine::from_context(context),
154 default_nulls_first: AtomicBool::new(default_nulls_first),
155 }
156 }
157
158 #[cfg(test)]
159 fn default_nulls_first_for_tests(&self) -> bool {
160 self.default_nulls_first.load(AtomicOrdering::Relaxed)
161 }
162
163 fn has_active_transaction(&self) -> bool {
164 self.engine.session().has_active_transaction()
165 }
166
167 pub fn session(&self) -> &RuntimeSession<P> {
169 self.engine.session()
170 }
171
172 pub fn execute(&self, sql: &str) -> SqlResult<Vec<RuntimeStatementResult<P>>> {
173 tracing::trace!("DEBUG SQL execute: {}", sql);
174
175 let processed_sql = Self::preprocess_exclude_syntax(sql);
178
179 let dialect = GenericDialect {};
180 let statements = Parser::parse_sql(&dialect, &processed_sql)
181 .map_err(|err| Error::InvalidArgumentError(format!("failed to parse SQL: {err}")))?;
182 tracing::trace!("DEBUG SQL execute: parsed {} statements", statements.len());
183
184 let mut results = Vec::with_capacity(statements.len());
185 for (i, statement) in statements.iter().enumerate() {
186 tracing::trace!("DEBUG SQL execute: processing statement {}", i);
187 results.push(self.execute_statement(statement.clone())?);
188 tracing::trace!("DEBUG SQL execute: statement {} completed", i);
189 }
190 tracing::trace!("DEBUG SQL execute completed successfully");
191 Ok(results)
192 }
193
194 fn execute_statement(&self, statement: Statement) -> SqlResult<RuntimeStatementResult<P>> {
195 tracing::trace!(
196 "DEBUG SQL execute_statement: {:?}",
197 match &statement {
198 Statement::Insert(insert) =>
199 format!("Insert(table={:?})", Self::table_name_from_insert(insert)),
200 Statement::Query(_) => "Query".to_string(),
201 Statement::StartTransaction { .. } => "StartTransaction".to_string(),
202 Statement::Commit { .. } => "Commit".to_string(),
203 Statement::Rollback { .. } => "Rollback".to_string(),
204 Statement::CreateTable(_) => "CreateTable".to_string(),
205 Statement::Update { .. } => "Update".to_string(),
206 Statement::Delete(_) => "Delete".to_string(),
207 other => format!("Other({:?})", other),
208 }
209 );
210 match statement {
211 Statement::StartTransaction {
212 modes,
213 begin,
214 transaction,
215 modifier,
216 statements,
217 exception,
218 has_end_keyword,
219 } => self.handle_start_transaction(
220 modes,
221 begin,
222 transaction,
223 modifier,
224 statements,
225 exception,
226 has_end_keyword,
227 ),
228 Statement::Commit {
229 chain,
230 end,
231 modifier,
232 } => self.handle_commit(chain, end, modifier),
233 Statement::Rollback { chain, savepoint } => self.handle_rollback(chain, savepoint),
234 other => self.execute_statement_non_transactional(other),
235 }
236 }
237
238 fn execute_statement_non_transactional(
239 &self,
240 statement: Statement,
241 ) -> SqlResult<RuntimeStatementResult<P>> {
242 tracing::trace!("DEBUG SQL execute_statement_non_transactional called");
243 match statement {
244 Statement::CreateTable(stmt) => {
245 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateTable");
246 self.handle_create_table(stmt)
247 }
248 Statement::CreateIndex(stmt) => {
249 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateIndex");
250 self.handle_create_index(stmt)
251 }
252 Statement::CreateSchema {
253 schema_name,
254 if_not_exists,
255 with,
256 options,
257 default_collate_spec,
258 clone,
259 } => {
260 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateSchema");
261 self.handle_create_schema(
262 schema_name,
263 if_not_exists,
264 with,
265 options,
266 default_collate_spec,
267 clone,
268 )
269 }
270 Statement::Insert(stmt) => {
271 let table_name =
272 Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
273 tracing::trace!(
274 "DEBUG SQL execute_statement_non_transactional: Insert(table={})",
275 table_name
276 );
277 self.handle_insert(stmt)
278 }
279 Statement::Query(query) => {
280 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Query");
281 self.handle_query(*query)
282 }
283 Statement::Update {
284 table,
285 assignments,
286 from,
287 selection,
288 returning,
289 ..
290 } => {
291 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Update");
292 self.handle_update(table, assignments, from, selection, returning)
293 }
294 Statement::Delete(delete) => {
295 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Delete");
296 self.handle_delete(delete)
297 }
298 Statement::Drop {
299 object_type,
300 if_exists,
301 names,
302 cascade,
303 restrict,
304 purge,
305 temporary,
306 ..
307 } => {
308 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Drop");
309 self.handle_drop(
310 object_type,
311 if_exists,
312 names,
313 cascade,
314 restrict,
315 purge,
316 temporary,
317 )
318 }
319 Statement::Set(set_stmt) => {
320 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Set");
321 self.handle_set(set_stmt)
322 }
323 Statement::Pragma { name, value, is_eq } => {
324 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Pragma");
325 self.handle_pragma(name, value, is_eq)
326 }
327 other => {
328 tracing::trace!(
329 "DEBUG SQL execute_statement_non_transactional: Other({:?})",
330 other
331 );
332 Err(Error::InvalidArgumentError(format!(
333 "unsupported SQL statement: {other:?}"
334 )))
335 }
336 }
337 }
338
339 fn table_name_from_insert(insert: &sqlparser::ast::Insert) -> SqlResult<String> {
340 match &insert.table {
341 TableObject::TableName(name) => Self::object_name_to_string(name),
342 _ => Err(Error::InvalidArgumentError(
343 "INSERT requires a plain table name".into(),
344 )),
345 }
346 }
347
348 fn table_name_from_update(table: &TableWithJoins) -> SqlResult<Option<String>> {
349 if !table.joins.is_empty() {
350 return Err(Error::InvalidArgumentError(
351 "UPDATE with JOIN targets is not supported yet".into(),
352 ));
353 }
354 Self::table_with_joins_name(table)
355 }
356
357 fn table_name_from_delete(delete: &Delete) -> SqlResult<Option<String>> {
358 if !delete.tables.is_empty() {
359 return Err(Error::InvalidArgumentError(
360 "multi-table DELETE is not supported yet".into(),
361 ));
362 }
363 let from_tables = match &delete.from {
364 FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
365 };
366 if from_tables.is_empty() {
367 return Ok(None);
368 }
369 if from_tables.len() != 1 {
370 return Err(Error::InvalidArgumentError(
371 "DELETE over multiple tables is not supported yet".into(),
372 ));
373 }
374 Self::table_with_joins_name(&from_tables[0])
375 }
376
377 fn object_name_to_string(name: &ObjectName) -> SqlResult<String> {
378 let (display, _) = canonical_object_name(name)?;
379 Ok(display)
380 }
381
382 #[allow(dead_code)]
383 fn table_object_to_name(table: &TableObject) -> SqlResult<Option<String>> {
384 match table {
385 TableObject::TableName(name) => Ok(Some(Self::object_name_to_string(name)?)),
386 TableObject::TableFunction(_) => Ok(None),
387 }
388 }
389
390 fn table_with_joins_name(table: &TableWithJoins) -> SqlResult<Option<String>> {
391 match &table.relation {
392 TableFactor::Table { name, .. } => Ok(Some(Self::object_name_to_string(name)?)),
393 _ => Ok(None),
394 }
395 }
396
397 fn tables_in_query(query: &Query) -> SqlResult<Vec<String>> {
398 let mut tables = Vec::new();
399 if let sqlparser::ast::SetExpr::Select(select) = query.body.as_ref() {
400 for table in &select.from {
401 if let TableFactor::Table { name, .. } = &table.relation {
402 tables.push(Self::object_name_to_string(name)?);
403 }
404 }
405 }
406 Ok(tables)
407 }
408
409 fn collect_known_columns(
410 &self,
411 display_name: &str,
412 canonical_name: &str,
413 ) -> SqlResult<HashSet<String>> {
414 let context = self.engine.context();
415
416 if context.is_table_marked_dropped(canonical_name) {
417 return Err(Self::table_not_found_error(display_name));
418 }
419
420 match context.table_column_specs(display_name) {
421 Ok(specs) => Ok(specs
422 .into_iter()
423 .map(|spec| spec.name.to_ascii_lowercase())
424 .collect()),
425 Err(err) => {
426 if !Self::is_table_missing_error(&err) {
427 return Err(Self::map_table_error(display_name, err));
428 }
429
430 Ok(HashSet::new())
431 }
432 }
433 }
434
435 fn is_table_marked_dropped(&self, table_name: &str) -> SqlResult<bool> {
436 let canonical = table_name.to_ascii_lowercase();
437 Ok(self.engine.context().is_table_marked_dropped(&canonical))
438 }
439
440 fn handle_create_table(
441 &self,
442 mut stmt: sqlparser::ast::CreateTable,
443 ) -> SqlResult<RuntimeStatementResult<P>> {
444 validate_create_table_common(&stmt)?;
445
446 let (mut schema_name, table_name) = parse_schema_qualified_name(&stmt.name)?;
447
448 let namespace = if stmt.temporary {
449 if schema_name.is_some() {
450 return Err(Error::InvalidArgumentError(
451 "temporary tables cannot specify an explicit schema".into(),
452 ));
453 }
454 schema_name = None;
455 Some(TEMPORARY_NAMESPACE_ID.to_string())
456 } else {
457 None
458 };
459
460 if let Some(ref schema) = schema_name {
462 let catalog = self.engine.context().table_catalog();
463 if !catalog.schema_exists(schema) {
464 return Err(Error::CatalogError(format!(
465 "Schema '{}' does not exist",
466 schema
467 )));
468 }
469 }
470
471 let display_name = match &schema_name {
473 Some(schema) => format!("{}.{}", schema, table_name),
474 None => table_name.clone(),
475 };
476 let canonical_name = display_name.to_ascii_lowercase();
477 tracing::trace!(
478 "\n=== HANDLE_CREATE_TABLE: table='{}' columns={} ===",
479 display_name,
480 stmt.columns.len()
481 );
482 if display_name.is_empty() {
483 return Err(Error::InvalidArgumentError(
484 "table name must not be empty".into(),
485 ));
486 }
487
488 if let Some(query) = stmt.query.take() {
489 validate_create_table_as(&stmt)?;
490 if let Some(result) = self.try_handle_range_ctas(
491 &display_name,
492 &canonical_name,
493 &query,
494 stmt.if_not_exists,
495 stmt.or_replace,
496 namespace.clone(),
497 )? {
498 return Ok(result);
499 }
500 return self.handle_create_table_as(
501 display_name,
502 canonical_name,
503 *query,
504 stmt.if_not_exists,
505 stmt.or_replace,
506 namespace.clone(),
507 );
508 }
509
510 if stmt.columns.is_empty() {
511 return Err(Error::InvalidArgumentError(
512 "CREATE TABLE requires at least one column".into(),
513 ));
514 }
515
516 validate_create_table_definition(&stmt)?;
517
518 let column_defs_ast = std::mem::take(&mut stmt.columns);
519 let constraints = std::mem::take(&mut stmt.constraints);
520
521 let column_names: Vec<String> = column_defs_ast
522 .iter()
523 .map(|column_def| column_def.name.value.clone())
524 .collect();
525 ensure_unique_case_insensitive(column_names.iter().map(|name| name.as_str()), |dup| {
526 format!(
527 "duplicate column name '{}' in table '{}'",
528 dup, display_name
529 )
530 })?;
531 let column_names_lower: HashSet<String> = column_names
532 .iter()
533 .map(|name| name.to_ascii_lowercase())
534 .collect();
535
536 let mut columns: Vec<ColumnSpec> = Vec::with_capacity(column_defs_ast.len());
537 let mut primary_key_columns: HashSet<String> = HashSet::new();
538 let mut foreign_keys: Vec<ForeignKeySpec> = Vec::new();
539
540 for column_def in column_defs_ast {
542 let is_nullable = column_def
543 .options
544 .iter()
545 .all(|opt| !matches!(opt.option, ColumnOption::NotNull));
546
547 let is_primary_key = column_def.options.iter().any(|opt| {
548 matches!(
549 opt.option,
550 ColumnOption::Unique {
551 is_primary: true,
552 characteristics: _
553 }
554 )
555 });
556
557 let has_unique_constraint = column_def
558 .options
559 .iter()
560 .any(|opt| matches!(opt.option, ColumnOption::Unique { .. }));
561
562 let check_expr = column_def.options.iter().find_map(|opt| {
564 if let ColumnOption::Check(expr) = &opt.option {
565 Some(expr)
566 } else {
567 None
568 }
569 });
570
571 if let Some(check_expr) = check_expr {
573 let all_col_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
574 validate_check_constraint(check_expr, &display_name, &all_col_refs)?;
575 }
576
577 let check_expr_str = check_expr.map(|e| e.to_string());
578
579 for opt in &column_def.options {
581 if let ColumnOption::ForeignKey {
582 foreign_table,
583 referred_columns,
584 on_delete,
585 on_update,
586 characteristics,
587 } = &opt.option
588 {
589 let spec = self.build_foreign_key_spec(
590 &display_name,
591 &canonical_name,
592 vec![column_def.name.value.clone()],
593 foreign_table,
594 referred_columns,
595 *on_delete,
596 *on_update,
597 characteristics,
598 &column_names_lower,
599 None,
600 )?;
601 foreign_keys.push(spec);
602 }
603 }
604
605 tracing::trace!(
606 "DEBUG CREATE TABLE column '{}' is_primary_key={} has_unique={} check_expr={:?}",
607 column_def.name.value,
608 is_primary_key,
609 has_unique_constraint,
610 check_expr_str
611 );
612
613 let mut column = ColumnSpec::new(
614 column_def.name.value.clone(),
615 arrow_type_from_sql(&column_def.data_type)?,
616 is_nullable,
617 );
618 tracing::trace!(
619 "DEBUG ColumnSpec after new(): primary_key={} unique={}",
620 column.primary_key,
621 column.unique
622 );
623
624 column = column
625 .with_primary_key(is_primary_key)
626 .with_unique(has_unique_constraint)
627 .with_check(check_expr_str);
628
629 if is_primary_key {
630 column.nullable = false;
631 primary_key_columns.insert(column.name.to_ascii_lowercase());
632 }
633 tracing::trace!(
634 "DEBUG ColumnSpec after with_primary_key({})/with_unique({}): primary_key={} unique={} check_expr={:?}",
635 is_primary_key,
636 has_unique_constraint,
637 column.primary_key,
638 column.unique,
639 column.check_expr
640 );
641
642 columns.push(column);
643 }
644
645 if !constraints.is_empty() {
647 let mut column_lookup: HashMap<String, usize> = HashMap::with_capacity(columns.len());
648 for (idx, column) in columns.iter().enumerate() {
649 column_lookup.insert(column.name.to_ascii_lowercase(), idx);
650 }
651
652 for constraint in constraints {
653 match constraint {
654 TableConstraint::PrimaryKey {
655 columns: constraint_columns,
656 ..
657 } => {
658 if !primary_key_columns.is_empty() {
659 return Err(Error::InvalidArgumentError(
660 "multiple PRIMARY KEY constraints are not supported".into(),
661 ));
662 }
663
664 ensure_non_empty(&constraint_columns, || {
665 "PRIMARY KEY requires at least one column".into()
666 })?;
667
668 let mut pk_column_names: Vec<String> =
669 Vec::with_capacity(constraint_columns.len());
670
671 for index_col in &constraint_columns {
672 let column_ident = extract_index_column_name(
673 index_col,
674 "PRIMARY KEY",
675 false, false, )?;
678 pk_column_names.push(column_ident);
679 }
680
681 ensure_unique_case_insensitive(
682 pk_column_names.iter().map(|name| name.as_str()),
683 |dup| format!("duplicate column '{}' in PRIMARY KEY constraint", dup),
684 )?;
685
686 ensure_known_columns_case_insensitive(
687 pk_column_names.iter().map(|name| name.as_str()),
688 &column_names_lower,
689 |unknown| {
690 format!("unknown column '{}' in PRIMARY KEY constraint", unknown)
691 },
692 )?;
693
694 for column_ident in pk_column_names {
695 let normalized = column_ident.to_ascii_lowercase();
696 let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
697 Error::InvalidArgumentError(format!(
698 "unknown column '{}' in PRIMARY KEY constraint",
699 column_ident
700 ))
701 })?;
702
703 let column = columns.get_mut(idx).expect("column index valid");
704 column.primary_key = true;
705 column.unique = true;
706 column.nullable = false;
707
708 primary_key_columns.insert(normalized);
709 }
710 }
711 TableConstraint::Unique {
712 columns: constraint_columns,
713 index_type,
714 index_options,
715 characteristics,
716 nulls_distinct,
717 ..
718 } => {
719 if !matches!(nulls_distinct, NullsDistinctOption::None) {
720 return Err(Error::InvalidArgumentError(
721 "UNIQUE constraints with NULLS DISTINCT/NOT DISTINCT are not supported yet".into(),
722 ));
723 }
724
725 if index_type.is_some() {
726 return Err(Error::InvalidArgumentError(
727 "UNIQUE constraints with index types are not supported yet".into(),
728 ));
729 }
730
731 if !index_options.is_empty() {
732 return Err(Error::InvalidArgumentError(
733 "UNIQUE constraints with index options are not supported yet"
734 .into(),
735 ));
736 }
737
738 if characteristics.is_some() {
739 return Err(Error::InvalidArgumentError(
740 "UNIQUE constraint characteristics are not supported yet".into(),
741 ));
742 }
743
744 ensure_non_empty(&constraint_columns, || {
745 "UNIQUE constraint requires at least one column".into()
746 })?;
747
748 let mut unique_column_names: Vec<String> =
749 Vec::with_capacity(constraint_columns.len());
750
751 for index_column in &constraint_columns {
752 let column_ident = extract_index_column_name(
753 index_column,
754 "UNIQUE constraint",
755 false, false, )?;
758 unique_column_names.push(column_ident);
759 }
760
761 if unique_column_names.len() > 1 {
762 return Err(Error::InvalidArgumentError(
763 "multi-column UNIQUE constraints are not supported yet".into(),
764 ));
765 }
766
767 ensure_unique_case_insensitive(
768 unique_column_names.iter().map(|name| name.as_str()),
769 |dup| format!("duplicate column '{}' in UNIQUE constraint", dup),
770 )?;
771
772 ensure_known_columns_case_insensitive(
773 unique_column_names.iter().map(|name| name.as_str()),
774 &column_names_lower,
775 |unknown| format!("unknown column '{}' in UNIQUE constraint", unknown),
776 )?;
777
778 let column_ident = unique_column_names
779 .into_iter()
780 .next()
781 .expect("unique constraint checked for emptiness");
782 let normalized = column_ident.to_ascii_lowercase();
783 let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
784 Error::InvalidArgumentError(format!(
785 "unknown column '{}' in UNIQUE constraint",
786 column_ident
787 ))
788 })?;
789
790 let column = columns
791 .get_mut(idx)
792 .expect("column index from lookup must be valid");
793 column.unique = true;
794 }
795 TableConstraint::ForeignKey {
796 name,
797 index_name,
798 columns: fk_columns,
799 foreign_table,
800 referred_columns,
801 on_delete,
802 on_update,
803 characteristics,
804 ..
805 } => {
806 if index_name.is_some() {
807 return Err(Error::InvalidArgumentError(
808 "FOREIGN KEY index clauses are not supported yet".into(),
809 ));
810 }
811
812 let referencing_columns: Vec<String> =
813 fk_columns.into_iter().map(|ident| ident.value).collect();
814 let spec = self.build_foreign_key_spec(
815 &display_name,
816 &canonical_name,
817 referencing_columns,
818 &foreign_table,
819 &referred_columns,
820 on_delete,
821 on_update,
822 &characteristics,
823 &column_names_lower,
824 name.map(|ident| ident.value),
825 )?;
826
827 foreign_keys.push(spec);
828 }
829 unsupported => {
830 return Err(Error::InvalidArgumentError(format!(
831 "table-level constraint {:?} is not supported",
832 unsupported
833 )));
834 }
835 }
836 }
837 }
838
839 let plan = CreateTablePlan {
840 name: display_name,
841 if_not_exists: stmt.if_not_exists,
842 or_replace: stmt.or_replace,
843 columns,
844 source: None,
845 namespace,
846 foreign_keys,
847 };
848 self.execute_plan_statement(PlanStatement::CreateTable(plan))
849 }
850
851 fn handle_create_index(
852 &self,
853 stmt: sqlparser::ast::CreateIndex,
854 ) -> SqlResult<RuntimeStatementResult<P>> {
855 let sqlparser::ast::CreateIndex {
856 name,
857 table_name,
858 using,
859 columns,
860 unique,
861 concurrently,
862 if_not_exists,
863 include,
864 nulls_distinct,
865 with,
866 predicate,
867 index_options,
868 alter_options,
869 ..
870 } = stmt;
871
872 if concurrently {
873 return Err(Error::InvalidArgumentError(
874 "CREATE INDEX CONCURRENTLY is not supported".into(),
875 ));
876 }
877 if using.is_some() {
878 return Err(Error::InvalidArgumentError(
879 "CREATE INDEX USING clauses are not supported".into(),
880 ));
881 }
882 if !include.is_empty() {
883 return Err(Error::InvalidArgumentError(
884 "CREATE INDEX INCLUDE columns are not supported".into(),
885 ));
886 }
887 if nulls_distinct.is_some() {
888 return Err(Error::InvalidArgumentError(
889 "CREATE INDEX NULLS DISTINCT is not supported".into(),
890 ));
891 }
892 if !with.is_empty() {
893 return Err(Error::InvalidArgumentError(
894 "CREATE INDEX WITH options are not supported".into(),
895 ));
896 }
897 if predicate.is_some() {
898 return Err(Error::InvalidArgumentError(
899 "partial CREATE INDEX is not supported".into(),
900 ));
901 }
902 if !index_options.is_empty() {
903 return Err(Error::InvalidArgumentError(
904 "CREATE INDEX options are not supported".into(),
905 ));
906 }
907 if !alter_options.is_empty() {
908 return Err(Error::InvalidArgumentError(
909 "CREATE INDEX ALTER options are not supported".into(),
910 ));
911 }
912 if columns.is_empty() {
913 return Err(Error::InvalidArgumentError(
914 "CREATE INDEX requires at least one column".into(),
915 ));
916 }
917
918 let (schema_name, base_table_name) = parse_schema_qualified_name(&table_name)?;
919 if let Some(ref schema) = schema_name {
920 let catalog = self.engine.context().table_catalog();
921 if !catalog.schema_exists(schema) {
922 return Err(Error::CatalogError(format!(
923 "Schema '{}' does not exist",
924 schema
925 )));
926 }
927 }
928
929 let display_table_name = schema_name
930 .as_ref()
931 .map(|schema| format!("{}.{}", schema, base_table_name))
932 .unwrap_or_else(|| base_table_name.clone());
933 let canonical_table_name = display_table_name.to_ascii_lowercase();
934
935 let known_columns =
936 self.collect_known_columns(&display_table_name, &canonical_table_name)?;
937 let enforce_known_columns = !known_columns.is_empty();
938
939 let index_name = match name {
940 Some(name_obj) => Some(Self::object_name_to_string(&name_obj)?),
941 None => None,
942 };
943
944 let mut index_columns: Vec<IndexColumnPlan> = Vec::with_capacity(columns.len());
945 let mut seen_column_names: HashSet<String> = HashSet::new();
946 for item in columns {
947 if item.column.with_fill.is_some() {
949 return Err(Error::InvalidArgumentError(
950 "CREATE INDEX column WITH FILL is not supported".into(),
951 ));
952 }
953
954 let column_name = extract_index_column_name(
955 &item,
956 "CREATE INDEX",
957 true, true, )?;
960
961 let order_expr = &item.column;
963 let ascending = order_expr.options.asc.unwrap_or(true);
964 let nulls_first = order_expr.options.nulls_first.unwrap_or(false);
965
966 let normalized = column_name.to_ascii_lowercase();
967 if !seen_column_names.insert(normalized.clone()) {
968 return Err(Error::InvalidArgumentError(format!(
969 "duplicate column '{}' in CREATE INDEX",
970 column_name
971 )));
972 }
973
974 if enforce_known_columns && !known_columns.contains(&normalized) {
975 return Err(Error::InvalidArgumentError(format!(
976 "column '{}' does not exist in table '{}'",
977 column_name, display_table_name
978 )));
979 }
980
981 let column_plan = IndexColumnPlan::new(column_name).with_sort(ascending, nulls_first);
982 index_columns.push(column_plan);
983 }
984
985 if index_columns.len() > 1 && !unique {
986 return Err(Error::InvalidArgumentError(
987 "multi-column CREATE INDEX currently supports UNIQUE indexes only".into(),
988 ));
989 }
990
991 let plan = CreateIndexPlan::new(display_table_name)
992 .with_name(index_name)
993 .with_unique(unique)
994 .with_if_not_exists(if_not_exists)
995 .with_columns(index_columns);
996
997 self.execute_plan_statement(PlanStatement::CreateIndex(plan))
998 }
999
1000 fn map_referential_action(
1001 action: Option<ReferentialAction>,
1002 kind: &str,
1003 ) -> SqlResult<ForeignKeyAction> {
1004 match action {
1005 None | Some(ReferentialAction::NoAction) => Ok(ForeignKeyAction::NoAction),
1006 Some(ReferentialAction::Restrict) => Ok(ForeignKeyAction::Restrict),
1007 Some(other) => Err(Error::InvalidArgumentError(format!(
1008 "FOREIGN KEY ON {kind} {:?} is not supported yet",
1009 other
1010 ))),
1011 }
1012 }
1013
1014 #[allow(clippy::too_many_arguments)]
1015 fn build_foreign_key_spec(
1016 &self,
1017 _referencing_display: &str,
1018 referencing_canonical: &str,
1019 referencing_columns: Vec<String>,
1020 foreign_table: &ObjectName,
1021 referenced_columns: &[Ident],
1022 on_delete: Option<ReferentialAction>,
1023 on_update: Option<ReferentialAction>,
1024 characteristics: &Option<ConstraintCharacteristics>,
1025 known_columns_lower: &HashSet<String>,
1026 name: Option<String>,
1027 ) -> SqlResult<ForeignKeySpec> {
1028 if characteristics.is_some() {
1029 return Err(Error::InvalidArgumentError(
1030 "FOREIGN KEY constraint characteristics are not supported yet".into(),
1031 ));
1032 }
1033
1034 ensure_non_empty(&referencing_columns, || {
1035 "FOREIGN KEY constraint requires at least one referencing column".into()
1036 })?;
1037 ensure_unique_case_insensitive(
1038 referencing_columns.iter().map(|name| name.as_str()),
1039 |dup| format!("duplicate column '{}' in FOREIGN KEY constraint", dup),
1040 )?;
1041 ensure_known_columns_case_insensitive(
1042 referencing_columns.iter().map(|name| name.as_str()),
1043 known_columns_lower,
1044 |unknown| format!("unknown column '{}' in FOREIGN KEY constraint", unknown),
1045 )?;
1046
1047 let referenced_columns_vec: Vec<String> = referenced_columns
1048 .iter()
1049 .map(|ident| ident.value.clone())
1050 .collect();
1051 ensure_unique_case_insensitive(
1052 referenced_columns_vec.iter().map(|name| name.as_str()),
1053 |dup| {
1054 format!(
1055 "duplicate referenced column '{}' in FOREIGN KEY constraint",
1056 dup
1057 )
1058 },
1059 )?;
1060
1061 if !referenced_columns_vec.is_empty()
1062 && referenced_columns_vec.len() != referencing_columns.len()
1063 {
1064 return Err(Error::InvalidArgumentError(
1065 "FOREIGN KEY referencing and referenced column counts must match".into(),
1066 ));
1067 }
1068
1069 let (referenced_display, referenced_canonical) = canonical_object_name(foreign_table)?;
1070
1071 if referenced_canonical == referencing_canonical {
1072 ensure_known_columns_case_insensitive(
1073 referenced_columns_vec.iter().map(|name| name.as_str()),
1074 known_columns_lower,
1075 |unknown| {
1076 format!(
1077 "unknown referenced column '{}' in FOREIGN KEY constraint",
1078 unknown
1079 )
1080 },
1081 )?;
1082 } else {
1083 let known_columns =
1084 self.collect_known_columns(&referenced_display, &referenced_canonical)?;
1085 if !known_columns.is_empty() {
1086 ensure_known_columns_case_insensitive(
1087 referenced_columns_vec.iter().map(|name| name.as_str()),
1088 &known_columns,
1089 |unknown| {
1090 format!(
1091 "unknown referenced column '{}' in FOREIGN KEY constraint",
1092 unknown
1093 )
1094 },
1095 )?;
1096 }
1097 }
1098
1099 let on_delete_action = Self::map_referential_action(on_delete, "DELETE")?;
1100 let on_update_action = Self::map_referential_action(on_update, "UPDATE")?;
1101
1102 Ok(ForeignKeySpec {
1103 name,
1104 columns: referencing_columns,
1105 referenced_table: referenced_display,
1106 referenced_columns: referenced_columns_vec,
1107 on_delete: on_delete_action,
1108 on_update: on_update_action,
1109 })
1110 }
1111
1112 fn handle_create_schema(
1113 &self,
1114 schema_name: SchemaName,
1115 _if_not_exists: bool,
1116 with: Option<Vec<SqlOption>>,
1117 options: Option<Vec<SqlOption>>,
1118 default_collate_spec: Option<SqlExpr>,
1119 clone: Option<ObjectName>,
1120 ) -> SqlResult<RuntimeStatementResult<P>> {
1121 if clone.is_some() {
1122 return Err(Error::InvalidArgumentError(
1123 "CREATE SCHEMA ... CLONE is not supported".into(),
1124 ));
1125 }
1126 if with.as_ref().is_some_and(|opts| !opts.is_empty()) {
1127 return Err(Error::InvalidArgumentError(
1128 "CREATE SCHEMA ... WITH options are not supported".into(),
1129 ));
1130 }
1131 if options.as_ref().is_some_and(|opts| !opts.is_empty()) {
1132 return Err(Error::InvalidArgumentError(
1133 "CREATE SCHEMA options are not supported".into(),
1134 ));
1135 }
1136 if default_collate_spec.is_some() {
1137 return Err(Error::InvalidArgumentError(
1138 "CREATE SCHEMA DEFAULT COLLATE is not supported".into(),
1139 ));
1140 }
1141
1142 let schema_name = match schema_name {
1143 SchemaName::Simple(name) => name,
1144 _ => {
1145 return Err(Error::InvalidArgumentError(
1146 "CREATE SCHEMA authorization is not supported".into(),
1147 ));
1148 }
1149 };
1150
1151 let (display_name, canonical) = canonical_object_name(&schema_name)?;
1152 if display_name.is_empty() {
1153 return Err(Error::InvalidArgumentError(
1154 "schema name must not be empty".into(),
1155 ));
1156 }
1157
1158 let catalog = self.engine.context().table_catalog();
1160
1161 if _if_not_exists && catalog.schema_exists(&canonical) {
1162 return Ok(RuntimeStatementResult::NoOp);
1163 }
1164
1165 catalog.register_schema(&canonical).map_err(|err| {
1166 Error::CatalogError(format!(
1167 "Failed to create schema '{}': {}",
1168 display_name, err
1169 ))
1170 })?;
1171
1172 Ok(RuntimeStatementResult::NoOp)
1173 }
1174
1175 fn try_handle_range_ctas(
1176 &self,
1177 display_name: &str,
1178 _canonical_name: &str,
1179 query: &Query,
1180 if_not_exists: bool,
1181 or_replace: bool,
1182 namespace: Option<String>,
1183 ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
1184 let select = match query.body.as_ref() {
1185 SetExpr::Select(select) => select,
1186 _ => return Ok(None),
1187 };
1188 if select.from.len() != 1 {
1189 return Ok(None);
1190 }
1191 let table_with_joins = &select.from[0];
1192 if !table_with_joins.joins.is_empty() {
1193 return Ok(None);
1194 }
1195 let (range_size, range_alias) = match &table_with_joins.relation {
1196 TableFactor::Table {
1197 name,
1198 args: Some(args),
1199 alias,
1200 ..
1201 } => {
1202 let func_name = name.to_string().to_ascii_lowercase();
1203 if func_name != "range" {
1204 return Ok(None);
1205 }
1206 if args.args.len() != 1 {
1207 return Err(Error::InvalidArgumentError(
1208 "range table function expects a single argument".into(),
1209 ));
1210 }
1211 let size_expr = &args.args[0];
1212 let range_size = match size_expr {
1213 FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
1214 match &value.value {
1215 Value::Number(raw, _) => raw.parse::<i64>().map_err(|e| {
1216 Error::InvalidArgumentError(format!(
1217 "invalid range size literal {}: {}",
1218 raw, e
1219 ))
1220 })?,
1221 other => {
1222 return Err(Error::InvalidArgumentError(format!(
1223 "unsupported range size value: {:?}",
1224 other
1225 )));
1226 }
1227 }
1228 }
1229 _ => {
1230 return Err(Error::InvalidArgumentError(
1231 "unsupported range argument".into(),
1232 ));
1233 }
1234 };
1235 (range_size, alias.as_ref().map(|a| a.name.value.clone()))
1236 }
1237 _ => return Ok(None),
1238 };
1239
1240 if range_size < 0 {
1241 return Err(Error::InvalidArgumentError(
1242 "range size must be non-negative".into(),
1243 ));
1244 }
1245
1246 if select.projection.is_empty() {
1247 return Err(Error::InvalidArgumentError(
1248 "CREATE TABLE AS SELECT requires at least one projected column".into(),
1249 ));
1250 }
1251
1252 let mut column_specs = Vec::with_capacity(select.projection.len());
1253 let mut column_names = Vec::with_capacity(select.projection.len());
1254 let mut row_template = Vec::with_capacity(select.projection.len());
1255 for item in &select.projection {
1256 match item {
1257 SelectItem::ExprWithAlias { expr, alias } => {
1258 let (value, data_type) = match expr {
1259 SqlExpr::Value(value_with_span) => match &value_with_span.value {
1260 Value::Number(raw, _) => {
1261 let parsed = raw.parse::<i64>().map_err(|e| {
1262 Error::InvalidArgumentError(format!(
1263 "invalid numeric literal {}: {}",
1264 raw, e
1265 ))
1266 })?;
1267 (
1268 PlanValue::Integer(parsed),
1269 arrow::datatypes::DataType::Int64,
1270 )
1271 }
1272 Value::SingleQuotedString(s) => (
1273 PlanValue::String(s.clone()),
1274 arrow::datatypes::DataType::Utf8,
1275 ),
1276 other => {
1277 return Err(Error::InvalidArgumentError(format!(
1278 "unsupported SELECT expression in range CTAS: {:?}",
1279 other
1280 )));
1281 }
1282 },
1283 SqlExpr::Identifier(ident) => {
1284 let ident_lower = ident.value.to_ascii_lowercase();
1285 if range_alias
1286 .as_ref()
1287 .map(|a| a.eq_ignore_ascii_case(&ident_lower))
1288 .unwrap_or(false)
1289 || ident_lower == "range"
1290 {
1291 return Err(Error::InvalidArgumentError(
1292 "range() table function columns are not supported yet".into(),
1293 ));
1294 }
1295 return Err(Error::InvalidArgumentError(format!(
1296 "unsupported identifier '{}' in range CTAS projection",
1297 ident.value
1298 )));
1299 }
1300 other => {
1301 return Err(Error::InvalidArgumentError(format!(
1302 "unsupported SELECT expression in range CTAS: {:?}",
1303 other
1304 )));
1305 }
1306 };
1307 let column_name = alias.value.clone();
1308 column_specs.push(ColumnSpec::new(column_name.clone(), data_type, true));
1309 column_names.push(column_name);
1310 row_template.push(value);
1311 }
1312 other => {
1313 return Err(Error::InvalidArgumentError(format!(
1314 "unsupported projection {:?} in range CTAS",
1315 other
1316 )));
1317 }
1318 }
1319 }
1320
1321 let plan = CreateTablePlan {
1322 name: display_name.to_string(),
1323 if_not_exists,
1324 or_replace,
1325 columns: column_specs,
1326 source: None,
1327 namespace,
1328 foreign_keys: Vec::new(),
1329 };
1330 let create_result = self.execute_plan_statement(PlanStatement::CreateTable(plan))?;
1331
1332 let row_count = range_size
1333 .try_into()
1334 .map_err(|_| Error::InvalidArgumentError("range size exceeds usize".into()))?;
1335 if row_count > 0 {
1336 let rows = vec![row_template; row_count];
1337 let insert_plan = InsertPlan {
1338 table: display_name.to_string(),
1339 columns: column_names,
1340 source: InsertSource::Rows(rows),
1341 };
1342 self.execute_plan_statement(PlanStatement::Insert(insert_plan))?;
1343 }
1344
1345 Ok(Some(create_result))
1346 }
1347
1348 fn try_handle_pragma_table_info(
1351 &self,
1352 query: &Query,
1353 ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
1354 let select = match query.body.as_ref() {
1355 SetExpr::Select(select) => select,
1356 _ => return Ok(None),
1357 };
1358
1359 if select.from.len() != 1 {
1360 return Ok(None);
1361 }
1362
1363 let table_with_joins = &select.from[0];
1364 if !table_with_joins.joins.is_empty() {
1365 return Ok(None);
1366 }
1367
1368 let table_name = match &table_with_joins.relation {
1370 TableFactor::Table {
1371 name,
1372 args: Some(args),
1373 ..
1374 } => {
1375 let func_name = name.to_string().to_ascii_lowercase();
1376 if func_name != "pragma_table_info" {
1377 return Ok(None);
1378 }
1379
1380 if args.args.len() != 1 {
1382 return Err(Error::InvalidArgumentError(
1383 "pragma_table_info expects exactly one argument".into(),
1384 ));
1385 }
1386
1387 match &args.args[0] {
1388 FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
1389 match &value.value {
1390 Value::SingleQuotedString(s) => s.clone(),
1391 Value::DoubleQuotedString(s) => s.clone(),
1392 _ => {
1393 return Err(Error::InvalidArgumentError(
1394 "pragma_table_info argument must be a string".into(),
1395 ));
1396 }
1397 }
1398 }
1399 _ => {
1400 return Err(Error::InvalidArgumentError(
1401 "pragma_table_info argument must be a string literal".into(),
1402 ));
1403 }
1404 }
1405 }
1406 _ => return Ok(None),
1407 };
1408
1409 let context = self.engine.context();
1411 let columns = context.table_column_specs(&table_name)?;
1412
1413 use arrow::array::{BooleanArray, Int32Array, StringArray};
1415 use arrow::datatypes::{DataType, Field, Schema};
1416
1417 let mut cid_values = Vec::new();
1418 let mut name_values = Vec::new();
1419 let mut type_values = Vec::new();
1420 let mut notnull_values = Vec::new();
1421 let mut dflt_value_values: Vec<Option<String>> = Vec::new();
1422 let mut pk_values = Vec::new();
1423
1424 for (idx, col) in columns.iter().enumerate() {
1425 cid_values.push(idx as i32);
1426 name_values.push(col.name.clone());
1427 type_values.push(format!("{:?}", col.data_type)); notnull_values.push(!col.nullable);
1429 dflt_value_values.push(None); pk_values.push(col.primary_key);
1431 }
1432
1433 let schema = Arc::new(Schema::new(vec![
1434 Field::new("cid", DataType::Int32, false),
1435 Field::new("name", DataType::Utf8, false),
1436 Field::new("type", DataType::Utf8, false),
1437 Field::new("notnull", DataType::Boolean, false),
1438 Field::new("dflt_value", DataType::Utf8, true),
1439 Field::new("pk", DataType::Boolean, false),
1440 ]));
1441
1442 use arrow::array::ArrayRef;
1443 let mut batch = RecordBatch::try_new(
1444 Arc::clone(&schema),
1445 vec![
1446 Arc::new(Int32Array::from(cid_values)) as ArrayRef,
1447 Arc::new(StringArray::from(name_values)) as ArrayRef,
1448 Arc::new(StringArray::from(type_values)) as ArrayRef,
1449 Arc::new(BooleanArray::from(notnull_values)) as ArrayRef,
1450 Arc::new(StringArray::from(dflt_value_values)) as ArrayRef,
1451 Arc::new(BooleanArray::from(pk_values)) as ArrayRef,
1452 ],
1453 )
1454 .map_err(|e| Error::Internal(format!("failed to create pragma_table_info batch: {}", e)))?;
1455
1456 let projection_indices: Vec<usize> = select
1458 .projection
1459 .iter()
1460 .filter_map(|item| {
1461 match item {
1462 SelectItem::UnnamedExpr(SqlExpr::Identifier(ident)) => {
1463 schema.index_of(&ident.value).ok()
1464 }
1465 SelectItem::ExprWithAlias { expr, .. } => {
1466 if let SqlExpr::Identifier(ident) = expr {
1467 schema.index_of(&ident.value).ok()
1468 } else {
1469 None
1470 }
1471 }
1472 SelectItem::Wildcard(_) => None, _ => None,
1474 }
1475 })
1476 .collect();
1477
1478 let projected_schema;
1480 if !projection_indices.is_empty() {
1481 let projected_fields: Vec<Field> = projection_indices
1482 .iter()
1483 .map(|&idx| schema.field(idx).clone())
1484 .collect();
1485 projected_schema = Arc::new(Schema::new(projected_fields));
1486
1487 let projected_columns: Vec<ArrayRef> = projection_indices
1488 .iter()
1489 .map(|&idx| Arc::clone(batch.column(idx)))
1490 .collect();
1491
1492 batch = RecordBatch::try_new(Arc::clone(&projected_schema), projected_columns)
1493 .map_err(|e| Error::Internal(format!("failed to project columns: {}", e)))?;
1494 } else {
1495 projected_schema = schema;
1497 }
1498
1499 if let Some(order_by) = &query.order_by {
1501 use arrow::compute::SortColumn;
1502 use arrow::compute::lexsort_to_indices;
1503 use sqlparser::ast::OrderByKind;
1504
1505 let exprs = match &order_by.kind {
1506 OrderByKind::Expressions(exprs) => exprs,
1507 _ => {
1508 return Err(Error::InvalidArgumentError(
1509 "unsupported ORDER BY clause".into(),
1510 ));
1511 }
1512 };
1513
1514 let mut sort_columns = Vec::new();
1515 for order_expr in exprs {
1516 if let SqlExpr::Identifier(ident) = &order_expr.expr
1517 && let Ok(col_idx) = projected_schema.index_of(&ident.value)
1518 {
1519 let options = arrow::compute::SortOptions {
1520 descending: !order_expr.options.asc.unwrap_or(true),
1521 nulls_first: order_expr.options.nulls_first.unwrap_or(false),
1522 };
1523 sort_columns.push(SortColumn {
1524 values: Arc::clone(batch.column(col_idx)),
1525 options: Some(options),
1526 });
1527 }
1528 }
1529
1530 if !sort_columns.is_empty() {
1531 let indices = lexsort_to_indices(&sort_columns, None)
1532 .map_err(|e| Error::Internal(format!("failed to sort: {}", e)))?;
1533
1534 use arrow::compute::take;
1535 let sorted_columns: Result<Vec<ArrayRef>, _> = batch
1536 .columns()
1537 .iter()
1538 .map(|col| take(col.as_ref(), &indices, None))
1539 .collect();
1540
1541 batch = RecordBatch::try_new(
1542 Arc::clone(&projected_schema),
1543 sorted_columns
1544 .map_err(|e| Error::Internal(format!("failed to apply sort: {}", e)))?,
1545 )
1546 .map_err(|e| Error::Internal(format!("failed to create sorted batch: {}", e)))?;
1547 }
1548 }
1549
1550 let execution = SelectExecution::new_single_batch(
1551 table_name.clone(),
1552 Arc::clone(&projected_schema),
1553 batch,
1554 );
1555
1556 Ok(Some(RuntimeStatementResult::Select {
1557 table_name,
1558 schema: projected_schema,
1559 execution,
1560 }))
1561 }
1562
1563 fn handle_create_table_as(
1564 &self,
1565 display_name: String,
1566 _canonical_name: String,
1567 query: Query,
1568 if_not_exists: bool,
1569 or_replace: bool,
1570 namespace: Option<String>,
1571 ) -> SqlResult<RuntimeStatementResult<P>> {
1572 let select_plan = self.build_select_plan(query)?;
1573
1574 if select_plan.projections.is_empty() && select_plan.aggregates.is_empty() {
1575 return Err(Error::InvalidArgumentError(
1576 "CREATE TABLE AS SELECT requires at least one projected column".into(),
1577 ));
1578 }
1579
1580 let plan = CreateTablePlan {
1581 name: display_name,
1582 if_not_exists,
1583 or_replace,
1584 columns: Vec::new(),
1585 source: Some(CreateTableSource::Select {
1586 plan: Box::new(select_plan),
1587 }),
1588 namespace,
1589 foreign_keys: Vec::new(),
1590 };
1591 self.execute_plan_statement(PlanStatement::CreateTable(plan))
1592 }
1593
1594 fn handle_insert(&self, stmt: sqlparser::ast::Insert) -> SqlResult<RuntimeStatementResult<P>> {
1595 let table_name_debug =
1596 Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
1597 tracing::trace!(
1598 "DEBUG SQL handle_insert called for table={}",
1599 table_name_debug
1600 );
1601 if !self.engine.session().has_active_transaction()
1602 && self.is_table_marked_dropped(&table_name_debug)?
1603 {
1604 return Err(Error::TransactionContextError(
1605 DROPPED_TABLE_TRANSACTION_ERR.into(),
1606 ));
1607 }
1608 if stmt.replace_into || stmt.ignore || stmt.or.is_some() {
1609 return Err(Error::InvalidArgumentError(
1610 "non-standard INSERT forms are not supported".into(),
1611 ));
1612 }
1613 if stmt.overwrite {
1614 return Err(Error::InvalidArgumentError(
1615 "INSERT OVERWRITE is not supported".into(),
1616 ));
1617 }
1618 if !stmt.assignments.is_empty() {
1619 return Err(Error::InvalidArgumentError(
1620 "INSERT ... SET is not supported".into(),
1621 ));
1622 }
1623 if stmt.partitioned.is_some() || !stmt.after_columns.is_empty() {
1624 return Err(Error::InvalidArgumentError(
1625 "partitioned INSERT is not supported".into(),
1626 ));
1627 }
1628 if stmt.returning.is_some() {
1629 return Err(Error::InvalidArgumentError(
1630 "INSERT ... RETURNING is not supported".into(),
1631 ));
1632 }
1633 if stmt.format_clause.is_some() || stmt.settings.is_some() {
1634 return Err(Error::InvalidArgumentError(
1635 "INSERT with FORMAT or SETTINGS is not supported".into(),
1636 ));
1637 }
1638
1639 let (display_name, _canonical_name) = match &stmt.table {
1640 TableObject::TableName(name) => canonical_object_name(name)?,
1641 _ => {
1642 return Err(Error::InvalidArgumentError(
1643 "INSERT requires a plain table name".into(),
1644 ));
1645 }
1646 };
1647
1648 let columns: Vec<String> = stmt
1649 .columns
1650 .iter()
1651 .map(|ident| ident.value.clone())
1652 .collect();
1653 let source_expr = stmt
1654 .source
1655 .as_ref()
1656 .ok_or_else(|| Error::InvalidArgumentError("INSERT requires a VALUES clause".into()))?;
1657 validate_simple_query(source_expr)?;
1658
1659 let insert_source = match source_expr.body.as_ref() {
1660 SetExpr::Values(values) => {
1661 if values.rows.is_empty() {
1662 return Err(Error::InvalidArgumentError(
1663 "INSERT VALUES list must contain at least one row".into(),
1664 ));
1665 }
1666 let mut rows: Vec<Vec<SqlValue>> = Vec::with_capacity(values.rows.len());
1667 for row in &values.rows {
1668 let mut converted = Vec::with_capacity(row.len());
1669 for expr in row {
1670 converted.push(SqlValue::try_from_expr(expr)?);
1671 }
1672 rows.push(converted);
1673 }
1674 InsertSource::Rows(
1675 rows.into_iter()
1676 .map(|row| row.into_iter().map(PlanValue::from).collect())
1677 .collect(),
1678 )
1679 }
1680 SetExpr::Select(select) => {
1681 if let Some(rows) = extract_constant_select_rows(select.as_ref())? {
1682 InsertSource::Rows(rows)
1683 } else if let Some(range_rows) = extract_rows_from_range(select.as_ref())? {
1684 InsertSource::Rows(range_rows.into_rows())
1685 } else {
1686 let select_plan = self.build_select_plan((**source_expr).clone())?;
1687 InsertSource::Select {
1688 plan: Box::new(select_plan),
1689 }
1690 }
1691 }
1692 _ => {
1693 return Err(Error::InvalidArgumentError(
1694 "unsupported INSERT source".into(),
1695 ));
1696 }
1697 };
1698
1699 let plan = InsertPlan {
1700 table: display_name.clone(),
1701 columns,
1702 source: insert_source,
1703 };
1704 tracing::trace!(
1705 "DEBUG SQL handle_insert: about to execute insert for table={}",
1706 display_name
1707 );
1708 self.execute_plan_statement(PlanStatement::Insert(plan))
1709 }
1710
1711 fn handle_update(
1712 &self,
1713 table: TableWithJoins,
1714 assignments: Vec<Assignment>,
1715 from: Option<UpdateTableFromKind>,
1716 selection: Option<SqlExpr>,
1717 returning: Option<Vec<SelectItem>>,
1718 ) -> SqlResult<RuntimeStatementResult<P>> {
1719 if from.is_some() {
1720 return Err(Error::InvalidArgumentError(
1721 "UPDATE ... FROM is not supported yet".into(),
1722 ));
1723 }
1724 if returning.is_some() {
1725 return Err(Error::InvalidArgumentError(
1726 "UPDATE ... RETURNING is not supported".into(),
1727 ));
1728 }
1729 if assignments.is_empty() {
1730 return Err(Error::InvalidArgumentError(
1731 "UPDATE requires at least one assignment".into(),
1732 ));
1733 }
1734
1735 let (display_name, canonical_name) = extract_single_table(std::slice::from_ref(&table))?;
1736
1737 if !self.engine.session().has_active_transaction()
1738 && self
1739 .engine
1740 .context()
1741 .is_table_marked_dropped(&canonical_name)
1742 {
1743 return Err(Error::TransactionContextError(
1744 DROPPED_TABLE_TRANSACTION_ERR.into(),
1745 ));
1746 }
1747
1748 let catalog = self.engine.context().table_catalog();
1749 let resolver = catalog.identifier_resolver();
1750 let table_id = catalog.table_id(&canonical_name);
1751
1752 let mut column_assignments = Vec::with_capacity(assignments.len());
1753 let mut seen: HashMap<String, ()> = HashMap::new();
1754 for assignment in assignments {
1755 let column_name = resolve_assignment_column_name(&assignment.target)?;
1756 let normalized = column_name.to_ascii_lowercase();
1757 if seen.insert(normalized, ()).is_some() {
1758 return Err(Error::InvalidArgumentError(format!(
1759 "duplicate column '{}' in UPDATE assignments",
1760 column_name
1761 )));
1762 }
1763 let value = match SqlValue::try_from_expr(&assignment.value) {
1764 Ok(literal) => AssignmentValue::Literal(PlanValue::from(literal)),
1765 Err(Error::InvalidArgumentError(msg))
1766 if msg.contains("unsupported literal expression") =>
1767 {
1768 let translated = translate_scalar_with_context(
1769 &resolver,
1770 IdentifierContext::new(table_id),
1771 &assignment.value,
1772 )?;
1773 AssignmentValue::Expression(translated)
1774 }
1775 Err(err) => return Err(err),
1776 };
1777 column_assignments.push(ColumnAssignment {
1778 column: column_name,
1779 value,
1780 });
1781 }
1782
1783 let filter = match selection {
1784 Some(expr) => Some(translate_condition_with_context(
1785 &resolver,
1786 IdentifierContext::new(table_id),
1787 &expr,
1788 )?),
1789 None => None,
1790 };
1791
1792 let plan = UpdatePlan {
1793 table: display_name.clone(),
1794 assignments: column_assignments,
1795 filter,
1796 };
1797 self.execute_plan_statement(PlanStatement::Update(plan))
1798 }
1799
1800 #[allow(clippy::collapsible_if)]
1801 fn handle_delete(&self, delete: Delete) -> SqlResult<RuntimeStatementResult<P>> {
1802 let Delete {
1803 tables,
1804 from,
1805 using,
1806 selection,
1807 returning,
1808 order_by,
1809 limit,
1810 } = delete;
1811
1812 if !tables.is_empty() {
1813 return Err(Error::InvalidArgumentError(
1814 "multi-table DELETE is not supported yet".into(),
1815 ));
1816 }
1817 if let Some(using_tables) = using {
1818 if !using_tables.is_empty() {
1819 return Err(Error::InvalidArgumentError(
1820 "DELETE ... USING is not supported yet".into(),
1821 ));
1822 }
1823 }
1824 if returning.is_some() {
1825 return Err(Error::InvalidArgumentError(
1826 "DELETE ... RETURNING is not supported".into(),
1827 ));
1828 }
1829 if !order_by.is_empty() {
1830 return Err(Error::InvalidArgumentError(
1831 "DELETE ... ORDER BY is not supported yet".into(),
1832 ));
1833 }
1834 if limit.is_some() {
1835 return Err(Error::InvalidArgumentError(
1836 "DELETE ... LIMIT is not supported yet".into(),
1837 ));
1838 }
1839
1840 let from_tables = match from {
1841 FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
1842 };
1843 let (display_name, canonical_name) = extract_single_table(&from_tables)?;
1844
1845 if !self.engine.session().has_active_transaction()
1846 && self
1847 .engine
1848 .context()
1849 .is_table_marked_dropped(&canonical_name)
1850 {
1851 return Err(Error::TransactionContextError(
1852 DROPPED_TABLE_TRANSACTION_ERR.into(),
1853 ));
1854 }
1855
1856 let catalog = self.engine.context().table_catalog();
1857 let resolver = catalog.identifier_resolver();
1858 let table_id = catalog.table_id(&canonical_name);
1859
1860 let filter = selection
1861 .map(|expr| {
1862 translate_condition_with_context(&resolver, IdentifierContext::new(table_id), &expr)
1863 })
1864 .transpose()?;
1865
1866 let plan = DeletePlan {
1867 table: display_name.clone(),
1868 filter,
1869 };
1870 self.execute_plan_statement(PlanStatement::Delete(plan))
1871 }
1872
1873 #[allow(clippy::too_many_arguments)] fn handle_drop(
1875 &self,
1876 object_type: ObjectType,
1877 if_exists: bool,
1878 names: Vec<ObjectName>,
1879 cascade: bool,
1880 restrict: bool,
1881 purge: bool,
1882 temporary: bool,
1883 ) -> SqlResult<RuntimeStatementResult<P>> {
1884 if purge || temporary {
1885 return Err(Error::InvalidArgumentError(
1886 "DROP purge/temporary options are not supported".into(),
1887 ));
1888 }
1889
1890 match object_type {
1891 ObjectType::Table => {
1892 if cascade || restrict {
1893 return Err(Error::InvalidArgumentError(
1894 "DROP TABLE CASCADE/RESTRICT is not supported".into(),
1895 ));
1896 }
1897
1898 let session = self.engine.session();
1899 for name in names {
1900 let table_name = Self::object_name_to_string(&name)?;
1901 session
1902 .drop_table(&table_name, if_exists)
1903 .map_err(|err| Self::map_table_error(&table_name, err))?;
1904 }
1905
1906 Ok(RuntimeStatementResult::NoOp)
1907 }
1908 ObjectType::Schema => {
1909 if restrict {
1910 return Err(Error::InvalidArgumentError(
1911 "DROP SCHEMA RESTRICT is not supported".into(),
1912 ));
1913 }
1914
1915 let catalog = self.engine.context().table_catalog();
1916
1917 for name in names {
1918 let (display_name, canonical_name) = canonical_object_name(&name)?;
1919
1920 if !catalog.schema_exists(&canonical_name) {
1921 if if_exists {
1922 continue;
1923 }
1924 return Err(Error::CatalogError(format!(
1925 "Schema '{}' does not exist",
1926 display_name
1927 )));
1928 }
1929
1930 if cascade {
1931 let all_tables = catalog.table_names();
1933 let schema_prefix = format!("{}.", canonical_name);
1934
1935 let ctx = self.engine.context();
1936 for table in all_tables {
1937 if table.to_ascii_lowercase().starts_with(&schema_prefix) {
1938 ctx.drop_table_immediate(&table, false)?;
1939 }
1940 }
1941 } else {
1942 let all_tables = catalog.table_names();
1944 let schema_prefix = format!("{}.", canonical_name);
1945 let has_tables = all_tables
1946 .iter()
1947 .any(|t| t.to_ascii_lowercase().starts_with(&schema_prefix));
1948
1949 if has_tables {
1950 return Err(Error::CatalogError(format!(
1951 "Schema '{}' is not empty. Use CASCADE to drop schema and all its tables",
1952 display_name
1953 )));
1954 }
1955 }
1956
1957 if !catalog.unregister_schema(&canonical_name) && !if_exists {
1959 return Err(Error::CatalogError(format!(
1960 "Schema '{}' does not exist",
1961 display_name
1962 )));
1963 }
1964 }
1965
1966 Ok(RuntimeStatementResult::NoOp)
1967 }
1968 _ => Err(Error::InvalidArgumentError(format!(
1969 "DROP {} is not supported",
1970 object_type
1971 ))),
1972 }
1973 }
1974
1975 fn handle_query(&self, query: Query) -> SqlResult<RuntimeStatementResult<P>> {
1976 if let Some(result) = self.try_handle_pragma_table_info(&query)? {
1978 return Ok(result);
1979 }
1980
1981 let select_plan = self.build_select_plan(query)?;
1982 self.execute_plan_statement(PlanStatement::Select(select_plan))
1983 }
1984
1985 fn build_select_plan(&self, query: Query) -> SqlResult<SelectPlan> {
1986 if self.engine.session().has_active_transaction() && self.engine.session().is_aborted() {
1987 return Err(Error::TransactionContextError(
1988 "TransactionContext Error: transaction is aborted".into(),
1989 ));
1990 }
1991
1992 validate_simple_query(&query)?;
1993 let catalog = self.engine.context().table_catalog();
1994 let resolver = catalog.identifier_resolver();
1995
1996 let (mut select_plan, select_context) = match query.body.as_ref() {
1997 SetExpr::Select(select) => self.translate_select(select.as_ref(), &resolver)?,
1998 other => {
1999 return Err(Error::InvalidArgumentError(format!(
2000 "unsupported query expression: {other:?}"
2001 )));
2002 }
2003 };
2004 if let Some(order_by) = &query.order_by {
2005 if !select_plan.aggregates.is_empty() {
2006 return Err(Error::InvalidArgumentError(
2007 "ORDER BY is not supported for aggregate queries".into(),
2008 ));
2009 }
2010 let order_plan = self.translate_order_by(&resolver, select_context, order_by)?;
2011 select_plan = select_plan.with_order_by(order_plan);
2012 }
2013 Ok(select_plan)
2014 }
2015
2016 fn translate_select(
2017 &self,
2018 select: &Select,
2019 resolver: &IdentifierResolver<'_>,
2020 ) -> SqlResult<(SelectPlan, IdentifierContext)> {
2021 if select.distinct.is_some() {
2022 return Err(Error::InvalidArgumentError(
2023 "SELECT DISTINCT is not supported".into(),
2024 ));
2025 }
2026 if select.top.is_some() {
2027 return Err(Error::InvalidArgumentError(
2028 "SELECT TOP is not supported".into(),
2029 ));
2030 }
2031 if select.exclude.is_some() {
2032 return Err(Error::InvalidArgumentError(
2033 "SELECT EXCLUDE is not supported".into(),
2034 ));
2035 }
2036 if select.into.is_some() {
2037 return Err(Error::InvalidArgumentError(
2038 "SELECT INTO is not supported".into(),
2039 ));
2040 }
2041 if !select.lateral_views.is_empty() {
2042 return Err(Error::InvalidArgumentError(
2043 "LATERAL VIEW is not supported".into(),
2044 ));
2045 }
2046 if select.prewhere.is_some() {
2047 return Err(Error::InvalidArgumentError(
2048 "PREWHERE is not supported".into(),
2049 ));
2050 }
2051 if !group_by_is_empty(&select.group_by) || select.value_table_mode.is_some() {
2052 return Err(Error::InvalidArgumentError(
2053 "GROUP BY and SELECT AS VALUE/STRUCT are not supported".into(),
2054 ));
2055 }
2056 if !select.cluster_by.is_empty()
2057 || !select.distribute_by.is_empty()
2058 || !select.sort_by.is_empty()
2059 {
2060 return Err(Error::InvalidArgumentError(
2061 "CLUSTER/DISTRIBUTE/SORT BY clauses are not supported".into(),
2062 ));
2063 }
2064 if select.having.is_some()
2065 || !select.named_window.is_empty()
2066 || select.qualify.is_some()
2067 || select.connect_by.is_some()
2068 {
2069 return Err(Error::InvalidArgumentError(
2070 "advanced SELECT clauses are not supported".into(),
2071 ));
2072 }
2073
2074 let table_alias = select
2075 .from
2076 .first()
2077 .and_then(|table_with_joins| match &table_with_joins.relation {
2078 TableFactor::Table { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
2079 _ => None,
2080 });
2081
2082 if let Some(alias) = table_alias.as_ref() {
2083 validate_projection_alias_qualifiers(&select.projection, alias)?;
2084 }
2085 let catalog = self.engine.context().table_catalog();
2087 let (mut plan, id_context) = if select.from.is_empty() {
2088 let mut p = SelectPlan::new("");
2090 let projections = self.build_projection_list(
2091 resolver,
2092 IdentifierContext::new(None),
2093 &select.projection,
2094 )?;
2095 p = p.with_projections(projections);
2096 (p, IdentifierContext::new(None))
2097 } else if select.from.len() == 1 {
2098 let (display_name, canonical_name) = extract_single_table(&select.from)?;
2100 let table_id = catalog.table_id(&canonical_name);
2101 let mut p = SelectPlan::new(display_name.clone());
2102 if let Some(aggregates) = self.detect_simple_aggregates(&select.projection)? {
2103 p = p.with_aggregates(aggregates);
2104 } else {
2105 let projections = self.build_projection_list(
2106 resolver,
2107 IdentifierContext::new(table_id),
2108 &select.projection,
2109 )?;
2110 p = p.with_projections(projections);
2111 }
2112 (p, IdentifierContext::new(table_id))
2113 } else {
2114 let tables = extract_tables(&select.from)?;
2116 let mut p = SelectPlan::with_tables(tables);
2117 let projections = self.build_projection_list(
2120 resolver,
2121 IdentifierContext::new(None),
2122 &select.projection,
2123 )?;
2124 p = p.with_projections(projections);
2125 (p, IdentifierContext::new(None))
2126 };
2127
2128 let filter_expr = match &select.selection {
2129 Some(expr) => Some(translate_condition_with_context(
2130 resolver, id_context, expr,
2131 )?),
2132 None => None,
2133 };
2134 plan = plan.with_filter(filter_expr);
2135 Ok((plan, id_context))
2136 }
2137
2138 fn translate_order_by(
2139 &self,
2140 resolver: &IdentifierResolver<'_>,
2141 id_context: IdentifierContext,
2142 order_by: &OrderBy,
2143 ) -> SqlResult<Vec<OrderByPlan>> {
2144 let exprs = match &order_by.kind {
2145 OrderByKind::Expressions(exprs) => exprs,
2146 _ => {
2147 return Err(Error::InvalidArgumentError(
2148 "unsupported ORDER BY clause".into(),
2149 ));
2150 }
2151 };
2152
2153 let base_nulls_first = self.default_nulls_first.load(AtomicOrdering::Relaxed);
2154
2155 let resolve_simple_column = |expr: &SqlExpr| -> SqlResult<String> {
2156 let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
2157 match scalar {
2158 llkv_expr::expr::ScalarExpr::Column(column) => Ok(column),
2159 other => Err(Error::InvalidArgumentError(format!(
2160 "ORDER BY expression must reference a simple column, found {other:?}"
2161 ))),
2162 }
2163 };
2164
2165 let mut plans = Vec::with_capacity(exprs.len());
2166 for order_expr in exprs {
2167 let ascending = order_expr.options.asc.unwrap_or(true);
2168 let default_nulls_first_for_direction = if ascending {
2169 base_nulls_first
2170 } else {
2171 !base_nulls_first
2172 };
2173 let nulls_first = order_expr
2174 .options
2175 .nulls_first
2176 .unwrap_or(default_nulls_first_for_direction);
2177
2178 if let SqlExpr::Identifier(ident) = &order_expr.expr
2179 && ident.value.eq_ignore_ascii_case("ALL")
2180 && ident.quote_style.is_none()
2181 {
2182 plans.push(OrderByPlan {
2183 target: OrderTarget::All,
2184 sort_type: OrderSortType::Native,
2185 ascending,
2186 nulls_first,
2187 });
2188 continue;
2189 }
2190
2191 let (target, sort_type) = match &order_expr.expr {
2192 SqlExpr::Identifier(_) | SqlExpr::CompoundIdentifier(_) => (
2193 OrderTarget::Column(resolve_simple_column(&order_expr.expr)?),
2194 OrderSortType::Native,
2195 ),
2196 SqlExpr::Cast {
2197 expr,
2198 data_type:
2199 SqlDataType::Int(_)
2200 | SqlDataType::Integer(_)
2201 | SqlDataType::BigInt(_)
2202 | SqlDataType::SmallInt(_)
2203 | SqlDataType::TinyInt(_),
2204 ..
2205 } => (
2206 OrderTarget::Column(resolve_simple_column(expr)?),
2207 OrderSortType::CastTextToInteger,
2208 ),
2209 SqlExpr::Cast { data_type, .. } => {
2210 return Err(Error::InvalidArgumentError(format!(
2211 "ORDER BY CAST target type {:?} is not supported",
2212 data_type
2213 )));
2214 }
2215 SqlExpr::Value(value_with_span) => match &value_with_span.value {
2216 Value::Number(raw, _) => {
2217 let position: usize = raw.parse().map_err(|_| {
2218 Error::InvalidArgumentError(format!(
2219 "ORDER BY position '{}' is not a valid positive integer",
2220 raw
2221 ))
2222 })?;
2223 if position == 0 {
2224 return Err(Error::InvalidArgumentError(
2225 "ORDER BY position must be at least 1".into(),
2226 ));
2227 }
2228 (OrderTarget::Index(position - 1), OrderSortType::Native)
2229 }
2230 other => {
2231 return Err(Error::InvalidArgumentError(format!(
2232 "unsupported ORDER BY literal expression: {other:?}"
2233 )));
2234 }
2235 },
2236 other => {
2237 return Err(Error::InvalidArgumentError(format!(
2238 "unsupported ORDER BY expression: {other:?}"
2239 )));
2240 }
2241 };
2242
2243 plans.push(OrderByPlan {
2244 target,
2245 sort_type,
2246 ascending,
2247 nulls_first,
2248 });
2249 }
2250
2251 Ok(plans)
2252 }
2253
2254 fn detect_simple_aggregates(
2255 &self,
2256 projection_items: &[SelectItem],
2257 ) -> SqlResult<Option<Vec<AggregateExpr>>> {
2258 if projection_items.is_empty() {
2259 return Ok(None);
2260 }
2261
2262 let mut specs: Vec<AggregateExpr> = Vec::with_capacity(projection_items.len());
2263 for (idx, item) in projection_items.iter().enumerate() {
2264 let (expr, alias_opt) = match item {
2265 SelectItem::UnnamedExpr(expr) => (expr, None),
2266 SelectItem::ExprWithAlias { expr, alias } => (expr, Some(alias.value.clone())),
2267 _ => return Ok(None),
2268 };
2269
2270 let alias = alias_opt.unwrap_or_else(|| format!("col{}", idx + 1));
2271 let SqlExpr::Function(func) = expr else {
2272 return Ok(None);
2273 };
2274
2275 if func.uses_odbc_syntax {
2276 return Err(Error::InvalidArgumentError(
2277 "ODBC function syntax is not supported in aggregate queries".into(),
2278 ));
2279 }
2280 if !matches!(func.parameters, FunctionArguments::None) {
2281 return Err(Error::InvalidArgumentError(
2282 "parameterized aggregate functions are not supported".into(),
2283 ));
2284 }
2285 if func.filter.is_some()
2286 || func.null_treatment.is_some()
2287 || func.over.is_some()
2288 || !func.within_group.is_empty()
2289 {
2290 return Err(Error::InvalidArgumentError(
2291 "advanced aggregate clauses are not supported".into(),
2292 ));
2293 }
2294
2295 let mut is_distinct = false;
2296 let args_slice: &[FunctionArg] = match &func.args {
2297 FunctionArguments::List(list) => {
2298 if let Some(dup) = &list.duplicate_treatment {
2299 use sqlparser::ast::DuplicateTreatment;
2300 match dup {
2301 DuplicateTreatment::All => {}
2302 DuplicateTreatment::Distinct => is_distinct = true,
2303 }
2304 }
2305 if !list.clauses.is_empty() {
2306 return Err(Error::InvalidArgumentError(
2307 "aggregate argument clauses are not supported".into(),
2308 ));
2309 }
2310 &list.args
2311 }
2312 FunctionArguments::None => &[],
2313 FunctionArguments::Subquery(_) => {
2314 return Err(Error::InvalidArgumentError(
2315 "aggregate subquery arguments are not supported".into(),
2316 ));
2317 }
2318 };
2319
2320 let func_name = if func.name.0.len() == 1 {
2321 match &func.name.0[0] {
2322 ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
2323 _ => {
2324 return Err(Error::InvalidArgumentError(
2325 "unsupported aggregate function name".into(),
2326 ));
2327 }
2328 }
2329 } else {
2330 return Err(Error::InvalidArgumentError(
2331 "qualified aggregate function names are not supported".into(),
2332 ));
2333 };
2334
2335 let aggregate = match func_name.as_str() {
2336 "count" => {
2337 if args_slice.len() != 1 {
2338 return Err(Error::InvalidArgumentError(
2339 "COUNT accepts exactly one argument".into(),
2340 ));
2341 }
2342 match &args_slice[0] {
2343 FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
2344 if is_distinct {
2345 return Err(Error::InvalidArgumentError(
2346 "COUNT(DISTINCT *) is not supported".into(),
2347 ));
2348 }
2349 AggregateExpr::count_star(alias)
2350 }
2351 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
2352 let column = resolve_column_name(arg_expr)?;
2353 if is_distinct {
2354 AggregateExpr::count_distinct_column(column, alias)
2355 } else {
2356 AggregateExpr::count_column(column, alias)
2357 }
2358 }
2359 FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
2360 return Err(Error::InvalidArgumentError(
2361 "named COUNT arguments are not supported".into(),
2362 ));
2363 }
2364 FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
2365 return Err(Error::InvalidArgumentError(
2366 "COUNT does not support qualified wildcards".into(),
2367 ));
2368 }
2369 }
2370 }
2371 "sum" | "min" | "max" => {
2372 if is_distinct {
2373 return Err(Error::InvalidArgumentError(
2374 "DISTINCT is not supported for this aggregate".into(),
2375 ));
2376 }
2377 if args_slice.len() != 1 {
2378 return Err(Error::InvalidArgumentError(format!(
2379 "{} accepts exactly one argument",
2380 func_name.to_uppercase()
2381 )));
2382 }
2383 let arg_expr = match &args_slice[0] {
2384 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => arg_expr,
2385 FunctionArg::Unnamed(FunctionArgExpr::Wildcard)
2386 | FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
2387 return Err(Error::InvalidArgumentError(format!(
2388 "{} does not support wildcard arguments",
2389 func_name.to_uppercase()
2390 )));
2391 }
2392 FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
2393 return Err(Error::InvalidArgumentError(format!(
2394 "{} arguments must be column references",
2395 func_name.to_uppercase()
2396 )));
2397 }
2398 };
2399
2400 if func_name == "sum" {
2401 if let Some(column) = parse_count_nulls_case(arg_expr)? {
2402 AggregateExpr::count_nulls(column, alias)
2403 } else {
2404 let column = resolve_column_name(arg_expr)?;
2405 AggregateExpr::sum_int64(column, alias)
2406 }
2407 } else {
2408 let column = resolve_column_name(arg_expr)?;
2409 if func_name == "min" {
2410 AggregateExpr::min_int64(column, alias)
2411 } else {
2412 AggregateExpr::max_int64(column, alias)
2413 }
2414 }
2415 }
2416 _ => return Ok(None),
2417 };
2418
2419 specs.push(aggregate);
2420 }
2421
2422 if specs.is_empty() {
2423 return Ok(None);
2424 }
2425 Ok(Some(specs))
2426 }
2427
2428 fn build_projection_list(
2429 &self,
2430 resolver: &IdentifierResolver<'_>,
2431 id_context: IdentifierContext,
2432 projection_items: &[SelectItem],
2433 ) -> SqlResult<Vec<SelectProjection>> {
2434 if projection_items.is_empty() {
2435 return Err(Error::InvalidArgumentError(
2436 "SELECT projection must include at least one column".into(),
2437 ));
2438 }
2439
2440 let mut projections = Vec::with_capacity(projection_items.len());
2441 for (idx, item) in projection_items.iter().enumerate() {
2442 match item {
2443 SelectItem::Wildcard(options) => {
2444 if let Some(exclude) = &options.opt_exclude {
2445 use sqlparser::ast::ExcludeSelectItem;
2446 let exclude_cols = match exclude {
2447 ExcludeSelectItem::Single(ident) => vec![ident.value.clone()],
2448 ExcludeSelectItem::Multiple(idents) => {
2449 idents.iter().map(|id| id.value.clone()).collect()
2450 }
2451 };
2452 projections.push(SelectProjection::AllColumnsExcept {
2453 exclude: exclude_cols,
2454 });
2455 } else {
2456 projections.push(SelectProjection::AllColumns);
2457 }
2458 }
2459 SelectItem::QualifiedWildcard(kind, _) => match kind {
2460 SelectItemQualifiedWildcardKind::ObjectName(name) => {
2461 projections.push(SelectProjection::Column {
2462 name: name.to_string(),
2463 alias: None,
2464 });
2465 }
2466 SelectItemQualifiedWildcardKind::Expr(_) => {
2467 return Err(Error::InvalidArgumentError(
2468 "expression-qualified wildcards are not supported".into(),
2469 ));
2470 }
2471 },
2472 SelectItem::UnnamedExpr(expr) => match expr {
2473 SqlExpr::Identifier(ident) => {
2474 let parts = vec![ident.value.clone()];
2475 let resolution = resolver.resolve(&parts, id_context)?;
2476 if resolution.is_simple() {
2477 projections.push(SelectProjection::Column {
2478 name: resolution.column().to_string(),
2479 alias: None,
2480 });
2481 } else {
2482 let alias = format!("col{}", idx + 1);
2483 projections.push(SelectProjection::Computed {
2484 expr: resolution.into_scalar_expr(),
2485 alias,
2486 });
2487 }
2488 }
2489 SqlExpr::CompoundIdentifier(parts) => {
2490 let name_parts: Vec<String> =
2491 parts.iter().map(|part| part.value.clone()).collect();
2492 let resolution = resolver.resolve(&name_parts, id_context)?;
2493 if resolution.is_simple() {
2494 projections.push(SelectProjection::Column {
2495 name: resolution.column().to_string(),
2496 alias: None,
2497 });
2498 } else {
2499 let alias = format!("col{}", idx + 1);
2500 projections.push(SelectProjection::Computed {
2501 expr: resolution.into_scalar_expr(),
2502 alias,
2503 });
2504 }
2505 }
2506 _ => {
2507 let alias = format!("col{}", idx + 1);
2508 let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
2509 projections.push(SelectProjection::Computed {
2510 expr: scalar,
2511 alias,
2512 });
2513 }
2514 },
2515 SelectItem::ExprWithAlias { expr, alias } => match expr {
2516 SqlExpr::Identifier(ident) => {
2517 let parts = vec![ident.value.clone()];
2518 let resolution = resolver.resolve(&parts, id_context)?;
2519 if resolution.is_simple() {
2520 projections.push(SelectProjection::Column {
2521 name: resolution.column().to_string(),
2522 alias: Some(alias.value.clone()),
2523 });
2524 } else {
2525 projections.push(SelectProjection::Computed {
2526 expr: resolution.into_scalar_expr(),
2527 alias: alias.value.clone(),
2528 });
2529 }
2530 }
2531 SqlExpr::CompoundIdentifier(parts) => {
2532 let name_parts: Vec<String> =
2533 parts.iter().map(|part| part.value.clone()).collect();
2534 let resolution = resolver.resolve(&name_parts, id_context)?;
2535 if resolution.is_simple() {
2536 projections.push(SelectProjection::Column {
2537 name: resolution.column().to_string(),
2538 alias: Some(alias.value.clone()),
2539 });
2540 } else {
2541 projections.push(SelectProjection::Computed {
2542 expr: resolution.into_scalar_expr(),
2543 alias: alias.value.clone(),
2544 });
2545 }
2546 }
2547 _ => {
2548 let scalar = translate_scalar_with_context(resolver, id_context, expr)?;
2549 projections.push(SelectProjection::Computed {
2550 expr: scalar,
2551 alias: alias.value.clone(),
2552 });
2553 }
2554 },
2555 }
2556 }
2557 Ok(projections)
2558 }
2559
2560 #[allow(clippy::too_many_arguments)] fn handle_start_transaction(
2562 &self,
2563 modes: Vec<TransactionMode>,
2564 begin: bool,
2565 transaction: Option<BeginTransactionKind>,
2566 modifier: Option<TransactionModifier>,
2567 statements: Vec<Statement>,
2568 exception: Option<Vec<ExceptionWhen>>,
2569 has_end_keyword: bool,
2570 ) -> SqlResult<RuntimeStatementResult<P>> {
2571 if !modes.is_empty() {
2572 return Err(Error::InvalidArgumentError(
2573 "transaction modes are not supported".into(),
2574 ));
2575 }
2576 if modifier.is_some() {
2577 return Err(Error::InvalidArgumentError(
2578 "transaction modifiers are not supported".into(),
2579 ));
2580 }
2581 if !statements.is_empty() || exception.is_some() || has_end_keyword {
2582 return Err(Error::InvalidArgumentError(
2583 "BEGIN blocks with inline statements or exceptions are not supported".into(),
2584 ));
2585 }
2586 if let Some(kind) = transaction {
2587 match kind {
2588 BeginTransactionKind::Transaction | BeginTransactionKind::Work => {}
2589 }
2590 }
2591 if !begin {
2592 tracing::warn!("Currently treat `START TRANSACTION` same as `BEGIN`")
2594 }
2595
2596 self.execute_plan_statement(PlanStatement::BeginTransaction)
2597 }
2598
2599 fn handle_commit(
2600 &self,
2601 chain: bool,
2602 end: bool,
2603 modifier: Option<TransactionModifier>,
2604 ) -> SqlResult<RuntimeStatementResult<P>> {
2605 if chain {
2606 return Err(Error::InvalidArgumentError(
2607 "COMMIT AND [NO] CHAIN is not supported".into(),
2608 ));
2609 }
2610 if end {
2611 return Err(Error::InvalidArgumentError(
2612 "END blocks are not supported".into(),
2613 ));
2614 }
2615 if modifier.is_some() {
2616 return Err(Error::InvalidArgumentError(
2617 "transaction modifiers are not supported".into(),
2618 ));
2619 }
2620
2621 self.execute_plan_statement(PlanStatement::CommitTransaction)
2622 }
2623
2624 fn handle_rollback(
2625 &self,
2626 chain: bool,
2627 savepoint: Option<Ident>,
2628 ) -> SqlResult<RuntimeStatementResult<P>> {
2629 if chain {
2630 return Err(Error::InvalidArgumentError(
2631 "ROLLBACK AND [NO] CHAIN is not supported".into(),
2632 ));
2633 }
2634 if savepoint.is_some() {
2635 return Err(Error::InvalidArgumentError(
2636 "ROLLBACK TO SAVEPOINT is not supported".into(),
2637 ));
2638 }
2639
2640 self.execute_plan_statement(PlanStatement::RollbackTransaction)
2641 }
2642
2643 fn handle_set(&self, set_stmt: Set) -> SqlResult<RuntimeStatementResult<P>> {
2644 match set_stmt {
2645 Set::SingleAssignment {
2646 scope,
2647 hivevar,
2648 variable,
2649 values,
2650 } => {
2651 if scope.is_some() || hivevar {
2652 return Err(Error::InvalidArgumentError(
2653 "SET modifiers are not supported".into(),
2654 ));
2655 }
2656
2657 let variable_name_raw = variable.to_string();
2658 let variable_name = variable_name_raw.to_ascii_lowercase();
2659
2660 match variable_name.as_str() {
2661 "default_null_order" => {
2662 if values.len() != 1 {
2663 return Err(Error::InvalidArgumentError(
2664 "SET default_null_order expects exactly one value".into(),
2665 ));
2666 }
2667
2668 let value_expr = &values[0];
2669 let normalized = match value_expr {
2670 SqlExpr::Value(value_with_span) => value_with_span
2671 .value
2672 .clone()
2673 .into_string()
2674 .map(|s| s.to_ascii_lowercase()),
2675 SqlExpr::Identifier(ident) => Some(ident.value.to_ascii_lowercase()),
2676 _ => None,
2677 };
2678
2679 if !matches!(normalized.as_deref(), Some("nulls_first" | "nulls_last")) {
2680 return Err(Error::InvalidArgumentError(format!(
2681 "unsupported value for SET default_null_order: {value_expr:?}"
2682 )));
2683 }
2684
2685 let use_nulls_first = matches!(normalized.as_deref(), Some("nulls_first"));
2686 self.default_nulls_first
2687 .store(use_nulls_first, AtomicOrdering::Relaxed);
2688
2689 Ok(RuntimeStatementResult::NoOp)
2690 }
2691 "immediate_transaction_mode" => {
2692 if values.len() != 1 {
2693 return Err(Error::InvalidArgumentError(
2694 "SET immediate_transaction_mode expects exactly one value".into(),
2695 ));
2696 }
2697 let normalized = values[0].to_string().to_ascii_lowercase();
2698 let enabled = match normalized.as_str() {
2699 "true" | "on" | "1" => true,
2700 "false" | "off" | "0" => false,
2701 _ => {
2702 return Err(Error::InvalidArgumentError(format!(
2703 "unsupported value for SET immediate_transaction_mode: {}",
2704 values[0]
2705 )));
2706 }
2707 };
2708 if !enabled {
2709 tracing::warn!(
2710 "SET immediate_transaction_mode=false has no effect; continuing with auto mode"
2711 );
2712 }
2713 Ok(RuntimeStatementResult::NoOp)
2714 }
2715 _ => Err(Error::InvalidArgumentError(format!(
2716 "unsupported SET variable: {variable_name_raw}"
2717 ))),
2718 }
2719 }
2720 other => Err(Error::InvalidArgumentError(format!(
2721 "unsupported SQL SET statement: {other:?}",
2722 ))),
2723 }
2724 }
2725
2726 fn handle_pragma(
2727 &self,
2728 name: ObjectName,
2729 value: Option<Value>,
2730 is_eq: bool,
2731 ) -> SqlResult<RuntimeStatementResult<P>> {
2732 let (display, canonical) = canonical_object_name(&name)?;
2733 if value.is_some() || is_eq {
2734 return Err(Error::InvalidArgumentError(format!(
2735 "PRAGMA '{display}' does not accept a value"
2736 )));
2737 }
2738
2739 match canonical.as_str() {
2740 "enable_verification" | "disable_verification" => Ok(RuntimeStatementResult::NoOp),
2741 _ => Err(Error::InvalidArgumentError(format!(
2742 "unsupported PRAGMA '{}'",
2743 display
2744 ))),
2745 }
2746 }
2747}
2748
2749fn canonical_object_name(name: &ObjectName) -> SqlResult<(String, String)> {
2750 if name.0.is_empty() {
2751 return Err(Error::InvalidArgumentError(
2752 "object name must not be empty".into(),
2753 ));
2754 }
2755 let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
2756 for part in &name.0 {
2757 let ident = match part {
2758 ObjectNamePart::Identifier(ident) => ident,
2759 _ => {
2760 return Err(Error::InvalidArgumentError(
2761 "object names using functions are not supported".into(),
2762 ));
2763 }
2764 };
2765 parts.push(ident.value.clone());
2766 }
2767 let display = parts.join(".");
2768 let canonical = display.to_ascii_lowercase();
2769 Ok((display, canonical))
2770}
2771
2772fn parse_schema_qualified_name(name: &ObjectName) -> SqlResult<(Option<String>, String)> {
2781 if name.0.is_empty() {
2782 return Err(Error::InvalidArgumentError(
2783 "object name must not be empty".into(),
2784 ));
2785 }
2786
2787 let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
2788 for part in &name.0 {
2789 let ident = match part {
2790 ObjectNamePart::Identifier(ident) => ident,
2791 _ => {
2792 return Err(Error::InvalidArgumentError(
2793 "object names using functions are not supported".into(),
2794 ));
2795 }
2796 };
2797 parts.push(ident.value.clone());
2798 }
2799
2800 match parts.len() {
2801 1 => Ok((None, parts[0].clone())),
2802 2 => Ok((Some(parts[0].clone()), parts[1].clone())),
2803 _ => Err(Error::InvalidArgumentError(format!(
2804 "table name has too many parts: {}",
2805 name
2806 ))),
2807 }
2808}
2809
2810fn extract_index_column_name(
2824 index_col: &sqlparser::ast::IndexColumn,
2825 context: &str,
2826 allow_sort_options: bool,
2827 allow_compound: bool,
2828) -> SqlResult<String> {
2829 use sqlparser::ast::Expr as SqlExpr;
2830
2831 if index_col.operator_class.is_some() {
2833 return Err(Error::InvalidArgumentError(format!(
2834 "{} operator classes are not supported",
2835 context
2836 )));
2837 }
2838
2839 let order_expr = &index_col.column;
2840
2841 if allow_sort_options {
2843 let ascending = order_expr.options.asc.unwrap_or(true);
2845 let nulls_first = order_expr.options.nulls_first.unwrap_or(false);
2846
2847 if !ascending {
2848 return Err(Error::InvalidArgumentError(format!(
2849 "{} DESC ordering is not supported",
2850 context
2851 )));
2852 }
2853 if nulls_first {
2854 return Err(Error::InvalidArgumentError(format!(
2855 "{} NULLS FIRST ordering is not supported",
2856 context
2857 )));
2858 }
2859 } else {
2860 if order_expr.options.asc.is_some()
2862 || order_expr.options.nulls_first.is_some()
2863 || order_expr.with_fill.is_some()
2864 {
2865 return Err(Error::InvalidArgumentError(format!(
2866 "{} columns must be simple identifiers",
2867 context
2868 )));
2869 }
2870 }
2871
2872 let column_name = match &order_expr.expr {
2874 SqlExpr::Identifier(ident) => ident.value.clone(),
2875 SqlExpr::CompoundIdentifier(parts) => {
2876 if allow_compound {
2877 parts
2879 .last()
2880 .map(|ident| ident.value.clone())
2881 .ok_or_else(|| {
2882 Error::InvalidArgumentError(format!(
2883 "invalid column reference in {}",
2884 context
2885 ))
2886 })?
2887 } else if parts.len() == 1 {
2888 parts[0].value.clone()
2890 } else {
2891 return Err(Error::InvalidArgumentError(format!(
2892 "{} columns must be column identifiers",
2893 context
2894 )));
2895 }
2896 }
2897 other => {
2898 return Err(Error::InvalidArgumentError(format!(
2899 "{} only supports column references, found {:?}",
2900 context, other
2901 )));
2902 }
2903 };
2904
2905 Ok(column_name)
2906}
2907
2908fn validate_create_table_common(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
2909 if stmt.clone.is_some() || stmt.like.is_some() {
2910 return Err(Error::InvalidArgumentError(
2911 "CREATE TABLE LIKE/CLONE is not supported".into(),
2912 ));
2913 }
2914 if stmt.or_replace && stmt.if_not_exists {
2915 return Err(Error::InvalidArgumentError(
2916 "CREATE TABLE cannot combine OR REPLACE with IF NOT EXISTS".into(),
2917 ));
2918 }
2919 use sqlparser::ast::TableConstraint;
2920
2921 let mut seen_primary_key = false;
2922 for constraint in &stmt.constraints {
2923 match constraint {
2924 TableConstraint::PrimaryKey { .. } => {
2925 if seen_primary_key {
2926 return Err(Error::InvalidArgumentError(
2927 "multiple PRIMARY KEY constraints are not supported".into(),
2928 ));
2929 }
2930 seen_primary_key = true;
2931 }
2932 TableConstraint::Unique { .. } => {
2933 }
2935 TableConstraint::ForeignKey { .. } => {
2936 }
2938 other => {
2939 return Err(Error::InvalidArgumentError(format!(
2940 "table-level constraint {:?} is not supported",
2941 other
2942 )));
2943 }
2944 }
2945 }
2946 Ok(())
2947}
2948
2949fn validate_check_constraint(
2950 check_expr: &sqlparser::ast::Expr,
2951 table_name: &str,
2952 column_names: &[&str],
2953) -> SqlResult<()> {
2954 use sqlparser::ast::Expr as SqlExpr;
2955
2956 let column_names_lower: HashSet<String> = column_names
2957 .iter()
2958 .map(|name| name.to_ascii_lowercase())
2959 .collect();
2960
2961 let mut stack: Vec<&SqlExpr> = vec![check_expr];
2962
2963 while let Some(expr) = stack.pop() {
2964 match expr {
2965 SqlExpr::Subquery(_) => {
2966 return Err(Error::InvalidArgumentError(
2967 "Subqueries are not allowed in CHECK constraints".into(),
2968 ));
2969 }
2970 SqlExpr::Function(func) => {
2971 let func_name = func.name.to_string().to_uppercase();
2972 if matches!(func_name.as_str(), "SUM" | "AVG" | "COUNT" | "MIN" | "MAX") {
2973 return Err(Error::InvalidArgumentError(
2974 "Aggregate functions are not allowed in CHECK constraints".into(),
2975 ));
2976 }
2977
2978 if let sqlparser::ast::FunctionArguments::List(list) = &func.args {
2979 for arg in &list.args {
2980 if let sqlparser::ast::FunctionArg::Unnamed(
2981 sqlparser::ast::FunctionArgExpr::Expr(expr),
2982 ) = arg
2983 {
2984 stack.push(expr);
2985 }
2986 }
2987 }
2988 }
2989 SqlExpr::Identifier(ident) => {
2990 if !column_names_lower.contains(&ident.value.to_ascii_lowercase()) {
2991 return Err(Error::InvalidArgumentError(format!(
2992 "Column '{}' referenced in CHECK constraint does not exist",
2993 ident.value
2994 )));
2995 }
2996 }
2997 SqlExpr::CompoundIdentifier(idents) => {
2998 if idents.len() == 2 {
2999 let first = idents[0].value.as_str();
3000 let second = &idents[1].value;
3001
3002 if column_names_lower.contains(&first.to_ascii_lowercase()) {
3003 continue;
3004 }
3005
3006 if !first.eq_ignore_ascii_case(table_name) {
3007 return Err(Error::InvalidArgumentError(format!(
3008 "CHECK constraint references column from different table '{}'",
3009 first
3010 )));
3011 }
3012
3013 if !column_names_lower.contains(&second.to_ascii_lowercase()) {
3014 return Err(Error::InvalidArgumentError(format!(
3015 "Column '{}' referenced in CHECK constraint does not exist",
3016 second
3017 )));
3018 }
3019 } else if idents.len() == 3 {
3020 let first = &idents[0].value;
3021 let second = &idents[1].value;
3022 let third = &idents[2].value;
3023
3024 if first.eq_ignore_ascii_case(table_name) {
3025 if !column_names_lower.contains(&second.to_ascii_lowercase()) {
3026 return Err(Error::InvalidArgumentError(format!(
3027 "Column '{}' referenced in CHECK constraint does not exist",
3028 second
3029 )));
3030 }
3031 } else if second.eq_ignore_ascii_case(table_name) {
3032 if !column_names_lower.contains(&third.to_ascii_lowercase()) {
3033 return Err(Error::InvalidArgumentError(format!(
3034 "Column '{}' referenced in CHECK constraint does not exist",
3035 third
3036 )));
3037 }
3038 } else {
3039 return Err(Error::InvalidArgumentError(format!(
3040 "CHECK constraint references column from different table '{}'",
3041 second
3042 )));
3043 }
3044 }
3045 }
3046 SqlExpr::BinaryOp { left, right, .. } => {
3047 stack.push(left);
3048 stack.push(right);
3049 }
3050 SqlExpr::UnaryOp { expr, .. } | SqlExpr::Nested(expr) => {
3051 stack.push(expr);
3052 }
3053 SqlExpr::Value(_) | SqlExpr::TypedString { .. } => {}
3054 _ => {}
3055 }
3056 }
3057
3058 Ok(())
3059}
3060
3061fn validate_create_table_definition(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
3062 for column in &stmt.columns {
3063 for ColumnOptionDef { option, .. } in &column.options {
3064 match option {
3065 ColumnOption::Null
3066 | ColumnOption::NotNull
3067 | ColumnOption::Unique { .. }
3068 | ColumnOption::Check(_)
3069 | ColumnOption::ForeignKey { .. } => {}
3070 ColumnOption::Default(_) => {
3071 return Err(Error::InvalidArgumentError(format!(
3072 "DEFAULT values are not supported for column '{}'",
3073 column.name
3074 )));
3075 }
3076 other => {
3077 return Err(Error::InvalidArgumentError(format!(
3078 "unsupported column option {:?} on '{}'",
3079 other, column.name
3080 )));
3081 }
3082 }
3083 }
3084 }
3085 Ok(())
3086}
3087
3088fn validate_create_table_as(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
3089 if !stmt.columns.is_empty() {
3090 return Err(Error::InvalidArgumentError(
3091 "CREATE TABLE AS SELECT does not support column definitions yet".into(),
3092 ));
3093 }
3094 Ok(())
3095}
3096
3097fn validate_simple_query(query: &Query) -> SqlResult<()> {
3098 if query.with.is_some() {
3099 return Err(Error::InvalidArgumentError(
3100 "WITH clauses are not supported".into(),
3101 ));
3102 }
3103 if let Some(limit_clause) = &query.limit_clause {
3104 match limit_clause {
3105 LimitClause::LimitOffset {
3106 offset: Some(_), ..
3107 }
3108 | LimitClause::OffsetCommaLimit { .. } => {
3109 return Err(Error::InvalidArgumentError(
3110 "OFFSET clauses are not supported".into(),
3111 ));
3112 }
3113 LimitClause::LimitOffset { limit_by, .. } if !limit_by.is_empty() => {
3114 return Err(Error::InvalidArgumentError(
3115 "LIMIT BY clauses are not supported".into(),
3116 ));
3117 }
3118 _ => {}
3119 }
3120 }
3121 if query.fetch.is_some() {
3122 return Err(Error::InvalidArgumentError(
3123 "FETCH clauses are not supported".into(),
3124 ));
3125 }
3126 Ok(())
3127}
3128
3129fn resolve_column_name(expr: &SqlExpr) -> SqlResult<String> {
3130 match expr {
3131 SqlExpr::Identifier(ident) => Ok(ident.value.clone()),
3132 SqlExpr::CompoundIdentifier(parts) => {
3133 if let Some(last) = parts.last() {
3134 Ok(last.value.clone())
3135 } else {
3136 Err(Error::InvalidArgumentError(
3137 "empty column identifier".into(),
3138 ))
3139 }
3140 }
3141 _ => Err(Error::InvalidArgumentError(
3142 "aggregate arguments must be plain column identifiers".into(),
3143 )),
3144 }
3145}
3146
3147fn validate_projection_alias_qualifiers(
3148 projection_items: &[SelectItem],
3149 alias: &str,
3150) -> SqlResult<()> {
3151 let alias_lower = alias.to_ascii_lowercase();
3152 for item in projection_items {
3153 match item {
3154 SelectItem::UnnamedExpr(expr) | SelectItem::ExprWithAlias { expr, .. } => {
3155 if let SqlExpr::CompoundIdentifier(parts) = expr
3156 && parts.len() >= 2
3157 && let Some(first) = parts.first()
3158 && !first.value.eq_ignore_ascii_case(&alias_lower)
3159 {
3160 return Err(Error::InvalidArgumentError(format!(
3161 "Binder Error: table '{}' not found",
3162 first.value
3163 )));
3164 }
3165 }
3166 _ => {}
3167 }
3168 }
3169 Ok(())
3170}
3171
3172#[allow(dead_code)] fn expr_contains_aggregate(expr: &llkv_expr::expr::ScalarExpr<String>) -> bool {
3176 match expr {
3177 llkv_expr::expr::ScalarExpr::Aggregate(_) => true,
3178 llkv_expr::expr::ScalarExpr::Binary { left, right, .. } => {
3179 expr_contains_aggregate(left) || expr_contains_aggregate(right)
3180 }
3181 llkv_expr::expr::ScalarExpr::GetField { base, .. } => expr_contains_aggregate(base),
3182 llkv_expr::expr::ScalarExpr::Column(_) | llkv_expr::expr::ScalarExpr::Literal(_) => false,
3183 }
3184}
3185
3186fn try_parse_aggregate_function(
3187 func: &sqlparser::ast::Function,
3188) -> SqlResult<Option<llkv_expr::expr::AggregateCall<String>>> {
3189 use sqlparser::ast::{FunctionArg, FunctionArgExpr, FunctionArguments, ObjectNamePart};
3190
3191 if func.uses_odbc_syntax {
3192 return Ok(None);
3193 }
3194 if !matches!(func.parameters, FunctionArguments::None) {
3195 return Ok(None);
3196 }
3197 if func.filter.is_some()
3198 || func.null_treatment.is_some()
3199 || func.over.is_some()
3200 || !func.within_group.is_empty()
3201 {
3202 return Ok(None);
3203 }
3204
3205 let func_name = if func.name.0.len() == 1 {
3206 match &func.name.0[0] {
3207 ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
3208 _ => return Ok(None),
3209 }
3210 } else {
3211 return Ok(None);
3212 };
3213
3214 let args_slice: &[FunctionArg] = match &func.args {
3215 FunctionArguments::List(list) => {
3216 if list.duplicate_treatment.is_some() || !list.clauses.is_empty() {
3217 return Ok(None);
3218 }
3219 &list.args
3220 }
3221 FunctionArguments::None => &[],
3222 FunctionArguments::Subquery(_) => return Ok(None),
3223 };
3224
3225 let agg_call = match func_name.as_str() {
3226 "count" => {
3227 if args_slice.len() != 1 {
3228 return Err(Error::InvalidArgumentError(
3229 "COUNT accepts exactly one argument".into(),
3230 ));
3231 }
3232 match &args_slice[0] {
3233 FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
3234 llkv_expr::expr::AggregateCall::CountStar
3235 }
3236 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
3237 let column = resolve_column_name(arg_expr)?;
3238 llkv_expr::expr::AggregateCall::Count(column)
3239 }
3240 _ => {
3241 return Err(Error::InvalidArgumentError(
3242 "unsupported COUNT argument".into(),
3243 ));
3244 }
3245 }
3246 }
3247 "sum" => {
3248 if args_slice.len() != 1 {
3249 return Err(Error::InvalidArgumentError(
3250 "SUM accepts exactly one argument".into(),
3251 ));
3252 }
3253 let arg_expr = match &args_slice[0] {
3254 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
3255 _ => {
3256 return Err(Error::InvalidArgumentError(
3257 "SUM requires a column argument".into(),
3258 ));
3259 }
3260 };
3261
3262 if let Some(column) = parse_count_nulls_case(arg_expr)? {
3264 llkv_expr::expr::AggregateCall::CountNulls(column)
3265 } else {
3266 let column = resolve_column_name(arg_expr)?;
3267 llkv_expr::expr::AggregateCall::Sum(column)
3268 }
3269 }
3270 "min" => {
3271 if args_slice.len() != 1 {
3272 return Err(Error::InvalidArgumentError(
3273 "MIN accepts exactly one argument".into(),
3274 ));
3275 }
3276 let arg_expr = match &args_slice[0] {
3277 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
3278 _ => {
3279 return Err(Error::InvalidArgumentError(
3280 "MIN requires a column argument".into(),
3281 ));
3282 }
3283 };
3284 let column = resolve_column_name(arg_expr)?;
3285 llkv_expr::expr::AggregateCall::Min(column)
3286 }
3287 "max" => {
3288 if args_slice.len() != 1 {
3289 return Err(Error::InvalidArgumentError(
3290 "MAX accepts exactly one argument".into(),
3291 ));
3292 }
3293 let arg_expr = match &args_slice[0] {
3294 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
3295 _ => {
3296 return Err(Error::InvalidArgumentError(
3297 "MAX requires a column argument".into(),
3298 ));
3299 }
3300 };
3301 let column = resolve_column_name(arg_expr)?;
3302 llkv_expr::expr::AggregateCall::Max(column)
3303 }
3304 _ => return Ok(None),
3305 };
3306
3307 Ok(Some(agg_call))
3308}
3309
3310fn parse_count_nulls_case(expr: &SqlExpr) -> SqlResult<Option<String>> {
3311 let SqlExpr::Case {
3312 operand,
3313 conditions,
3314 else_result,
3315 ..
3316 } = expr
3317 else {
3318 return Ok(None);
3319 };
3320
3321 if operand.is_some() || conditions.len() != 1 {
3322 return Ok(None);
3323 }
3324
3325 let case_when = &conditions[0];
3326 if !is_integer_literal(&case_when.result, 1) {
3327 return Ok(None);
3328 }
3329
3330 let else_expr = match else_result {
3331 Some(expr) => expr.as_ref(),
3332 None => return Ok(None),
3333 };
3334 if !is_integer_literal(else_expr, 0) {
3335 return Ok(None);
3336 }
3337
3338 let inner = match &case_when.condition {
3339 SqlExpr::IsNull(inner) => inner.as_ref(),
3340 _ => return Ok(None),
3341 };
3342
3343 resolve_column_name(inner).map(Some)
3344}
3345
3346fn is_integer_literal(expr: &SqlExpr, expected: i64) -> bool {
3347 match expr {
3348 SqlExpr::Value(ValueWithSpan {
3349 value: Value::Number(text, _),
3350 ..
3351 }) => text.parse::<i64>() == Ok(expected),
3352 _ => false,
3353 }
3354}
3355
3356fn translate_condition_with_context(
3357 resolver: &IdentifierResolver<'_>,
3358 context: IdentifierContext,
3359 expr: &SqlExpr,
3360) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
3361 match expr {
3362 SqlExpr::IsNull(inner) => {
3363 let scalar = translate_scalar_with_context(resolver, context, inner)?;
3364 match scalar {
3365 llkv_expr::expr::ScalarExpr::Column(column) => {
3366 Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
3367 field_id: column,
3368 op: llkv_expr::expr::Operator::IsNull,
3369 }))
3370 }
3371 _ => Err(Error::InvalidArgumentError(
3372 "IS NULL predicates currently support column references only".into(),
3373 )),
3374 }
3375 }
3376 SqlExpr::IsNotNull(inner) => {
3377 let scalar = translate_scalar_with_context(resolver, context, inner)?;
3378 match scalar {
3379 llkv_expr::expr::ScalarExpr::Column(column) => {
3380 Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
3381 field_id: column,
3382 op: llkv_expr::expr::Operator::IsNotNull,
3383 }))
3384 }
3385 _ => Err(Error::InvalidArgumentError(
3386 "IS NOT NULL predicates currently support column references only".into(),
3387 )),
3388 }
3389 }
3390 SqlExpr::BinaryOp { left, op, right } => match op {
3391 BinaryOperator::And => Ok(llkv_expr::expr::Expr::And(vec![
3392 translate_condition_with_context(resolver, context, left)?,
3393 translate_condition_with_context(resolver, context, right)?,
3394 ])),
3395 BinaryOperator::Or => Ok(llkv_expr::expr::Expr::Or(vec![
3396 translate_condition_with_context(resolver, context, left)?,
3397 translate_condition_with_context(resolver, context, right)?,
3398 ])),
3399 BinaryOperator::Eq
3400 | BinaryOperator::NotEq
3401 | BinaryOperator::Lt
3402 | BinaryOperator::LtEq
3403 | BinaryOperator::Gt
3404 | BinaryOperator::GtEq => {
3405 translate_comparison_with_context(resolver, context, left, op.clone(), right)
3406 }
3407 other => Err(Error::InvalidArgumentError(format!(
3408 "unsupported binary operator in WHERE clause: {other:?}"
3409 ))),
3410 },
3411 SqlExpr::UnaryOp {
3412 op: UnaryOperator::Not,
3413 expr,
3414 } => Ok(llkv_expr::expr::Expr::not(
3415 translate_condition_with_context(resolver, context, expr)?,
3416 )),
3417 SqlExpr::Nested(inner) => translate_condition_with_context(resolver, context, inner),
3418 other => Err(Error::InvalidArgumentError(format!(
3419 "unsupported WHERE clause: {other:?}"
3420 ))),
3421 }
3422}
3423
3424fn translate_comparison_with_context(
3425 resolver: &IdentifierResolver<'_>,
3426 context: IdentifierContext,
3427 left: &SqlExpr,
3428 op: BinaryOperator,
3429 right: &SqlExpr,
3430) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
3431 let left_scalar = translate_scalar_with_context(resolver, context, left)?;
3432 let right_scalar = translate_scalar_with_context(resolver, context, right)?;
3433 let compare_op = match op {
3434 BinaryOperator::Eq => llkv_expr::expr::CompareOp::Eq,
3435 BinaryOperator::NotEq => llkv_expr::expr::CompareOp::NotEq,
3436 BinaryOperator::Lt => llkv_expr::expr::CompareOp::Lt,
3437 BinaryOperator::LtEq => llkv_expr::expr::CompareOp::LtEq,
3438 BinaryOperator::Gt => llkv_expr::expr::CompareOp::Gt,
3439 BinaryOperator::GtEq => llkv_expr::expr::CompareOp::GtEq,
3440 other => {
3441 return Err(Error::InvalidArgumentError(format!(
3442 "unsupported comparison operator: {other:?}"
3443 )));
3444 }
3445 };
3446
3447 if let (
3448 llkv_expr::expr::ScalarExpr::Column(column),
3449 llkv_expr::expr::ScalarExpr::Literal(literal),
3450 ) = (&left_scalar, &right_scalar)
3451 && let Some(op) = compare_op_to_filter_operator(compare_op, literal)
3452 {
3453 return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
3454 field_id: column.clone(),
3455 op,
3456 }));
3457 }
3458
3459 if let (
3460 llkv_expr::expr::ScalarExpr::Literal(literal),
3461 llkv_expr::expr::ScalarExpr::Column(column),
3462 ) = (&left_scalar, &right_scalar)
3463 && let Some(flipped) = flip_compare_op(compare_op)
3464 && let Some(op) = compare_op_to_filter_operator(flipped, literal)
3465 {
3466 return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
3467 field_id: column.clone(),
3468 op,
3469 }));
3470 }
3471
3472 Ok(llkv_expr::expr::Expr::Compare {
3473 left: left_scalar,
3474 op: compare_op,
3475 right: right_scalar,
3476 })
3477}
3478
3479fn compare_op_to_filter_operator(
3480 op: llkv_expr::expr::CompareOp,
3481 literal: &Literal,
3482) -> Option<llkv_expr::expr::Operator<'static>> {
3483 let lit = literal.clone();
3484 match op {
3485 llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::Operator::Equals(lit)),
3486 llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::Operator::LessThan(lit)),
3487 llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::Operator::LessThanOrEquals(lit)),
3488 llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::Operator::GreaterThan(lit)),
3489 llkv_expr::expr::CompareOp::GtEq => {
3490 Some(llkv_expr::expr::Operator::GreaterThanOrEquals(lit))
3491 }
3492 llkv_expr::expr::CompareOp::NotEq => None,
3493 }
3494}
3495
3496fn flip_compare_op(op: llkv_expr::expr::CompareOp) -> Option<llkv_expr::expr::CompareOp> {
3497 match op {
3498 llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::CompareOp::Eq),
3499 llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::CompareOp::Gt),
3500 llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::CompareOp::GtEq),
3501 llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::CompareOp::Lt),
3502 llkv_expr::expr::CompareOp::GtEq => Some(llkv_expr::expr::CompareOp::LtEq),
3503 llkv_expr::expr::CompareOp::NotEq => None,
3504 }
3505}
3506fn translate_scalar_with_context(
3509 resolver: &IdentifierResolver<'_>,
3510 context: IdentifierContext,
3511 expr: &SqlExpr,
3512) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
3513 match expr {
3514 SqlExpr::Identifier(ident) => {
3515 let parts = vec![ident.value.clone()];
3516 let resolution = resolver.resolve(&parts, context)?;
3517 Ok(resolution.into_scalar_expr())
3518 }
3519 SqlExpr::CompoundIdentifier(idents) => {
3520 if idents.is_empty() {
3521 return Err(Error::InvalidArgumentError(
3522 "invalid compound identifier".into(),
3523 ));
3524 }
3525
3526 let parts: Vec<String> = idents.iter().map(|ident| ident.value.clone()).collect();
3527 let resolution = resolver.resolve(&parts, context)?;
3528 Ok(resolution.into_scalar_expr())
3529 }
3530 _ => translate_scalar(expr),
3531 }
3532}
3533
3534fn translate_scalar(expr: &SqlExpr) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
3535 match expr {
3536 SqlExpr::Identifier(ident) => Ok(llkv_expr::expr::ScalarExpr::column(ident.value.clone())),
3537 SqlExpr::CompoundIdentifier(idents) => {
3538 if idents.is_empty() {
3539 return Err(Error::InvalidArgumentError(
3540 "invalid compound identifier".into(),
3541 ));
3542 }
3543
3544 let column_name = idents[0].value.clone();
3546 let mut result = llkv_expr::expr::ScalarExpr::column(column_name);
3547
3548 for part in &idents[1..] {
3549 let field_name = part.value.clone();
3550 result = llkv_expr::expr::ScalarExpr::get_field(result, field_name);
3551 }
3552
3553 Ok(result)
3554 }
3555 SqlExpr::Value(value) => literal_from_value(value),
3556 SqlExpr::BinaryOp { left, op, right } => {
3557 let left_expr = translate_scalar(left)?;
3558 let right_expr = translate_scalar(right)?;
3559 let op = match op {
3560 BinaryOperator::Plus => llkv_expr::expr::BinaryOp::Add,
3561 BinaryOperator::Minus => llkv_expr::expr::BinaryOp::Subtract,
3562 BinaryOperator::Multiply => llkv_expr::expr::BinaryOp::Multiply,
3563 BinaryOperator::Divide => llkv_expr::expr::BinaryOp::Divide,
3564 BinaryOperator::Modulo => llkv_expr::expr::BinaryOp::Modulo,
3565 other => {
3566 return Err(Error::InvalidArgumentError(format!(
3567 "unsupported scalar binary operator: {other:?}"
3568 )));
3569 }
3570 };
3571 Ok(llkv_expr::expr::ScalarExpr::binary(
3572 left_expr, op, right_expr,
3573 ))
3574 }
3575 SqlExpr::UnaryOp {
3576 op: UnaryOperator::Minus,
3577 expr,
3578 } => match translate_scalar(expr)? {
3579 llkv_expr::expr::ScalarExpr::Literal(lit) => match lit {
3580 Literal::Integer(v) => {
3581 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(-v)))
3582 }
3583 Literal::Float(v) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(-v))),
3584 Literal::Boolean(_) => Err(Error::InvalidArgumentError(
3585 "cannot negate boolean literal".into(),
3586 )),
3587 Literal::String(_) => Err(Error::InvalidArgumentError(
3588 "cannot negate string literal".into(),
3589 )),
3590 Literal::Struct(_) => Err(Error::InvalidArgumentError(
3591 "cannot negate struct literal".into(),
3592 )),
3593 Literal::Null => Err(Error::InvalidArgumentError(
3594 "cannot negate null literal".into(),
3595 )),
3596 },
3597 _ => Err(Error::InvalidArgumentError(
3598 "cannot negate non-literal expression".into(),
3599 )),
3600 },
3601 SqlExpr::UnaryOp {
3602 op: UnaryOperator::Plus,
3603 expr,
3604 } => translate_scalar(expr),
3605 SqlExpr::Nested(inner) => translate_scalar(inner),
3606 SqlExpr::Function(func) => {
3607 if let Some(agg_call) = try_parse_aggregate_function(func)? {
3609 Ok(llkv_expr::expr::ScalarExpr::aggregate(agg_call))
3610 } else {
3611 Err(Error::InvalidArgumentError(format!(
3612 "unsupported function in scalar expression: {:?}",
3613 func.name
3614 )))
3615 }
3616 }
3617 SqlExpr::Dictionary(fields) => {
3618 let mut struct_fields = Vec::new();
3620 for entry in fields {
3621 let key = entry.key.value.clone(); let value_expr = translate_scalar(&entry.value)?;
3624 match value_expr {
3626 llkv_expr::expr::ScalarExpr::Literal(lit) => {
3627 struct_fields.push((key, Box::new(lit)));
3628 }
3629 _ => {
3630 return Err(Error::InvalidArgumentError(
3631 "Dictionary values must be literals".to_string(),
3632 ));
3633 }
3634 }
3635 }
3636 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Struct(
3637 struct_fields,
3638 )))
3639 }
3640 other => Err(Error::InvalidArgumentError(format!(
3641 "unsupported scalar expression: {other:?}"
3642 ))),
3643 }
3644}
3645
3646fn literal_from_value(value: &ValueWithSpan) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
3647 match &value.value {
3648 Value::Number(text, _) => {
3649 if text.contains(['.', 'e', 'E']) {
3650 let parsed = text.parse::<f64>().map_err(|err| {
3651 Error::InvalidArgumentError(format!("invalid float literal: {err}"))
3652 })?;
3653 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(parsed)))
3654 } else {
3655 let parsed = text.parse::<i128>().map_err(|err| {
3656 Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
3657 })?;
3658 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(
3659 parsed,
3660 )))
3661 }
3662 }
3663 Value::Boolean(value) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Boolean(
3664 *value,
3665 ))),
3666 Value::Null => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Null)),
3667 other => {
3668 if let Some(text) = other.clone().into_string() {
3669 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::String(text)))
3670 } else {
3671 Err(Error::InvalidArgumentError(format!(
3672 "unsupported literal: {other:?}"
3673 )))
3674 }
3675 }
3676 }
3677}
3678
3679fn resolve_assignment_column_name(target: &AssignmentTarget) -> SqlResult<String> {
3680 match target {
3681 AssignmentTarget::ColumnName(name) => {
3682 if name.0.len() != 1 {
3683 return Err(Error::InvalidArgumentError(
3684 "qualified column names in UPDATE assignments are not supported yet".into(),
3685 ));
3686 }
3687 match &name.0[0] {
3688 ObjectNamePart::Identifier(ident) => Ok(ident.value.clone()),
3689 other => Err(Error::InvalidArgumentError(format!(
3690 "unsupported column reference in UPDATE assignment: {other:?}"
3691 ))),
3692 }
3693 }
3694 AssignmentTarget::Tuple(_) => Err(Error::InvalidArgumentError(
3695 "tuple assignments are not supported yet".into(),
3696 )),
3697 }
3698}
3699
3700fn arrow_type_from_sql(data_type: &SqlDataType) -> SqlResult<arrow::datatypes::DataType> {
3701 use arrow::datatypes::DataType;
3702 match data_type {
3703 SqlDataType::Int(_)
3704 | SqlDataType::Integer(_)
3705 | SqlDataType::BigInt(_)
3706 | SqlDataType::SmallInt(_)
3707 | SqlDataType::TinyInt(_) => Ok(DataType::Int64),
3708 SqlDataType::Float(_)
3709 | SqlDataType::Real
3710 | SqlDataType::Double(_)
3711 | SqlDataType::DoublePrecision => Ok(DataType::Float64),
3712 SqlDataType::Text
3713 | SqlDataType::String(_)
3714 | SqlDataType::Varchar(_)
3715 | SqlDataType::Char(_)
3716 | SqlDataType::Uuid => Ok(DataType::Utf8),
3717 SqlDataType::Date => Ok(DataType::Date32),
3718 SqlDataType::Decimal(_) | SqlDataType::Numeric(_) => Ok(DataType::Float64),
3719 SqlDataType::Boolean => Ok(DataType::Boolean),
3720 SqlDataType::Custom(name, args) => {
3721 if name.0.len() == 1
3722 && let ObjectNamePart::Identifier(ident) = &name.0[0]
3723 && ident.value.eq_ignore_ascii_case("row")
3724 {
3725 return row_type_to_arrow(data_type, args);
3726 }
3727 Err(Error::InvalidArgumentError(format!(
3728 "unsupported SQL data type: {data_type:?}"
3729 )))
3730 }
3731 other => Err(Error::InvalidArgumentError(format!(
3732 "unsupported SQL data type: {other:?}"
3733 ))),
3734 }
3735}
3736
3737fn row_type_to_arrow(
3738 data_type: &SqlDataType,
3739 tokens: &[String],
3740) -> SqlResult<arrow::datatypes::DataType> {
3741 use arrow::datatypes::{DataType, Field, FieldRef, Fields};
3742
3743 let row_str = data_type.to_string();
3744 if tokens.is_empty() {
3745 return Err(Error::InvalidArgumentError(
3746 "ROW type must define at least one field".into(),
3747 ));
3748 }
3749
3750 let dialect = GenericDialect {};
3751 let field_definitions = resolve_row_field_types(tokens, &dialect).map_err(|err| {
3752 Error::InvalidArgumentError(format!("unable to parse ROW type '{row_str}': {err}"))
3753 })?;
3754
3755 let mut fields: Vec<FieldRef> = Vec::with_capacity(field_definitions.len());
3756 for (field_name, field_type) in field_definitions {
3757 let arrow_field_type = arrow_type_from_sql(&field_type)?;
3758 fields.push(Arc::new(Field::new(field_name, arrow_field_type, true)));
3759 }
3760
3761 let struct_fields: Fields = fields.into();
3762 Ok(DataType::Struct(struct_fields))
3763}
3764
/// Splits the argument tokens of a `ROW` type into `(field name, SQL data type)` pairs.
///
/// `tokens` may optionally be wrapped in a leading `"("` and trailing `")"`. Fields are
/// `<name> <type tokens...>` runs, and `","` tokens between them are skipped when
/// present. Field names go through `normalize_row_field_name`.
///
/// A field's type boundary is found greedily. Starting right after the name, the
/// candidate token run is extended one token at a time and each joined candidate is
/// tried with `parse_sql_data_type`. The longest candidate that parsed wins. The scan
/// stops early only when a `","` follows an already-successful candidate, so without
/// commas every field may trigger a parse attempt for each remaining token.
///
/// NOTE(review): because the match is longest-wins, a following field name could be
/// absorbed into the previous field's type if the combination still parses as one type
/// (e.g. possibly `DOUBLE` followed by a field named `precision`). Confirm whether that
/// matters for callers.
///
/// # Errors
/// Returns `Error::InvalidArgumentError` for:
/// - empty input;
/// - unbalanced parentheses;
/// - a field without a type, or a field whose type never parses;
/// - input that yields no fields;
/// - an invalid field name (propagated from `normalize_row_field_name`).
fn resolve_row_field_types(
    tokens: &[String],
    dialect: &GenericDialect,
) -> SqlResult<Vec<(String, SqlDataType)>> {
    if tokens.is_empty() {
        return Err(Error::InvalidArgumentError(
            "ROW type must define at least one field".into(),
        ));
    }

    // Strip one optional pair of enclosing parentheses; a lone ")" at the end is an error.
    let mut start = 0;
    let mut end = tokens.len();
    if tokens[start] == "(" {
        // `end == 0` is defensive: `tokens` is known to be non-empty here.
        if end == 0 || tokens[end - 1] != ")" {
            return Err(Error::InvalidArgumentError(
                "ROW type is missing closing ')'".into(),
            ));
        }
        start += 1;
        end -= 1;
    } else if tokens[end - 1] == ")" {
        return Err(Error::InvalidArgumentError(
            "ROW type contains unmatched ')'".into(),
        ));
    }

    let slice = &tokens[start..end];
    if slice.is_empty() {
        return Err(Error::InvalidArgumentError(
            "ROW type did not provide any field definitions".into(),
        ));
    }

    let mut fields = Vec::new();
    let mut index = 0;

    while index < slice.len() {
        // Separators between fields are optional; skip any stray commas.
        if slice[index] == "," {
            index += 1;
            continue;
        }

        let field_name = normalize_row_field_name(&slice[index])?;
        index += 1;

        if index >= slice.len() {
            return Err(Error::InvalidArgumentError(format!(
                "ROW field '{field_name}' is missing a type specification"
            )));
        }

        // Greedy search: remember the longest candidate `slice[index..type_end]` that
        // parses as a data type, together with the index just past it.
        let mut last_success: Option<(usize, SqlDataType)> = None;
        let mut type_end = index;

        while type_end <= slice.len() {
            let candidate = slice[index..type_end].join(" ");
            // The very first candidate (zero tokens) is empty; just widen the window.
            if candidate.trim().is_empty() {
                type_end += 1;
                continue;
            }

            // Parse failures are expected while widening, so errors are ignored here.
            if let Ok(parsed_type) = parse_sql_data_type(&candidate, dialect) {
                last_success = Some((type_end, parsed_type));
            }

            if type_end == slice.len() {
                break;
            }

            // A comma after a successful parse marks the end of this field's type.
            if slice[type_end] == "," && last_success.is_some() {
                break;
            }

            type_end += 1;
        }

        let Some((next_index, data_type)) = last_success else {
            return Err(Error::InvalidArgumentError(format!(
                "failed to parse ROW field type for '{field_name}'"
            )));
        };

        fields.push((field_name, data_type));
        // Resume right after the longest successfully parsed type.
        index = next_index;

        if index < slice.len() && slice[index] == "," {
            index += 1;
        }
    }

    // Reachable when the slice contained only commas.
    if fields.is_empty() {
        return Err(Error::InvalidArgumentError(
            "ROW type did not provide any field definitions".into(),
        ));
    }

    Ok(fields)
}
3863
3864fn normalize_row_field_name(raw: &str) -> SqlResult<String> {
3865 let trimmed = raw.trim();
3866 if trimmed.is_empty() {
3867 return Err(Error::InvalidArgumentError(
3868 "ROW field name must not be empty".into(),
3869 ));
3870 }
3871
3872 if let Some(stripped) = trimmed.strip_prefix('"') {
3873 let without_end = stripped.strip_suffix('"').ok_or_else(|| {
3874 Error::InvalidArgumentError(format!("unterminated quoted ROW field name: {trimmed}"))
3875 })?;
3876 let name = without_end.replace("\"\"", "\"");
3877 return Ok(name);
3878 }
3879
3880 Ok(trimmed.to_string())
3881}
3882
3883fn parse_sql_data_type(type_str: &str, dialect: &GenericDialect) -> SqlResult<SqlDataType> {
3884 let trimmed = type_str.trim();
3885 let sql = format!("CREATE TABLE __row(__field {trimmed});");
3886 let statements = Parser::parse_sql(dialect, &sql).map_err(|err| {
3887 Error::InvalidArgumentError(format!("failed to parse ROW field type '{trimmed}': {err}"))
3888 })?;
3889
3890 let stmt = statements.into_iter().next().ok_or_else(|| {
3891 Error::InvalidArgumentError(format!(
3892 "ROW field type '{trimmed}' did not produce a statement"
3893 ))
3894 })?;
3895
3896 match stmt {
3897 Statement::CreateTable(table) => table
3898 .columns
3899 .first()
3900 .map(|col| col.data_type.clone())
3901 .ok_or_else(|| {
3902 Error::InvalidArgumentError(format!(
3903 "ROW field type '{trimmed}' missing column definition"
3904 ))
3905 }),
3906 other => Err(Error::InvalidArgumentError(format!(
3907 "unexpected statement while parsing ROW field type: {other:?}"
3908 ))),
3909 }
3910}
3911
3912fn extract_constant_select_rows(select: &Select) -> SqlResult<Option<Vec<Vec<PlanValue>>>> {
3913 if !select.from.is_empty() {
3914 return Ok(None);
3915 }
3916
3917 if select.selection.is_some()
3918 || select.having.is_some()
3919 || !select.named_window.is_empty()
3920 || select.qualify.is_some()
3921 || select.distinct.is_some()
3922 || select.top.is_some()
3923 || select.into.is_some()
3924 || select.prewhere.is_some()
3925 || !select.lateral_views.is_empty()
3926 || select.value_table_mode.is_some()
3927 || !group_by_is_empty(&select.group_by)
3928 {
3929 return Err(Error::InvalidArgumentError(
3930 "constant SELECT statements do not support advanced clauses".into(),
3931 ));
3932 }
3933
3934 if select.projection.is_empty() {
3935 return Err(Error::InvalidArgumentError(
3936 "constant SELECT requires at least one projection".into(),
3937 ));
3938 }
3939
3940 let mut row: Vec<PlanValue> = Vec::with_capacity(select.projection.len());
3941 for item in &select.projection {
3942 let expr = match item {
3943 SelectItem::UnnamedExpr(expr) => expr,
3944 SelectItem::ExprWithAlias { expr, .. } => expr,
3945 other => {
3946 return Err(Error::InvalidArgumentError(format!(
3947 "unsupported projection in constant SELECT: {other:?}"
3948 )));
3949 }
3950 };
3951
3952 let value = SqlValue::try_from_expr(expr)?;
3953 row.push(PlanValue::from(value));
3954 }
3955
3956 Ok(Some(vec![row]))
3957}
3958
3959fn extract_single_table(from: &[TableWithJoins]) -> SqlResult<(String, String)> {
3960 if from.len() != 1 {
3961 return Err(Error::InvalidArgumentError(
3962 "queries over multiple tables are not supported yet".into(),
3963 ));
3964 }
3965 let item = &from[0];
3966 if !item.joins.is_empty() {
3967 return Err(Error::InvalidArgumentError(
3968 "JOIN clauses are not supported yet".into(),
3969 ));
3970 }
3971 match &item.relation {
3972 TableFactor::Table { name, .. } => canonical_object_name(name),
3973 _ => Err(Error::InvalidArgumentError(
3974 "queries require a plain table name".into(),
3975 )),
3976 }
3977}
3978
3979fn extract_tables(from: &[TableWithJoins]) -> SqlResult<Vec<llkv_plan::TableRef>> {
3981 let mut tables = Vec::new();
3982
3983 for item in from {
3984 if !item.joins.is_empty() {
3986 return Err(Error::InvalidArgumentError(
3987 "JOIN clauses are not supported yet".into(),
3988 ));
3989 }
3990
3991 match &item.relation {
3993 TableFactor::Table { name, .. } => {
3994 let (schema_opt, table) = parse_schema_qualified_name(name)?;
3995 let schema = schema_opt.unwrap_or_default();
3996 tables.push(llkv_plan::TableRef::new(schema, table));
3997 }
3998 _ => {
3999 return Err(Error::InvalidArgumentError(
4000 "queries require a plain table name".into(),
4001 ));
4002 }
4003 }
4004 }
4005
4006 Ok(tables)
4007}
4008
4009fn group_by_is_empty(expr: &GroupByExpr) -> bool {
4010 matches!(
4011 expr,
4012 GroupByExpr::Expressions(exprs, modifiers)
4013 if exprs.is_empty() && modifiers.is_empty()
4014 )
4015}
4016
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int64Array, StringArray};
    use arrow::record_batch::RecordBatch;
    use llkv_storage::pager::MemPager;

    /// Executes `$sql` on `$engine`, expects the first result to be a SELECT, and
    /// collects all of its record batches.
    macro_rules! select_batches {
        ($engine:expr, $sql:expr) => {{
            let mut results = $engine.execute($sql).expect("select rows");
            match results.remove(0) {
                RuntimeStatementResult::Select { execution, .. } => {
                    execution.collect().expect("collect batches")
                }
                _ => panic!("expected select result"),
            }
        }};
    }

    /// Collects the first column of every batch as optional strings (NULL → `None`).
    fn extract_string_options(batches: &[RecordBatch]) -> Vec<Option<String>> {
        batches
            .iter()
            .flat_map(|batch| {
                let column = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .expect("string column");
                (0..column.len())
                    .map(move |idx| (!column.is_null(idx)).then(|| column.value(idx).to_string()))
            })
            .collect()
    }

    /// Collects the first column of every batch as optional `i64`s (NULL → `None`).
    fn extract_int_options(batches: &[RecordBatch]) -> Vec<Option<i64>> {
        batches
            .iter()
            .flat_map(|batch| {
                let column = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .expect("int column");
                (0..column.len()).map(move |idx| (!column.is_null(idx)).then(|| column.value(idx)))
            })
            .collect()
    }

    #[test]
    fn create_insert_select_roundtrip() {
        let engine = SqlEngine::new(Arc::new(MemPager::default()));

        let created = engine
            .execute("CREATE TABLE people (id INT NOT NULL, name TEXT NOT NULL)")
            .expect("create table");
        assert!(matches!(
            created[0],
            RuntimeStatementResult::CreateTable { .. }
        ));

        let inserted = engine
            .execute("INSERT INTO people (id, name) VALUES (1, 'alice'), (2, 'bob')")
            .expect("insert rows");
        assert!(matches!(
            inserted[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 2,
                ..
            }
        ));

        let batches = select_batches!(engine, "SELECT name FROM people WHERE id = 2");
        assert_eq!(batches.len(), 1);
        let names = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string column");
        assert_eq!(names.len(), 1);
        assert_eq!(names.value(0), "bob");
    }

    #[test]
    fn insert_select_constant_including_null() {
        let engine = SqlEngine::new(Arc::new(MemPager::default()));

        engine
            .execute("CREATE TABLE integers(i INTEGER)")
            .expect("create table");

        let first = engine
            .execute("INSERT INTO integers SELECT 42")
            .expect("insert literal");
        assert!(matches!(
            first[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 1,
                ..
            }
        ));

        let second = engine
            .execute("INSERT INTO integers SELECT CAST(NULL AS VARCHAR)")
            .expect("insert null literal");
        assert!(matches!(
            second[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 1,
                ..
            }
        ));

        let batches = select_batches!(engine, "SELECT * FROM integers");
        assert_eq!(extract_int_options(&batches), vec![Some(42), None]);
    }

    #[test]
    fn update_with_where_clause_filters_rows() {
        let engine = SqlEngine::new(Arc::new(MemPager::default()));

        for (sql, what) in [
            ("SET default_null_order='nulls_first'", "set default null order"),
            ("CREATE TABLE strings(a VARCHAR)", "create table"),
            ("INSERT INTO strings VALUES ('3'), ('4'), (NULL)", "insert seed rows"),
        ] {
            engine.execute(sql).expect(what);
        }

        let updated = engine
            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
            .expect("update rows");
        assert!(matches!(
            updated[0],
            RuntimeStatementResult::Update {
                rows_updated: 1,
                ..
            }
        ));

        let batches = select_batches!(engine, "SELECT * FROM strings ORDER BY cast(a AS INTEGER)");
        let mut values = extract_string_options(&batches);
        // Sort numerically with NULLs first so this test checks the UPDATE, not ORDER BY.
        values.sort_by_key(|value| {
            value
                .as_ref()
                .map(|text| text.parse::<i64>().unwrap_or_default())
        });

        assert_eq!(
            values,
            vec![None, Some("4".to_string()), Some("13".to_string())]
        );
    }

    #[test]
    fn order_by_honors_configured_default_null_order() {
        let engine = SqlEngine::new(Arc::new(MemPager::default()));

        engine
            .execute("CREATE TABLE strings(a VARCHAR)")
            .expect("create table");
        engine
            .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
            .expect("insert values");
        engine
            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
            .expect("update value");

        let query = "SELECT * FROM strings ORDER BY cast(a AS INTEGER)";

        let before = extract_string_options(&select_batches!(engine, query));
        assert_eq!(
            before,
            vec![Some("4".to_string()), Some("13".to_string()), None]
        );

        assert!(!engine.default_nulls_first_for_tests());
        engine
            .execute("SET default_null_order='nulls_first'")
            .expect("set default null order");
        assert!(engine.default_nulls_first_for_tests());

        let after = extract_string_options(&select_batches!(engine, query));
        assert_eq!(
            after,
            vec![None, Some("4".to_string()), Some("13".to_string())]
        );
    }

    #[test]
    fn arrow_type_from_row_returns_struct_fields() {
        use arrow::datatypes::DataType;

        let statements = Parser::parse_sql(
            &GenericDialect {},
            "CREATE TABLE row_types(payload ROW(a INTEGER, b VARCHAR));",
        )
        .expect("parse ROW type definition");

        let Statement::CreateTable(stmt) = &statements[0] else {
            panic!("unexpected statement: {:?}", statements[0]);
        };

        let arrow_type =
            arrow_type_from_sql(&stmt.columns[0].data_type).expect("convert ROW type");
        match arrow_type {
            DataType::Struct(fields) => {
                assert_eq!(fields.len(), 2, "unexpected field count");
                assert_eq!(fields[0].name(), "a");
                assert_eq!(fields[1].name(), "b");
                assert_eq!(fields[0].data_type(), &DataType::Int64);
                assert_eq!(fields[1].data_type(), &DataType::Utf8);
            }
            other => panic!("expected struct type, got {other:?}"),
        }
    }
}