1use std::cell::RefCell;
2use std::collections::{HashMap, HashSet, VecDeque};
3use std::convert::TryFrom;
4use std::sync::{
5 Arc, OnceLock,
6 atomic::{AtomicBool, Ordering as AtomicOrdering},
7};
8
9use crate::SqlResult;
10use crate::SqlValue;
11use arrow::array::Array;
12use arrow::datatypes::DataType;
13use arrow::record_batch::RecordBatch;
14
15use llkv_executor::{SelectExecution, push_query_label};
16use llkv_expr::literal::Literal;
17use llkv_plan::validation::{
18 ensure_known_columns_case_insensitive, ensure_non_empty, ensure_unique_case_insensitive,
19};
20use llkv_plan::{SubqueryCorrelatedColumnTracker, SubqueryCorrelatedTracker, TransformFrame};
21use llkv_result::Error;
22use llkv_runtime::TEMPORARY_NAMESPACE_ID;
23use llkv_runtime::{
24 AggregateExpr, AssignmentValue, ColumnAssignment, CreateIndexPlan, CreateTablePlan,
25 CreateTableSource, DeletePlan, ForeignKeyAction, ForeignKeySpec, IndexColumnPlan, InsertPlan,
26 InsertSource, MultiColumnUniqueSpec, OrderByPlan, OrderSortType, OrderTarget, PlanColumnSpec,
27 PlanStatement, PlanValue, RenameTablePlan, RuntimeContext, RuntimeEngine, RuntimeSession,
28 RuntimeStatementResult, SelectPlan, SelectProjection, TruncatePlan, UpdatePlan,
29 extract_rows_from_range,
30};
31use llkv_storage::pager::Pager;
32use llkv_table::CatalogDdl;
33use llkv_table::catalog::{ColumnResolution, IdentifierContext, IdentifierResolver};
34use regex::Regex;
35use simd_r_drive_entry_handle::EntryHandle;
36use sqlparser::ast::{
37 AlterColumnOperation, AlterTableOperation, Assignment, AssignmentTarget, BeginTransactionKind,
38 BinaryOperator, ColumnOption, ColumnOptionDef, ConstraintCharacteristics,
39 DataType as SqlDataType, Delete, Distinct, ExceptionWhen, Expr as SqlExpr, FromTable,
40 FunctionArg, FunctionArgExpr, FunctionArguments, GroupByExpr, Ident, JoinConstraint,
41 JoinOperator, LimitClause, NullsDistinctOption, ObjectName, ObjectNamePart, ObjectType,
42 OrderBy, OrderByKind, Query, ReferentialAction, SchemaName, Select, SelectItem,
43 SelectItemQualifiedWildcardKind, Set, SetExpr, SetQuantifier, SqlOption, Statement,
44 TableConstraint, TableFactor, TableObject, TableWithJoins, TransactionMode,
45 TransactionModifier, UnaryOperator, UpdateTableFromKind, Value, ValueWithSpan,
46};
47use sqlparser::dialect::GenericDialect;
48use sqlparser::parser::Parser;
49use sqlparser::tokenizer::Span;
50
/// Expected outcome of the next executed statement, registered ahead of time
/// by test harnesses to steer execution (see `buffer_insert`: `Error` and
/// `Count` expectations force immediate execution instead of buffering).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StatementExpectation {
    /// Statement is expected to succeed; buffering optimizations may apply.
    Ok,
    /// Statement is expected to fail; must run immediately so the error surfaces.
    Error,
    /// Statement is expected to report this row count; must run immediately.
    Count(u64),
}

thread_local! {
    // FIFO queue of pre-registered expectations; one entry is consumed per
    // statement by `next_statement_expectation`.
    static PENDING_STATEMENT_EXPECTATIONS: RefCell<VecDeque<StatementExpectation>> = const {
        RefCell::new(VecDeque::new())
    };
}
63
64pub fn register_statement_expectation(expectation: StatementExpectation) {
65 PENDING_STATEMENT_EXPECTATIONS.with(|queue| {
66 queue.borrow_mut().push_back(expectation);
67 });
68}
69
70pub fn clear_pending_statement_expectations() {
71 PENDING_STATEMENT_EXPECTATIONS.with(|queue| {
72 queue.borrow_mut().clear();
73 });
74}
75
76fn next_statement_expectation() -> StatementExpectation {
77 PENDING_STATEMENT_EXPECTATIONS
78 .with(|queue| queue.borrow_mut().pop_front())
79 .unwrap_or(StatementExpectation::Ok)
80}
81
/// Maximum nesting depth accepted by the SQL parser (see
/// `parse_sql_with_recursion_limit`); bounds stack usage on deeply nested input.
const PARSER_RECURSION_LIMIT: usize = 200;

/// Hook implemented by lowering code that can turn a scalar subquery into a
/// scalar expression, given the current and enclosing identifier scopes.
trait ScalarSubqueryResolver {
    fn handle_scalar_subquery(
        &mut self,
        subquery: &Query,
        resolver: &IdentifierResolver<'_>,
        context: &IdentifierContext,
        outer_scopes: &[IdentifierContext],
    ) -> SqlResult<llkv_expr::expr::ScalarExpr<String>>;
}

/// Convenience extension: derive a correlated-column placeholder directly from
/// a catalog `ColumnResolution` instead of its parts.
trait SubqueryCorrelatedTrackerExt {
    fn placeholder_for_resolution(
        &mut self,
        resolution: &llkv_table::catalog::ColumnResolution,
    ) -> Option<String>;
}

impl SubqueryCorrelatedTrackerExt for SubqueryCorrelatedTracker<'_> {
    fn placeholder_for_resolution(
        &mut self,
        resolution: &llkv_table::catalog::ColumnResolution,
    ) -> Option<String> {
        // Delegate to the path-based API using the resolution's column/path.
        self.placeholder_for_column_path(resolution.column(), resolution.field_path())
    }
}

/// Extension to reborrow an `Option<&mut T>` without consuming the original
/// `Option`, yielding a fresh `Option<&mut T>` for a nested call.
trait SubqueryCorrelatedTrackerOptionExt {
    fn reborrow(&mut self) -> Option<&mut SubqueryCorrelatedColumnTracker>;
}

impl SubqueryCorrelatedTrackerOptionExt for Option<&mut SubqueryCorrelatedColumnTracker> {
    fn reborrow(&mut self) -> Option<&mut SubqueryCorrelatedColumnTracker> {
        self.as_mut().map(|tracker| &mut **tracker)
    }
}
129
/// Row-count threshold at which buffered INSERT rows are flushed to the runtime.
const MAX_BUFFERED_INSERT_ROWS: usize = 8192;

/// Accumulates literal rows from consecutive `INSERT ... VALUES` statements
/// that target the same table and column list, so they can be executed as a
/// single combined plan.
struct InsertBuffer {
    // Display name of the target table.
    table_name: String,
    // Column list shared by every buffered statement (order-sensitive).
    columns: Vec<String>,
    // Total row count across all buffered statements.
    total_rows: usize,
    // Per-statement row counts in arrival order; used to fan the combined
    // runtime result back out into one result per original statement.
    statement_row_counts: Vec<usize>,
    // The buffered literal rows themselves.
    rows: Vec<Vec<PlanValue>>,
}
196
197impl InsertBuffer {
198 fn new(table_name: String, columns: Vec<String>, rows: Vec<Vec<PlanValue>>) -> Self {
199 let row_count = rows.len();
200 Self {
201 table_name,
202 columns,
203 total_rows: row_count,
204 statement_row_counts: vec![row_count],
205 rows,
206 }
207 }
208
209 fn can_accept(&self, table_name: &str, columns: &[String]) -> bool {
210 self.table_name == table_name && self.columns == columns
211 }
212
213 fn push_statement(&mut self, rows: Vec<Vec<PlanValue>>) {
214 let row_count = rows.len();
215 self.total_rows += row_count;
216 self.statement_row_counts.push(row_count);
217 self.rows.extend(rows);
218 }
219
220 fn should_flush(&self) -> bool {
221 self.total_rows >= MAX_BUFFERED_INSERT_ROWS
222 }
223}
224
/// Outcome of analyzing an INSERT statement for buffering eligibility.
enum PreparedInsert {
    /// Literal `VALUES` rows (or a constant/range SELECT materialized into
    /// rows) that may be buffered for a combined flush.
    Values {
        table_name: String,
        columns: Vec<String>,
        rows: Vec<Vec<PlanValue>>,
    },
    /// A plan that must execute immediately (general INSERT ... SELECT).
    Immediate(InsertPlan),
}

/// Result of routing one INSERT through the buffering layer.
struct BufferedInsertResult<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Results produced by flushing previously buffered statements, if any.
    flushed: Vec<RuntimeStatementResult<P>>,
    // Result for the statement just processed, when available now (None when
    // the statement's rows were absorbed into a flush).
    current: Option<RuntimeStatementResult<P>>,
}

/// SQL front-end over the runtime engine: preprocesses and parses SQL text,
/// dispatches statements, and optionally buffers consecutive INSERTs.
pub struct SqlEngine<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Underlying runtime engine (owns the session and context).
    engine: RuntimeEngine<P>,
    // Default NULL ordering applied when a query leaves it unspecified
    // (presumably consumed by ORDER BY planning — defined outside this view).
    default_nulls_first: AtomicBool,
    // Pending buffered INSERT rows, if any (single-thread interior mutability).
    insert_buffer: RefCell<Option<InsertBuffer>>,
    // Whether transparent INSERT buffering is currently enabled.
    insert_buffering_enabled: AtomicBool,
}

/// Error text surfaced when an operation targets a table that another
/// transaction has dropped.
const DROPPED_TABLE_TRANSACTION_ERR: &str = "another transaction has dropped this table";
268
269impl<P> Drop for SqlEngine<P>
270where
271 P: Pager<Blob = EntryHandle> + Send + Sync,
272{
273 fn drop(&mut self) {
274 if let Err(e) = self.flush_buffer_results() {
276 tracing::warn!("Failed to flush INSERT buffer on drop: {:?}", e);
277 }
278 }
279}
280
281impl<P> Clone for SqlEngine<P>
282where
283 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
284{
285 fn clone(&self) -> Self {
286 tracing::warn!(
287 "[SQL_ENGINE] SqlEngine::clone() called - will create new Engine with new session!"
288 );
289 Self {
291 engine: self.engine.clone(),
292 default_nulls_first: AtomicBool::new(
293 self.default_nulls_first.load(AtomicOrdering::Relaxed),
294 ),
295 insert_buffer: RefCell::new(None),
296 insert_buffering_enabled: AtomicBool::new(
297 self.insert_buffering_enabled.load(AtomicOrdering::Relaxed),
298 ),
299 }
300 }
301}
302
303#[allow(dead_code)]
304impl<P> SqlEngine<P>
305where
306 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
307{
    /// Internal constructor: wrap `engine` with the given SQL-layer settings
    /// and an empty INSERT buffer.
    fn from_runtime_engine(
        engine: RuntimeEngine<P>,
        default_nulls_first: bool,
        insert_buffering_enabled: bool,
    ) -> Self {
        Self {
            engine,
            default_nulls_first: AtomicBool::new(default_nulls_first),
            insert_buffer: RefCell::new(None),
            insert_buffering_enabled: AtomicBool::new(insert_buffering_enabled),
        }
    }
320
321 fn map_table_error(table_name: &str, err: Error) -> Error {
322 match err {
323 Error::NotFound => Self::table_not_found_error(table_name),
324 Error::InvalidArgumentError(msg) if msg.contains("unknown table") => {
325 Self::table_not_found_error(table_name)
326 }
327 other => other,
328 }
329 }
330
331 fn table_not_found_error(table_name: &str) -> Error {
332 Error::CatalogError(format!(
333 "Catalog Error: Table '{table_name}' does not exist"
334 ))
335 }
336
337 fn is_table_missing_error(err: &Error) -> bool {
338 match err {
339 Error::NotFound => true,
340 Error::CatalogError(msg) => {
341 msg.contains("Catalog Error: Table") || msg.contains("unknown table")
342 }
343 Error::InvalidArgumentError(msg) => {
344 msg.contains("Catalog Error: Table") || msg.contains("unknown table")
345 }
346 _ => false,
347 }
348 }
349
350 fn execute_plan_statement(
351 &self,
352 statement: PlanStatement,
353 ) -> SqlResult<RuntimeStatementResult<P>> {
354 let table = llkv_runtime::statement_table_name(&statement).map(str::to_string);
355 self.engine.execute_statement(statement).map_err(|err| {
356 if let Some(table_name) = table {
357 Self::map_table_error(&table_name, err)
358 } else {
359 err
360 }
361 })
362 }
363
    /// Create a fresh engine over `pager` with default settings:
    /// nulls-first ordering default off and INSERT buffering disabled.
    pub fn new(pager: Arc<P>) -> Self {
        let engine = RuntimeEngine::new(pager);
        Self::from_runtime_engine(engine, false, false)
    }
372
373 fn preprocess_create_type_syntax(sql: &str) -> String {
381 static CREATE_TYPE_REGEX: OnceLock<Regex> = OnceLock::new();
382 static DROP_TYPE_REGEX: OnceLock<Regex> = OnceLock::new();
383
384 let create_re = CREATE_TYPE_REGEX.get_or_init(|| {
386 Regex::new(r"(?i)\bCREATE\s+TYPE\s+").expect("valid CREATE TYPE regex")
387 });
388
389 let drop_re = DROP_TYPE_REGEX
391 .get_or_init(|| Regex::new(r"(?i)\bDROP\s+TYPE\s+").expect("valid DROP TYPE regex"));
392
393 let sql = create_re.replace_all(sql, "CREATE DOMAIN ").to_string();
395
396 drop_re.replace_all(&sql, "DROP DOMAIN ").to_string()
398 }
399
400 fn preprocess_exclude_syntax(sql: &str) -> String {
401 static EXCLUDE_REGEX: OnceLock<Regex> = OnceLock::new();
402
403 let re = EXCLUDE_REGEX.get_or_init(|| {
406 Regex::new(
407 r"(?i)EXCLUDE\s*\(\s*([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)+)\s*\)",
408 )
409 .expect("valid EXCLUDE qualifier regex")
410 });
411
412 re.replace_all(sql, |caps: ®ex::Captures| {
413 let qualified_name = &caps[1];
414 format!("EXCLUDE (\"{}\")", qualified_name)
415 })
416 .to_string()
417 }
418
    /// Drop a trailing comma immediately before a closing parenthesis, e.g.
    /// `VALUES (1, 2,)` -> `VALUES (1, 2)`.
    ///
    /// NOTE(review): the regex runs over the raw SQL text, so a `,)` sequence
    /// inside a string literal would also be rewritten — confirm callers
    /// accept that trade-off.
    fn preprocess_trailing_commas_in_values(sql: &str) -> String {
        static TRAILING_COMMA_REGEX: OnceLock<Regex> = OnceLock::new();

        let re = TRAILING_COMMA_REGEX
            .get_or_init(|| Regex::new(r",(\s*)\)").expect("valid trailing comma regex"));

        re.replace_all(sql, "$1)").to_string()
    }
431
    /// Shared handle to the runtime context backing this engine.
    pub(crate) fn context_arc(&self) -> Arc<RuntimeContext<P>> {
        self.engine.context()
    }
435
    /// Build an engine over an existing runtime context, with INSERT
    /// buffering disabled.
    pub fn with_context(context: Arc<RuntimeContext<P>>, default_nulls_first: bool) -> Self {
        Self::from_runtime_engine(
            RuntimeEngine::from_context(context),
            default_nulls_first,
            false,
        )
    }
444
445 pub fn set_insert_buffering(&self, enabled: bool) -> SqlResult<()> {
457 if !enabled {
458 let _ = self.flush_buffer_results()?;
459 }
460 self.insert_buffering_enabled
461 .store(enabled, AtomicOrdering::Relaxed);
462 Ok(())
463 }
464
    /// Test-only accessor for the configured default NULL ordering flag.
    #[cfg(test)]
    fn default_nulls_first_for_tests(&self) -> bool {
        self.default_nulls_first.load(AtomicOrdering::Relaxed)
    }
469
    /// Whether the engine's session currently has an open transaction.
    fn has_active_transaction(&self) -> bool {
        self.engine.session().has_active_transaction()
    }
473
    /// Borrow the runtime session backing this engine.
    pub fn session(&self) -> &RuntimeSession<P> {
        self.engine.session()
    }
478
479 pub fn execute(&self, sql: &str) -> SqlResult<Vec<RuntimeStatementResult<P>>> {
492 tracing::trace!("DEBUG SQL execute: {}", sql);
493
494 let processed_sql = Self::preprocess_create_type_syntax(sql);
496 let processed_sql = Self::preprocess_exclude_syntax(&processed_sql);
497 let processed_sql = Self::preprocess_trailing_commas_in_values(&processed_sql);
498
499 let dialect = GenericDialect {};
500 let statements = parse_sql_with_recursion_limit(&dialect, &processed_sql)
501 .map_err(|err| Error::InvalidArgumentError(format!("failed to parse SQL: {err}")))?;
502
503 let mut results = Vec::with_capacity(statements.len());
504 for statement in statements.iter() {
505 let statement_expectation = next_statement_expectation();
506 match statement {
507 Statement::Insert(insert) => {
508 let mut outcome = self.buffer_insert(insert.clone(), statement_expectation)?;
509 if let Some(current) = outcome.current.take() {
510 results.push(current);
511 }
512 results.append(&mut outcome.flushed);
513 }
514 Statement::StartTransaction { .. }
515 | Statement::Commit { .. }
516 | Statement::Rollback { .. } => {
517 let mut flushed = self.flush_buffer_results()?;
519 let current = self.execute_statement(statement.clone())?;
520 results.push(current);
521 results.append(&mut flushed);
522 }
523 _ => {
524 let mut flushed = self.flush_buffer_results()?;
526 let current = self.execute_statement(statement.clone())?;
527 results.push(current);
528 results.append(&mut flushed);
529 }
530 }
531 }
532
533 Ok(results)
534 }
535
    /// Public entry point to flush any buffered INSERT rows, returning the
    /// per-statement results produced by the flush.
    pub fn flush_pending_inserts(&self) -> SqlResult<Vec<RuntimeStatementResult<P>>> {
        self.flush_buffer_results()
    }
544
    /// Route one INSERT statement through the buffering layer.
    ///
    /// Statements expected to error or to report a row count, and all INSERTs
    /// while buffering is disabled, execute immediately (after flushing any
    /// buffered rows). Otherwise literal VALUES rows are accumulated in the
    /// buffer and a synthetic per-statement result is returned right away.
    fn buffer_insert(
        &self,
        insert: sqlparser::ast::Insert,
        expectation: StatementExpectation,
    ) -> SqlResult<BufferedInsertResult<P>> {
        let execute_immediately = matches!(
            expectation,
            StatementExpectation::Error | StatementExpectation::Count(_)
        );
        if execute_immediately {
            let flushed = self.flush_buffer_results()?;
            let current = self.handle_insert(insert)?;
            return Ok(BufferedInsertResult {
                flushed,
                current: Some(current),
            });
        }

        if !self.insert_buffering_enabled.load(AtomicOrdering::Relaxed) {
            let flushed = self.flush_buffer_results()?;
            let current = self.handle_insert(insert)?;
            return Ok(BufferedInsertResult {
                flushed,
                current: Some(current),
            });
        }

        let prepared = self.prepare_insert(insert)?;
        match prepared {
            PreparedInsert::Values {
                table_name,
                columns,
                rows,
            } => {
                let mut flushed = Vec::new();
                let statement_rows = rows.len();
                let mut buf = self.insert_buffer.borrow_mut();
                match buf.as_mut() {
                    // Same table and columns: merge into the existing buffer.
                    Some(buffer) if buffer.can_accept(&table_name, &columns) => {
                        buffer.push_statement(rows);
                        if buffer.should_flush() {
                            // Release the RefCell borrow before flushing,
                            // which re-borrows the buffer internally.
                            drop(buf);
                            flushed = self.flush_buffer_results()?;
                            return Ok(BufferedInsertResult {
                                flushed,
                                current: None,
                            });
                        }
                        Ok(BufferedInsertResult {
                            flushed,
                            current: Some(RuntimeStatementResult::Insert {
                                table_name,
                                rows_inserted: statement_rows,
                            }),
                        })
                    }
                    // Different target: flush the old buffer, start a new one.
                    Some(_) => {
                        drop(buf);
                        flushed = self.flush_buffer_results()?;
                        let mut buf = self.insert_buffer.borrow_mut();
                        *buf = Some(InsertBuffer::new(table_name.clone(), columns, rows));
                        Ok(BufferedInsertResult {
                            flushed,
                            current: Some(RuntimeStatementResult::Insert {
                                table_name,
                                rows_inserted: statement_rows,
                            }),
                        })
                    }
                    // No buffer yet: start one with this statement's rows.
                    None => {
                        *buf = Some(InsertBuffer::new(table_name.clone(), columns, rows));
                        Ok(BufferedInsertResult {
                            flushed,
                            current: Some(RuntimeStatementResult::Insert {
                                table_name,
                                rows_inserted: statement_rows,
                            }),
                        })
                    }
                }
            }
            // Non-bufferable source (e.g. general SELECT): flush, then run now.
            PreparedInsert::Immediate(plan) => {
                let flushed = self.flush_buffer_results()?;
                let executed = self.execute_plan_statement(PlanStatement::Insert(plan))?;
                Ok(BufferedInsertResult {
                    flushed,
                    current: Some(executed),
                })
            }
        }
    }
649
    /// Flush any buffered INSERT rows as one combined plan execution, then fan
    /// the single runtime result back out into one
    /// `RuntimeStatementResult::Insert` per originally buffered statement.
    fn flush_buffer_results(&self) -> SqlResult<Vec<RuntimeStatementResult<P>>> {
        let mut buf = self.insert_buffer.borrow_mut();
        let buffer = match buf.take() {
            Some(b) => b,
            None => return Ok(Vec::new()),
        };
        // Release the RefCell borrow before executing: the runtime call below
        // may re-enter code that borrows the buffer again.
        drop(buf);

        let InsertBuffer {
            table_name,
            columns,
            total_rows,
            statement_row_counts,
            rows,
        } = buffer;

        if total_rows == 0 {
            return Ok(Vec::new());
        }

        let plan = InsertPlan {
            table: table_name.clone(),
            columns,
            source: InsertSource::Rows(rows),
        };

        let executed = self.execute_plan_statement(PlanStatement::Insert(plan))?;
        let inserted = match executed {
            RuntimeStatementResult::Insert { rows_inserted, .. } => {
                // A mismatch is logged, not fatal: per-statement results below
                // are derived from the counts recorded at buffering time.
                if rows_inserted != total_rows {
                    tracing::warn!(
                        "Buffered INSERT row count mismatch: expected {}, runtime inserted {}",
                        total_rows,
                        rows_inserted
                    );
                }
                rows_inserted
            }
            other => {
                return Err(Error::Internal(format!(
                    "expected Insert result when flushing buffer, got {other:?}"
                )));
            }
        };

        // Reconstruct one result per buffered statement.
        let mut per_statement = Vec::with_capacity(statement_row_counts.len());
        let mut assigned = 0usize;
        for rows in statement_row_counts {
            assigned += rows;
            per_statement.push(RuntimeStatementResult::Insert {
                table_name: table_name.clone(),
                rows_inserted: rows,
            });
        }

        if inserted != assigned {
            tracing::warn!(
                "Buffered INSERT per-statement totals ({}) do not match runtime ({}).",
                assigned,
                inserted
            );
        }

        Ok(per_statement)
    }
716
    /// Analyze an INSERT statement: reject unsupported forms, then classify it
    /// as bufferable literal `Values` or an `Immediate` plan that must run now.
    fn prepare_insert(&self, stmt: sqlparser::ast::Insert) -> SqlResult<PreparedInsert> {
        let table_name_debug =
            Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
        tracing::trace!(
            "DEBUG SQL prepare_insert called for table={}",
            table_name_debug
        );

        // Outside a transaction, inserting into a table another transaction
        // dropped must fail up front.
        if !self.engine.session().has_active_transaction()
            && self.is_table_marked_dropped(&table_name_debug)?
        {
            return Err(Error::TransactionContextError(
                DROPPED_TABLE_TRANSACTION_ERR.into(),
            ));
        }
        // Reject INSERT variants the planner does not support.
        if stmt.replace_into || stmt.ignore || stmt.or.is_some() {
            return Err(Error::InvalidArgumentError(
                "non-standard INSERT forms are not supported".into(),
            ));
        }
        if stmt.overwrite {
            return Err(Error::InvalidArgumentError(
                "INSERT OVERWRITE is not supported".into(),
            ));
        }
        if !stmt.assignments.is_empty() {
            return Err(Error::InvalidArgumentError(
                "INSERT ... SET is not supported".into(),
            ));
        }
        if stmt.partitioned.is_some() || !stmt.after_columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "partitioned INSERT is not supported".into(),
            ));
        }
        if stmt.returning.is_some() {
            return Err(Error::InvalidArgumentError(
                "INSERT ... RETURNING is not supported".into(),
            ));
        }
        if stmt.format_clause.is_some() || stmt.settings.is_some() {
            return Err(Error::InvalidArgumentError(
                "INSERT with FORMAT or SETTINGS is not supported".into(),
            ));
        }

        let (display_name, _canonical_name) = match &stmt.table {
            TableObject::TableName(name) => canonical_object_name(name)?,
            _ => {
                return Err(Error::InvalidArgumentError(
                    "INSERT requires a plain table name".into(),
                ));
            }
        };

        let columns: Vec<String> = stmt
            .columns
            .iter()
            .map(|ident| ident.value.clone())
            .collect();

        let source_expr = stmt
            .source
            .as_ref()
            .ok_or_else(|| Error::InvalidArgumentError("INSERT requires a VALUES clause".into()))?;
        validate_simple_query(source_expr)?;

        match source_expr.body.as_ref() {
            SetExpr::Values(values) => {
                if values.rows.is_empty() {
                    return Err(Error::InvalidArgumentError(
                        "INSERT VALUES list must contain at least one row".into(),
                    ));
                }
                // Eagerly convert every literal to a plan value so the rows
                // can be buffered.
                let mut rows: Vec<Vec<PlanValue>> = Vec::with_capacity(values.rows.len());
                for row in &values.rows {
                    let mut converted = Vec::with_capacity(row.len());
                    for expr in row {
                        converted.push(PlanValue::from(SqlValue::try_from_expr(expr)?));
                    }
                    rows.push(converted);
                }
                Ok(PreparedInsert::Values {
                    table_name: display_name,
                    columns,
                    rows,
                })
            }
            SetExpr::Select(select) => {
                // Constant SELECTs and range-generated rows can still be
                // materialized into literal rows (and therefore buffered).
                if let Some(rows) = extract_constant_select_rows(select.as_ref())? {
                    return Ok(PreparedInsert::Values {
                        table_name: display_name,
                        columns,
                        rows,
                    });
                }
                if let Some(range_rows) = extract_rows_from_range(select.as_ref())? {
                    return Ok(PreparedInsert::Values {
                        table_name: display_name,
                        columns,
                        rows: range_rows.into_rows(),
                    });
                }

                // General INSERT ... SELECT: plan the query and run immediately.
                let select_plan = self.build_select_plan((**source_expr).clone())?;
                Ok(PreparedInsert::Immediate(InsertPlan {
                    table: display_name,
                    columns,
                    source: InsertSource::Select {
                        plan: Box::new(select_plan),
                    },
                }))
            }
            _ => Err(Error::InvalidArgumentError(
                "unsupported INSERT source".into(),
            )),
        }
    }
848
849 pub fn sql(&self, sql: &str) -> SqlResult<Vec<RecordBatch>> {
886 let mut results = self.execute(sql)?;
887 if results.is_empty() {
888 return Err(Error::InvalidArgumentError(
889 "SqlEngine::sql expects a SELECT statement".into(),
890 ));
891 }
892
893 let primary = results.remove(0);
894
895 match primary {
896 RuntimeStatementResult::Select { execution, .. } => execution.collect(),
897 other => Err(Error::InvalidArgumentError(format!(
898 "SqlEngine::sql requires a SELECT statement, got {other:?}",
899 ))),
900 }
901 }
902
    /// Execute one parsed statement, routing transaction-control statements to
    /// their dedicated handlers and everything else to
    /// `execute_statement_non_transactional`.
    fn execute_statement(&self, statement: Statement) -> SqlResult<RuntimeStatementResult<P>> {
        // Label the query for downstream tracing while it executes.
        let statement_sql = statement.to_string();
        let _query_label_guard = push_query_label(statement_sql.clone());
        tracing::debug!("SQL execute_statement: {}", statement_sql.trim());
        tracing::trace!(
            "DEBUG SQL execute_statement: {:?}",
            match &statement {
                Statement::Insert(insert) =>
                    format!("Insert(table={:?})", Self::table_name_from_insert(insert)),
                Statement::Query(_) => "Query".to_string(),
                Statement::StartTransaction { .. } => "StartTransaction".to_string(),
                Statement::Commit { .. } => "Commit".to_string(),
                Statement::Rollback { .. } => "Rollback".to_string(),
                Statement::CreateTable(_) => "CreateTable".to_string(),
                Statement::Update { .. } => "Update".to_string(),
                Statement::Delete(_) => "Delete".to_string(),
                other => format!("Other({:?})", other),
            }
        );
        match statement {
            Statement::StartTransaction {
                modes,
                begin,
                transaction,
                modifier,
                statements,
                exception,
                has_end_keyword,
            } => self.handle_start_transaction(
                modes,
                begin,
                transaction,
                modifier,
                statements,
                exception,
                has_end_keyword,
            ),
            Statement::Commit {
                chain,
                end,
                modifier,
            } => self.handle_commit(chain, end, modifier),
            Statement::Rollback { chain, savepoint } => self.handle_rollback(chain, savepoint),
            other => self.execute_statement_non_transactional(other),
        }
    }
949
    /// Dispatch a non-transaction-control statement to its handler, or reject
    /// it as unsupported. Each arm emits a trace line naming the statement kind.
    fn execute_statement_non_transactional(
        &self,
        statement: Statement,
    ) -> SqlResult<RuntimeStatementResult<P>> {
        tracing::trace!("DEBUG SQL execute_statement_non_transactional called");
        match statement {
            Statement::CreateTable(stmt) => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateTable");
                self.handle_create_table(stmt)
            }
            Statement::CreateIndex(stmt) => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateIndex");
                self.handle_create_index(stmt)
            }
            Statement::CreateSchema {
                schema_name,
                if_not_exists,
                with,
                options,
                default_collate_spec,
                clone,
            } => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateSchema");
                self.handle_create_schema(
                    schema_name,
                    if_not_exists,
                    with,
                    options,
                    default_collate_spec,
                    clone,
                )
            }
            Statement::CreateView {
                name,
                columns,
                query,
                materialized,
                or_replace,
                or_alter,
                options,
                cluster_by,
                comment,
                with_no_schema_binding,
                if_not_exists,
                temporary,
                to,
                params,
                secure,
                name_before_not_exists,
            } => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateView");
                self.handle_create_view(
                    name,
                    columns,
                    query,
                    materialized,
                    or_replace,
                    or_alter,
                    options,
                    cluster_by,
                    comment,
                    with_no_schema_binding,
                    if_not_exists,
                    temporary,
                    to,
                    params,
                    secure,
                    name_before_not_exists,
                )
            }
            Statement::CreateDomain(create_domain) => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateDomain");
                self.handle_create_domain(create_domain)
            }
            Statement::DropDomain(drop_domain) => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: DropDomain");
                self.handle_drop_domain(drop_domain)
            }
            Statement::Insert(stmt) => {
                let table_name =
                    Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
                tracing::trace!(
                    "DEBUG SQL execute_statement_non_transactional: Insert(table={})",
                    table_name
                );
                self.handle_insert(stmt)
            }
            Statement::Query(query) => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Query");
                self.handle_query(*query)
            }
            Statement::Update {
                table,
                assignments,
                from,
                selection,
                returning,
                ..
            } => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Update");
                self.handle_update(table, assignments, from, selection, returning)
            }
            Statement::Delete(delete) => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Delete");
                self.handle_delete(delete)
            }
            Statement::Truncate {
                ref table_names,
                ref partitions,
                table,
                ref identity,
                cascade,
                ref on_cluster,
            } => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Truncate");
                self.handle_truncate(
                    table_names,
                    partitions,
                    table,
                    identity,
                    cascade,
                    on_cluster,
                )
            }
            Statement::Drop {
                object_type,
                if_exists,
                names,
                cascade,
                restrict,
                purge,
                temporary,
                ..
            } => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Drop");
                self.handle_drop(
                    object_type,
                    if_exists,
                    names,
                    cascade,
                    restrict,
                    purge,
                    temporary,
                )
            }
            Statement::AlterTable {
                name,
                if_exists,
                only,
                operations,
                ..
            } => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: AlterTable");
                self.handle_alter_table(name, if_exists, only, operations)
            }
            Statement::Set(set_stmt) => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Set");
                self.handle_set(set_stmt)
            }
            Statement::Pragma { name, value, is_eq } => {
                tracing::trace!("DEBUG SQL execute_statement_non_transactional: Pragma");
                self.handle_pragma(name, value, is_eq)
            }
            other => {
                tracing::trace!(
                    "DEBUG SQL execute_statement_non_transactional: Other({:?})",
                    other
                );
                Err(Error::InvalidArgumentError(format!(
                    "unsupported SQL statement: {other:?}"
                )))
            }
        }
    }
1124
1125 fn table_name_from_insert(insert: &sqlparser::ast::Insert) -> SqlResult<String> {
1126 match &insert.table {
1127 TableObject::TableName(name) => Self::object_name_to_string(name),
1128 _ => Err(Error::InvalidArgumentError(
1129 "INSERT requires a plain table name".into(),
1130 )),
1131 }
1132 }
1133
1134 fn table_name_from_update(table: &TableWithJoins) -> SqlResult<Option<String>> {
1135 if !table.joins.is_empty() {
1136 return Err(Error::InvalidArgumentError(
1137 "UPDATE with JOIN targets is not supported yet".into(),
1138 ));
1139 }
1140 Self::table_with_joins_name(table)
1141 }
1142
1143 fn table_name_from_delete(delete: &Delete) -> SqlResult<Option<String>> {
1144 if !delete.tables.is_empty() {
1145 return Err(Error::InvalidArgumentError(
1146 "multi-table DELETE is not supported yet".into(),
1147 ));
1148 }
1149 let from_tables = match &delete.from {
1150 FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
1151 };
1152 if from_tables.is_empty() {
1153 return Ok(None);
1154 }
1155 if from_tables.len() != 1 {
1156 return Err(Error::InvalidArgumentError(
1157 "DELETE over multiple tables is not supported yet".into(),
1158 ));
1159 }
1160 Self::table_with_joins_name(&from_tables[0])
1161 }
1162
1163 fn object_name_to_string(name: &ObjectName) -> SqlResult<String> {
1164 let (display, _) = canonical_object_name(name)?;
1165 Ok(display)
1166 }
1167
1168 #[allow(dead_code)]
1169 fn table_object_to_name(table: &TableObject) -> SqlResult<Option<String>> {
1170 match table {
1171 TableObject::TableName(name) => Ok(Some(Self::object_name_to_string(name)?)),
1172 TableObject::TableFunction(_) => Ok(None),
1173 }
1174 }
1175
1176 fn table_with_joins_name(table: &TableWithJoins) -> SqlResult<Option<String>> {
1177 match &table.relation {
1178 TableFactor::Table { name, .. } => Ok(Some(Self::object_name_to_string(name)?)),
1179 _ => Ok(None),
1180 }
1181 }
1182
1183 fn tables_in_query(query: &Query) -> SqlResult<Vec<String>> {
1184 let mut tables = Vec::new();
1185 if let sqlparser::ast::SetExpr::Select(select) = query.body.as_ref() {
1186 for table in &select.from {
1187 if let TableFactor::Table { name, .. } = &table.relation {
1188 tables.push(Self::object_name_to_string(name)?);
1189 }
1190 }
1191 }
1192 Ok(tables)
1193 }
1194
    /// Gather the (lowercased) column names known for a table, preferring any
    /// in-transaction schema over the committed catalog.
    ///
    /// Returns an empty set when the table does not exist, so callers can
    /// treat unknown tables as "no known columns" rather than an error.
    fn collect_known_columns(
        &self,
        display_name: &str,
        canonical_name: &str,
    ) -> SqlResult<HashSet<String>> {
        let context = self.engine.context();

        if context.is_table_marked_dropped(canonical_name) {
            return Err(Self::table_not_found_error(display_name));
        }

        // An open transaction may have created/altered the table; its column
        // specs take precedence over the committed catalog.
        if let Some(specs) = self
            .engine
            .session()
            .table_column_specs_from_transaction(canonical_name)
        {
            return Ok(specs
                .into_iter()
                .map(|spec| spec.name.to_ascii_lowercase())
                .collect());
        }

        // NOTE(review): recomputes (and shadows) `canonical_name` from the
        // display name — presumably equivalent to the parameter; confirm.
        let (_, canonical_name) = llkv_table::canonical_table_name(display_name)
            .map_err(|e| arrow::error::ArrowError::ExternalError(Box::new(e)))?;
        match context.catalog().table_column_specs(&canonical_name) {
            Ok(specs) => Ok(specs
                .into_iter()
                .map(|spec| spec.name.to_ascii_lowercase())
                .collect()),
            Err(err) => {
                if !Self::is_table_missing_error(&err) {
                    return Err(Self::map_table_error(display_name, err));
                }

                // Missing table: report no known columns.
                Ok(HashSet::new())
            }
        }
    }
1235
1236 fn is_table_marked_dropped(&self, table_name: &str) -> SqlResult<bool> {
1237 let canonical = table_name.to_ascii_lowercase();
1238 Ok(self.engine.context().is_table_marked_dropped(&canonical))
1239 }
1240
    /// Handles a `CREATE TABLE` statement.
    ///
    /// Supports plain column definitions with inline options (NOT NULL,
    /// PRIMARY KEY, UNIQUE, CHECK, REFERENCES), table-level PRIMARY KEY /
    /// UNIQUE / FOREIGN KEY constraints, `CREATE TABLE ... AS SELECT`
    /// (including a fast path for `range()` CTAS), and temporary tables,
    /// which are routed to the dedicated temporary namespace.
    fn handle_create_table(
        &self,
        mut stmt: sqlparser::ast::CreateTable,
    ) -> SqlResult<RuntimeStatementResult<P>> {
        validate_create_table_common(&stmt)?;

        let (mut schema_name, table_name) = parse_schema_qualified_name(&stmt.name)?;

        // Temporary tables live in the temporary namespace and must not be
        // schema-qualified.
        let namespace = if stmt.temporary {
            if schema_name.is_some() {
                return Err(Error::InvalidArgumentError(
                    "temporary tables cannot specify an explicit schema".into(),
                ));
            }
            schema_name = None;
            Some(TEMPORARY_NAMESPACE_ID.to_string())
        } else {
            None
        };

        // Reject references to schemas that were never created.
        if let Some(ref schema) = schema_name {
            let catalog = self.engine.context().table_catalog();
            if !catalog.schema_exists(schema) {
                return Err(Error::CatalogError(format!(
                    "Schema '{}' does not exist",
                    schema
                )));
            }
        }

        // Display name keeps the user's casing; the canonical name is the
        // lowercase key used for catalog lookups.
        let display_name = match &schema_name {
            Some(schema) => format!("{}.{}", schema, table_name),
            None => table_name.clone(),
        };
        let canonical_name = display_name.to_ascii_lowercase();
        tracing::trace!(
            "\n=== HANDLE_CREATE_TABLE: table='{}' columns={} ===",
            display_name,
            stmt.columns.len()
        );
        if display_name.is_empty() {
            return Err(Error::InvalidArgumentError(
                "table name must not be empty".into(),
            ));
        }

        // CREATE TABLE ... AS SELECT: try the specialized range() fast path
        // first, then fall back to the generic CTAS handler.
        if let Some(query) = stmt.query.take() {
            validate_create_table_as(&stmt)?;
            if let Some(result) = self.try_handle_range_ctas(
                &display_name,
                &canonical_name,
                &query,
                stmt.if_not_exists,
                stmt.or_replace,
                namespace.clone(),
            )? {
                return Ok(result);
            }
            return self.handle_create_table_as(
                display_name,
                canonical_name,
                *query,
                stmt.if_not_exists,
                stmt.or_replace,
                namespace.clone(),
            );
        }

        // Plain CREATE TABLE requires an explicit column list.
        if stmt.columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "CREATE TABLE requires at least one column".into(),
            ));
        }

        validate_create_table_definition(&stmt)?;

        // Take ownership of the AST pieces consumed below.
        let column_defs_ast = std::mem::take(&mut stmt.columns);
        let constraints = std::mem::take(&mut stmt.constraints);

        let column_names: Vec<String> = column_defs_ast
            .iter()
            .map(|column_def| column_def.name.value.clone())
            .collect();
        // Column names must be unique ignoring ASCII case.
        ensure_unique_case_insensitive(column_names.iter().map(|name| name.as_str()), |dup| {
            format!(
                "duplicate column name '{}' in table '{}'",
                dup, display_name
            )
        })?;
        let column_names_lower: HashSet<String> = column_names
            .iter()
            .map(|name| name.to_ascii_lowercase())
            .collect();

        let mut columns: Vec<PlanColumnSpec> = Vec::with_capacity(column_defs_ast.len());
        let mut primary_key_columns: HashSet<String> = HashSet::new();
        let mut foreign_keys: Vec<ForeignKeySpec> = Vec::new();
        let mut multi_column_uniques: Vec<MultiColumnUniqueSpec> = Vec::new();

        // First pass: translate each column definition and its inline
        // options into a PlanColumnSpec.
        for column_def in column_defs_ast {
            // Nullable unless an explicit NOT NULL option is present.
            let is_nullable = column_def
                .options
                .iter()
                .all(|opt| !matches!(opt.option, ColumnOption::NotNull));

            let is_primary_key = column_def.options.iter().any(|opt| {
                matches!(
                    opt.option,
                    ColumnOption::Unique {
                        is_primary: true,
                        characteristics: _
                    }
                )
            });

            // Matches both UNIQUE and PRIMARY KEY (which implies unique).
            let has_unique_constraint = column_def
                .options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::Unique { .. }));

            let check_expr = column_def.options.iter().find_map(|opt| {
                if let ColumnOption::Check(expr) = &opt.option {
                    Some(expr)
                } else {
                    None
                }
            });

            // CHECK expressions may reference any column of the table.
            if let Some(check_expr) = check_expr {
                let all_col_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
                validate_check_constraint(check_expr, &display_name, &all_col_refs)?;
            }

            // Stored as SQL text on the column spec.
            let check_expr_str = check_expr.map(|e| e.to_string());

            // Inline REFERENCES clauses become single-column foreign keys.
            for opt in &column_def.options {
                if let ColumnOption::ForeignKey {
                    foreign_table,
                    referred_columns,
                    on_delete,
                    on_update,
                    characteristics,
                } = &opt.option
                {
                    let spec = self.build_foreign_key_spec(
                        &display_name,
                        &canonical_name,
                        vec![column_def.name.value.clone()],
                        foreign_table,
                        referred_columns,
                        *on_delete,
                        *on_update,
                        characteristics,
                        &column_names_lower,
                        None,
                    )?;
                    foreign_keys.push(spec);
                }
            }

            tracing::trace!(
                "DEBUG CREATE TABLE column '{}' is_primary_key={} has_unique={} check_expr={:?}",
                column_def.name.value,
                is_primary_key,
                has_unique_constraint,
                check_expr_str
            );

            // Resolve custom type aliases (CREATE DOMAIN) to concrete types.
            let resolved_data_type = self.engine.context().resolve_type(&column_def.data_type);

            let mut column = PlanColumnSpec::new(
                column_def.name.value.clone(),
                arrow_type_from_sql(&resolved_data_type)?,
                is_nullable,
            );
            tracing::trace!(
                "DEBUG PlanColumnSpec after new(): primary_key={} unique={}",
                column.primary_key,
                column.unique
            );

            column = column
                .with_primary_key(is_primary_key)
                .with_unique(has_unique_constraint)
                .with_check(check_expr_str);

            // PRIMARY KEY implies NOT NULL; record the column so a later
            // table-level PRIMARY KEY constraint can be rejected.
            if is_primary_key {
                column.nullable = false;
                primary_key_columns.insert(column.name.to_ascii_lowercase());
            }
            tracing::trace!(
                "DEBUG PlanColumnSpec after with_primary_key({})/with_unique({}): primary_key={} unique={} check_expr={:?}",
                is_primary_key,
                has_unique_constraint,
                column.primary_key,
                column.unique,
                column.check_expr
            );

            columns.push(column);
        }

        // Second pass: fold table-level constraints into the column specs.
        if !constraints.is_empty() {
            // Case-insensitive name -> index lookup into `columns`.
            let mut column_lookup: HashMap<String, usize> = HashMap::with_capacity(columns.len());
            for (idx, column) in columns.iter().enumerate() {
                column_lookup.insert(column.name.to_ascii_lowercase(), idx);
            }

            for constraint in constraints {
                match constraint {
                    TableConstraint::PrimaryKey {
                        columns: constraint_columns,
                        ..
                    } => {
                        // Only one PRIMARY KEY per table, inline or table-level.
                        if !primary_key_columns.is_empty() {
                            return Err(Error::InvalidArgumentError(
                                "multiple PRIMARY KEY constraints are not supported".into(),
                            ));
                        }

                        ensure_non_empty(&constraint_columns, || {
                            "PRIMARY KEY requires at least one column".into()
                        })?;

                        let mut pk_column_names: Vec<String> =
                            Vec::with_capacity(constraint_columns.len());

                        for index_col in &constraint_columns {
                            let column_ident = extract_index_column_name(
                                index_col,
                                "PRIMARY KEY",
                                false,
                                false,
                            )?;
                            pk_column_names.push(column_ident);
                        }

                        ensure_unique_case_insensitive(
                            pk_column_names.iter().map(|name| name.as_str()),
                            |dup| format!("duplicate column '{}' in PRIMARY KEY constraint", dup),
                        )?;

                        ensure_known_columns_case_insensitive(
                            pk_column_names.iter().map(|name| name.as_str()),
                            &column_names_lower,
                            |unknown| {
                                format!("unknown column '{}' in PRIMARY KEY constraint", unknown)
                            },
                        )?;

                        // Mark each named column as part of the key.
                        for column_ident in pk_column_names {
                            let normalized = column_ident.to_ascii_lowercase();
                            let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
                                Error::InvalidArgumentError(format!(
                                    "unknown column '{}' in PRIMARY KEY constraint",
                                    column_ident
                                ))
                            })?;

                            let column = columns.get_mut(idx).expect("column index valid");
                            column.primary_key = true;
                            column.unique = true;
                            column.nullable = false;

                            primary_key_columns.insert(normalized);
                        }
                    }
                    TableConstraint::Unique {
                        columns: constraint_columns,
                        index_type,
                        index_options,
                        characteristics,
                        nulls_distinct,
                        name,
                        ..
                    } => {
                        // Reject UNIQUE variants the engine does not implement.
                        if !matches!(nulls_distinct, NullsDistinctOption::None) {
                            return Err(Error::InvalidArgumentError(
                                "UNIQUE constraints with NULLS DISTINCT/NOT DISTINCT are not supported yet".into(),
                            ));
                        }

                        if index_type.is_some() {
                            return Err(Error::InvalidArgumentError(
                                "UNIQUE constraints with index types are not supported yet".into(),
                            ));
                        }

                        if !index_options.is_empty() {
                            return Err(Error::InvalidArgumentError(
                                "UNIQUE constraints with index options are not supported yet"
                                    .into(),
                            ));
                        }

                        if characteristics.is_some() {
                            return Err(Error::InvalidArgumentError(
                                "UNIQUE constraint characteristics are not supported yet".into(),
                            ));
                        }

                        ensure_non_empty(&constraint_columns, || {
                            "UNIQUE constraint requires at least one column".into()
                        })?;

                        let mut unique_column_names: Vec<String> =
                            Vec::with_capacity(constraint_columns.len());

                        for index_column in &constraint_columns {
                            let column_ident = extract_index_column_name(
                                index_column,
                                "UNIQUE constraint",
                                false,
                                false,
                            )?;
                            unique_column_names.push(column_ident);
                        }

                        ensure_unique_case_insensitive(
                            unique_column_names.iter().map(|name| name.as_str()),
                            |dup| format!("duplicate column '{}' in UNIQUE constraint", dup),
                        )?;

                        ensure_known_columns_case_insensitive(
                            unique_column_names.iter().map(|name| name.as_str()),
                            &column_names_lower,
                            |unknown| format!("unknown column '{}' in UNIQUE constraint", unknown),
                        )?;

                        if unique_column_names.len() > 1 {
                            // Composite uniqueness is tracked separately from
                            // the per-column specs.
                            multi_column_uniques.push(MultiColumnUniqueSpec {
                                name: name.map(|n| n.value),
                                columns: unique_column_names,
                            });
                        } else {
                            // Single-column UNIQUE folds into the column spec.
                            let column_ident = unique_column_names
                                .into_iter()
                                .next()
                                .expect("unique constraint checked for emptiness");
                            let normalized = column_ident.to_ascii_lowercase();
                            let idx = column_lookup.get(&normalized).copied().ok_or_else(|| {
                                Error::InvalidArgumentError(format!(
                                    "unknown column '{}' in UNIQUE constraint",
                                    column_ident
                                ))
                            })?;

                            let column = columns
                                .get_mut(idx)
                                .expect("column index from lookup must be valid");
                            column.unique = true;
                        }
                    }
                    TableConstraint::ForeignKey {
                        name,
                        index_name,
                        columns: fk_columns,
                        foreign_table,
                        referred_columns,
                        on_delete,
                        on_update,
                        characteristics,
                        ..
                    } => {
                        if index_name.is_some() {
                            return Err(Error::InvalidArgumentError(
                                "FOREIGN KEY index clauses are not supported yet".into(),
                            ));
                        }

                        let referencing_columns: Vec<String> =
                            fk_columns.into_iter().map(|ident| ident.value).collect();
                        let spec = self.build_foreign_key_spec(
                            &display_name,
                            &canonical_name,
                            referencing_columns,
                            &foreign_table,
                            &referred_columns,
                            on_delete,
                            on_update,
                            &characteristics,
                            &column_names_lower,
                            name.map(|ident| ident.value),
                        )?;

                        foreign_keys.push(spec);
                    }
                    unsupported => {
                        return Err(Error::InvalidArgumentError(format!(
                            "table-level constraint {:?} is not supported",
                            unsupported
                        )));
                    }
                }
            }
        }

        // Hand the assembled plan to the runtime for execution.
        let plan = CreateTablePlan {
            name: display_name,
            if_not_exists: stmt.if_not_exists,
            or_replace: stmt.or_replace,
            columns,
            source: None,
            namespace,
            foreign_keys,
            multi_column_uniques,
        };
        self.execute_plan_statement(PlanStatement::CreateTable(plan))
    }
1660
    /// Handles a `CREATE INDEX` statement.
    ///
    /// Only plain (optionally UNIQUE) column indexes are supported; advanced
    /// clauses (CONCURRENTLY, USING, INCLUDE, NULLS DISTINCT, WITH, partial
    /// predicates, index/alter options) are rejected. Column names are
    /// validated against the target table when the catalog can supply them.
    fn handle_create_index(
        &self,
        stmt: sqlparser::ast::CreateIndex,
    ) -> SqlResult<RuntimeStatementResult<P>> {
        let sqlparser::ast::CreateIndex {
            name,
            table_name,
            using,
            columns,
            unique,
            concurrently,
            if_not_exists,
            include,
            nulls_distinct,
            with,
            predicate,
            index_options,
            alter_options,
            ..
        } = stmt;

        // Reject every CREATE INDEX clause the engine does not implement.
        if concurrently {
            return Err(Error::InvalidArgumentError(
                "CREATE INDEX CONCURRENTLY is not supported".into(),
            ));
        }
        if using.is_some() {
            return Err(Error::InvalidArgumentError(
                "CREATE INDEX USING clauses are not supported".into(),
            ));
        }
        if !include.is_empty() {
            return Err(Error::InvalidArgumentError(
                "CREATE INDEX INCLUDE columns are not supported".into(),
            ));
        }
        if nulls_distinct.is_some() {
            return Err(Error::InvalidArgumentError(
                "CREATE INDEX NULLS DISTINCT is not supported".into(),
            ));
        }
        if !with.is_empty() {
            return Err(Error::InvalidArgumentError(
                "CREATE INDEX WITH options are not supported".into(),
            ));
        }
        if predicate.is_some() {
            return Err(Error::InvalidArgumentError(
                "partial CREATE INDEX is not supported".into(),
            ));
        }
        if !index_options.is_empty() {
            return Err(Error::InvalidArgumentError(
                "CREATE INDEX options are not supported".into(),
            ));
        }
        if !alter_options.is_empty() {
            return Err(Error::InvalidArgumentError(
                "CREATE INDEX ALTER options are not supported".into(),
            ));
        }
        if columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "CREATE INDEX requires at least one column".into(),
            ));
        }

        // Resolve the (optionally schema-qualified) target table name.
        let (schema_name, base_table_name) = parse_schema_qualified_name(&table_name)?;
        if let Some(ref schema) = schema_name {
            let catalog = self.engine.context().table_catalog();
            if !catalog.schema_exists(schema) {
                return Err(Error::CatalogError(format!(
                    "Schema '{}' does not exist",
                    schema
                )));
            }
        }

        let display_table_name = schema_name
            .as_ref()
            .map(|schema| format!("{}.{}", schema, base_table_name))
            .unwrap_or_else(|| base_table_name.clone());
        let canonical_table_name = display_table_name.to_ascii_lowercase();

        // An empty set means column metadata was unavailable; per-column
        // existence checks are skipped in that case.
        let known_columns =
            self.collect_known_columns(&display_table_name, &canonical_table_name)?;
        let enforce_known_columns = !known_columns.is_empty();

        let index_name = match name {
            Some(name_obj) => Some(Self::object_name_to_string(&name_obj)?),
            None => None,
        };

        let mut index_columns: Vec<IndexColumnPlan> = Vec::with_capacity(columns.len());
        let mut seen_column_names: HashSet<String> = HashSet::new();
        for item in columns {
            if item.column.with_fill.is_some() {
                return Err(Error::InvalidArgumentError(
                    "CREATE INDEX column WITH FILL is not supported".into(),
                ));
            }

            let column_name = extract_index_column_name(
                &item,
                "CREATE INDEX",
                true,
                true,
            )?;

            // Sort defaults: ascending, NULLS LAST.
            let order_expr = &item.column;
            let ascending = order_expr.options.asc.unwrap_or(true);
            let nulls_first = order_expr.options.nulls_first.unwrap_or(false);

            // Duplicate detection is case-insensitive.
            let normalized = column_name.to_ascii_lowercase();
            if !seen_column_names.insert(normalized.clone()) {
                return Err(Error::InvalidArgumentError(format!(
                    "duplicate column '{}' in CREATE INDEX",
                    column_name
                )));
            }

            if enforce_known_columns && !known_columns.contains(&normalized) {
                return Err(Error::InvalidArgumentError(format!(
                    "column '{}' does not exist in table '{}'",
                    column_name, display_table_name
                )));
            }

            let column_plan = IndexColumnPlan::new(column_name).with_sort(ascending, nulls_first);
            index_columns.push(column_plan);
        }

        let plan = CreateIndexPlan::new(display_table_name)
            .with_name(index_name)
            .with_unique(unique)
            .with_if_not_exists(if_not_exists)
            .with_columns(index_columns);

        self.execute_plan_statement(PlanStatement::CreateIndex(plan))
    }
1803
1804 fn map_referential_action(
1805 action: Option<ReferentialAction>,
1806 kind: &str,
1807 ) -> SqlResult<ForeignKeyAction> {
1808 match action {
1809 None | Some(ReferentialAction::NoAction) => Ok(ForeignKeyAction::NoAction),
1810 Some(ReferentialAction::Restrict) => Ok(ForeignKeyAction::Restrict),
1811 Some(other) => Err(Error::InvalidArgumentError(format!(
1812 "FOREIGN KEY ON {kind} {:?} is not supported yet",
1813 other
1814 ))),
1815 }
1816 }
1817
    /// Builds a validated [`ForeignKeySpec`] shared by inline
    /// (column-level) and table-level FOREIGN KEY clauses.
    ///
    /// Validates referencing columns against the table being created,
    /// referenced columns against the referenced table (or against the
    /// table itself for self-references), rejects references to views,
    /// and maps ON DELETE / ON UPDATE to the supported action set.
    #[allow(clippy::too_many_arguments)]
    fn build_foreign_key_spec(
        &self,
        _referencing_display: &str,
        referencing_canonical: &str,
        referencing_columns: Vec<String>,
        foreign_table: &ObjectName,
        referenced_columns: &[Ident],
        on_delete: Option<ReferentialAction>,
        on_update: Option<ReferentialAction>,
        characteristics: &Option<ConstraintCharacteristics>,
        known_columns_lower: &HashSet<String>,
        name: Option<String>,
    ) -> SqlResult<ForeignKeySpec> {
        if characteristics.is_some() {
            return Err(Error::InvalidArgumentError(
                "FOREIGN KEY constraint characteristics are not supported yet".into(),
            ));
        }

        // Referencing columns must be non-empty, distinct, and belong to
        // the table under creation.
        ensure_non_empty(&referencing_columns, || {
            "FOREIGN KEY constraint requires at least one referencing column".into()
        })?;
        ensure_unique_case_insensitive(
            referencing_columns.iter().map(|name| name.as_str()),
            |dup| format!("duplicate column '{}' in FOREIGN KEY constraint", dup),
        )?;
        ensure_known_columns_case_insensitive(
            referencing_columns.iter().map(|name| name.as_str()),
            known_columns_lower,
            |unknown| format!("unknown column '{}' in FOREIGN KEY constraint", unknown),
        )?;

        let referenced_columns_vec: Vec<String> = referenced_columns
            .iter()
            .map(|ident| ident.value.clone())
            .collect();
        ensure_unique_case_insensitive(
            referenced_columns_vec.iter().map(|name| name.as_str()),
            |dup| {
                format!(
                    "duplicate referenced column '{}' in FOREIGN KEY constraint",
                    dup
                )
            },
        )?;

        // An empty referenced-column list is accepted as-is (presumably
        // resolved downstream to the referenced table's primary key —
        // TODO confirm); a non-empty list must match the referencing count.
        if !referenced_columns_vec.is_empty()
            && referenced_columns_vec.len() != referencing_columns.len()
        {
            return Err(Error::InvalidArgumentError(
                "FOREIGN KEY referencing and referenced column counts must match".into(),
            ));
        }

        let (referenced_display, referenced_canonical) = canonical_object_name(foreign_table)?;

        // Foreign keys may not point at views.
        let catalog = self.engine.context().table_catalog();
        if let Some(table_id) = catalog.table_id(&referenced_canonical) {
            let context = self.engine.context();
            if context.is_view(table_id)? {
                return Err(Error::CatalogError(format!(
                    "Binder Error: cannot reference a VIEW with a FOREIGN KEY: {}",
                    referenced_display
                )));
            }
        }

        if referenced_canonical == referencing_canonical {
            // Self-reference: the referenced table is the one being created,
            // so validate against its in-progress column set.
            ensure_known_columns_case_insensitive(
                referenced_columns_vec.iter().map(|name| name.as_str()),
                known_columns_lower,
                |unknown| {
                    format!(
                        "Binder Error: table '{}' does not have a column named '{}'",
                        referenced_display, unknown
                    )
                },
            )?;
        } else {
            // Otherwise consult the catalog; skip validation when column
            // metadata is unavailable (empty set).
            let known_columns =
                self.collect_known_columns(&referenced_display, &referenced_canonical)?;
            if !known_columns.is_empty() {
                ensure_known_columns_case_insensitive(
                    referenced_columns_vec.iter().map(|name| name.as_str()),
                    &known_columns,
                    |unknown| {
                        format!(
                            "Binder Error: table '{}' does not have a column named '{}'",
                            referenced_display, unknown
                        )
                    },
                )?;
            }
        }

        let on_delete_action = Self::map_referential_action(on_delete, "DELETE")?;
        let on_update_action = Self::map_referential_action(on_update, "UPDATE")?;

        Ok(ForeignKeySpec {
            name,
            columns: referencing_columns,
            referenced_table: referenced_display,
            referenced_columns: referenced_columns_vec,
            on_delete: on_delete_action,
            on_update: on_update_action,
        })
    }
1927
1928 fn handle_create_schema(
1929 &self,
1930 schema_name: SchemaName,
1931 _if_not_exists: bool,
1932 with: Option<Vec<SqlOption>>,
1933 options: Option<Vec<SqlOption>>,
1934 default_collate_spec: Option<SqlExpr>,
1935 clone: Option<ObjectName>,
1936 ) -> SqlResult<RuntimeStatementResult<P>> {
1937 if clone.is_some() {
1938 return Err(Error::InvalidArgumentError(
1939 "CREATE SCHEMA ... CLONE is not supported".into(),
1940 ));
1941 }
1942 if with.as_ref().is_some_and(|opts| !opts.is_empty()) {
1943 return Err(Error::InvalidArgumentError(
1944 "CREATE SCHEMA ... WITH options are not supported".into(),
1945 ));
1946 }
1947 if options.as_ref().is_some_and(|opts| !opts.is_empty()) {
1948 return Err(Error::InvalidArgumentError(
1949 "CREATE SCHEMA options are not supported".into(),
1950 ));
1951 }
1952 if default_collate_spec.is_some() {
1953 return Err(Error::InvalidArgumentError(
1954 "CREATE SCHEMA DEFAULT COLLATE is not supported".into(),
1955 ));
1956 }
1957
1958 let schema_name = match schema_name {
1959 SchemaName::Simple(name) => name,
1960 _ => {
1961 return Err(Error::InvalidArgumentError(
1962 "CREATE SCHEMA authorization is not supported".into(),
1963 ));
1964 }
1965 };
1966
1967 let (display_name, canonical) = canonical_object_name(&schema_name)?;
1968 if display_name.is_empty() {
1969 return Err(Error::InvalidArgumentError(
1970 "schema name must not be empty".into(),
1971 ));
1972 }
1973
1974 let catalog = self.engine.context().table_catalog();
1976
1977 if _if_not_exists && catalog.schema_exists(&canonical) {
1978 return Ok(RuntimeStatementResult::NoOp);
1979 }
1980
1981 catalog.register_schema(&canonical).map_err(|err| {
1982 Error::CatalogError(format!(
1983 "Failed to create schema '{}': {}",
1984 display_name, err
1985 ))
1986 })?;
1987
1988 Ok(RuntimeStatementResult::NoOp)
1989 }
1990
    /// Handles `CREATE VIEW`, persisting the view's SQL text via the
    /// runtime context.
    ///
    /// MATERIALIZED / OR REPLACE / OR ALTER / TEMPORARY variants are
    /// rejected; the remaining underscore-prefixed clauses (explicit view
    /// columns, options, cluster-by, ...) are currently ignored.
    #[allow(clippy::too_many_arguments)]
    fn handle_create_view(
        &self,
        name: ObjectName,
        _columns: Vec<sqlparser::ast::ViewColumnDef>,
        query: Box<sqlparser::ast::Query>,
        materialized: bool,
        or_replace: bool,
        or_alter: bool,
        _options: sqlparser::ast::CreateTableOptions,
        _cluster_by: Vec<sqlparser::ast::Ident>,
        _comment: Option<String>,
        _with_no_schema_binding: bool,
        if_not_exists: bool,
        temporary: bool,
        _to: Option<ObjectName>,
        _params: Option<sqlparser::ast::CreateViewParams>,
        _secure: bool,
        _name_before_not_exists: bool,
    ) -> SqlResult<RuntimeStatementResult<P>> {
        // Reject view variants the engine does not implement.
        if materialized {
            return Err(Error::InvalidArgumentError(
                "MATERIALIZED VIEWS are not supported".into(),
            ));
        }
        if or_replace {
            return Err(Error::InvalidArgumentError(
                "CREATE OR REPLACE VIEW is not supported".into(),
            ));
        }
        if or_alter {
            return Err(Error::InvalidArgumentError(
                "CREATE OR ALTER VIEW is not supported".into(),
            ));
        }
        if temporary {
            return Err(Error::InvalidArgumentError(
                "TEMPORARY VIEWS are not supported".into(),
            ));
        }

        let (schema_name, view_name) = parse_schema_qualified_name(&name)?;

        // The target schema must already exist.
        if let Some(ref schema) = schema_name {
            let catalog = self.engine.context().table_catalog();
            if !catalog.schema_exists(schema) {
                return Err(Error::CatalogError(format!(
                    "Schema '{}' does not exist",
                    schema
                )));
            }
        }

        let display_name = match &schema_name {
            Some(schema) => format!("{}.{}", schema, view_name),
            None => view_name.clone(),
        };
        let canonical_name = display_name.to_ascii_lowercase();

        // Views share the table namespace: an existing table or view with
        // this name conflicts unless IF NOT EXISTS was given.
        let catalog = self.engine.context().table_catalog();
        if catalog.table_exists(&canonical_name) {
            if if_not_exists {
                return Ok(RuntimeStatementResult::NoOp);
            }
            return Err(Error::CatalogError(format!(
                "Table or view '{}' already exists",
                display_name
            )));
        }

        // The view is stored as its SQL text.
        let view_definition = query.to_string();

        let context = self.engine.context();
        context.create_view(&display_name, view_definition)?;

        tracing::debug!("Created view: {}", display_name);
        Ok(RuntimeStatementResult::NoOp)
    }
2076
2077 fn handle_create_domain(
2078 &self,
2079 create_domain: sqlparser::ast::CreateDomain,
2080 ) -> SqlResult<RuntimeStatementResult<P>> {
2081 use llkv_table::CustomTypeMeta;
2082 use std::time::{SystemTime, UNIX_EPOCH};
2083
2084 let type_name = create_domain.name.to_string();
2086
2087 let base_type_sql = create_domain.data_type.to_string();
2089
2090 self.engine
2092 .context()
2093 .register_type(type_name.clone(), create_domain.data_type.clone());
2094
2095 let context = self.engine.context();
2097 let catalog = llkv_table::SysCatalog::new(context.store());
2098
2099 let created_at_micros = SystemTime::now()
2100 .duration_since(UNIX_EPOCH)
2101 .unwrap_or_default()
2102 .as_micros() as u64;
2103
2104 let meta = CustomTypeMeta {
2105 name: type_name.clone(),
2106 base_type_sql,
2107 created_at_micros,
2108 };
2109
2110 catalog.put_custom_type_meta(&meta)?;
2111
2112 tracing::debug!("Created and persisted type alias: {}", type_name);
2113 Ok(RuntimeStatementResult::NoOp)
2114 }
2115
2116 fn handle_drop_domain(
2117 &self,
2118 drop_domain: sqlparser::ast::DropDomain,
2119 ) -> SqlResult<RuntimeStatementResult<P>> {
2120 let if_exists = drop_domain.if_exists;
2121 let type_name = drop_domain.name.to_string();
2122
2123 let result = self.engine.context().drop_type(&type_name);
2125
2126 if let Err(err) = result {
2127 if !if_exists {
2128 return Err(err);
2129 }
2130 } else {
2132 let context = self.engine.context();
2134 let catalog = llkv_table::SysCatalog::new(context.store());
2135 catalog.delete_custom_type_meta(&type_name)?;
2136
2137 tracing::debug!("Dropped and removed from catalog type alias: {}", type_name);
2138 }
2139
2140 Ok(RuntimeStatementResult::NoOp)
2141 }
2142
    /// Fast path for `CREATE TABLE ... AS SELECT <literal> AS <name>, ...
    /// FROM range(N)`.
    ///
    /// Returns `Ok(None)` when the query does not match this narrow shape,
    /// letting the caller fall back to the generic CTAS handler. On a
    /// match, creates the table from the aliased literal projections and
    /// inserts `N` copies of the literal row.
    fn try_handle_range_ctas(
        &self,
        display_name: &str,
        _canonical_name: &str,
        query: &Query,
        if_not_exists: bool,
        or_replace: bool,
        namespace: Option<String>,
    ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
        // Only a plain single-table SELECT without joins can match.
        let select = match query.body.as_ref() {
            SetExpr::Select(select) => select,
            _ => return Ok(None),
        };
        if select.from.len() != 1 {
            return Ok(None);
        }
        let table_with_joins = &select.from[0];
        if !table_with_joins.joins.is_empty() {
            return Ok(None);
        }
        // The FROM clause must be exactly `range(<integer literal>)`;
        // a non-range table function falls through to the generic path,
        // while a malformed range() call is a hard error.
        let (range_size, range_alias) = match &table_with_joins.relation {
            TableFactor::Table {
                name,
                args: Some(args),
                alias,
                ..
            } => {
                let func_name = name.to_string().to_ascii_lowercase();
                if func_name != "range" {
                    return Ok(None);
                }
                if args.args.len() != 1 {
                    return Err(Error::InvalidArgumentError(
                        "range table function expects a single argument".into(),
                    ));
                }
                let size_expr = &args.args[0];
                let range_size = match size_expr {
                    FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
                        match &value.value {
                            Value::Number(raw, _) => raw.parse::<i64>().map_err(|e| {
                                Error::InvalidArgumentError(format!(
                                    "invalid range size literal {}: {}",
                                    raw, e
                                ))
                            })?,
                            other => {
                                return Err(Error::InvalidArgumentError(format!(
                                    "unsupported range size value: {:?}",
                                    other
                                )));
                            }
                        }
                    }
                    _ => {
                        return Err(Error::InvalidArgumentError(
                            "unsupported range argument".into(),
                        ));
                    }
                };
                (range_size, alias.as_ref().map(|a| a.name.value.clone()))
            }
            _ => return Ok(None),
        };

        if range_size < 0 {
            return Err(Error::InvalidArgumentError(
                "range size must be non-negative".into(),
            ));
        }

        if select.projection.is_empty() {
            return Err(Error::InvalidArgumentError(
                "CREATE TABLE AS SELECT requires at least one projected column".into(),
            ));
        }

        // Each projection item must be an aliased literal; it defines one
        // column (name + Arrow type) and one cell of the row template that
        // gets repeated `range_size` times.
        let mut column_specs = Vec::with_capacity(select.projection.len());
        let mut column_names = Vec::with_capacity(select.projection.len());
        let mut row_template = Vec::with_capacity(select.projection.len());
        for item in &select.projection {
            match item {
                SelectItem::ExprWithAlias { expr, alias } => {
                    let (value, data_type) = match expr {
                        SqlExpr::Value(value_with_span) => match &value_with_span.value {
                            // Integer literal -> Int64 column.
                            Value::Number(raw, _) => {
                                let parsed = raw.parse::<i64>().map_err(|e| {
                                    Error::InvalidArgumentError(format!(
                                        "invalid numeric literal {}: {}",
                                        raw, e
                                    ))
                                })?;
                                (
                                    PlanValue::Integer(parsed),
                                    arrow::datatypes::DataType::Int64,
                                )
                            }
                            // String literal -> Utf8 column.
                            Value::SingleQuotedString(s) => (
                                PlanValue::String(s.clone()),
                                arrow::datatypes::DataType::Utf8,
                            ),
                            other => {
                                return Err(Error::InvalidArgumentError(format!(
                                    "unsupported SELECT expression in range CTAS: {:?}",
                                    other
                                )));
                            }
                        },
                        SqlExpr::Identifier(ident) => {
                            // Projecting the range column itself is not
                            // implemented; any other identifier is unknown.
                            let ident_lower = ident.value.to_ascii_lowercase();
                            if range_alias
                                .as_ref()
                                .map(|a| a.eq_ignore_ascii_case(&ident_lower))
                                .unwrap_or(false)
                                || ident_lower == "range"
                            {
                                return Err(Error::InvalidArgumentError(
                                    "range() table function columns are not supported yet".into(),
                                ));
                            }
                            return Err(Error::InvalidArgumentError(format!(
                                "unsupported identifier '{}' in range CTAS projection",
                                ident.value
                            )));
                        }
                        other => {
                            return Err(Error::InvalidArgumentError(format!(
                                "unsupported SELECT expression in range CTAS: {:?}",
                                other
                            )));
                        }
                    };
                    let column_name = alias.value.clone();
                    column_specs.push(PlanColumnSpec::new(column_name.clone(), data_type, true));
                    column_names.push(column_name);
                    row_template.push(value);
                }
                other => {
                    return Err(Error::InvalidArgumentError(format!(
                        "unsupported projection {:?} in range CTAS",
                        other
                    )));
                }
            }
        }

        // Create the table, then bulk-insert `range_size` copies of the
        // literal row template.
        let plan = CreateTablePlan {
            name: display_name.to_string(),
            if_not_exists,
            or_replace,
            columns: column_specs,
            source: None,
            namespace,
            foreign_keys: Vec::new(),
            multi_column_uniques: Vec::new(),
        };
        let create_result = self.execute_plan_statement(PlanStatement::CreateTable(plan))?;

        let row_count = range_size
            .try_into()
            .map_err(|_| Error::InvalidArgumentError("range size exceeds usize".into()))?;
        if row_count > 0 {
            let rows = vec![row_template; row_count];
            let insert_plan = InsertPlan {
                table: display_name.to_string(),
                columns: column_names,
                source: InsertSource::Rows(rows),
            };
            self.execute_plan_statement(PlanStatement::Insert(insert_plan))?;
        }

        Ok(Some(create_result))
    }
2316
2317 fn try_handle_pragma_table_info(
2321 &self,
2322 query: &Query,
2323 ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
2324 let select = match query.body.as_ref() {
2325 SetExpr::Select(select) => select,
2326 _ => return Ok(None),
2327 };
2328
2329 if select.from.len() != 1 {
2330 return Ok(None);
2331 }
2332
2333 let table_with_joins = &select.from[0];
2334 if !table_with_joins.joins.is_empty() {
2335 return Ok(None);
2336 }
2337
2338 let table_name = match &table_with_joins.relation {
2340 TableFactor::Table {
2341 name,
2342 args: Some(args),
2343 ..
2344 } => {
2345 let func_name = name.to_string().to_ascii_lowercase();
2346 if func_name != "pragma_table_info" {
2347 return Ok(None);
2348 }
2349
2350 if args.args.len() != 1 {
2352 return Err(Error::InvalidArgumentError(
2353 "pragma_table_info expects exactly one argument".into(),
2354 ));
2355 }
2356
2357 match &args.args[0] {
2358 FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
2359 match &value.value {
2360 Value::SingleQuotedString(s) => s.clone(),
2361 Value::DoubleQuotedString(s) => s.clone(),
2362 _ => {
2363 return Err(Error::InvalidArgumentError(
2364 "pragma_table_info argument must be a string".into(),
2365 ));
2366 }
2367 }
2368 }
2369 _ => {
2370 return Err(Error::InvalidArgumentError(
2371 "pragma_table_info argument must be a string literal".into(),
2372 ));
2373 }
2374 }
2375 }
2376 _ => return Ok(None),
2377 };
2378
2379 let context = self.engine.context();
2381 let (_, canonical_name) = llkv_table::canonical_table_name(&table_name)?;
2382 let columns = context.catalog().table_column_specs(&canonical_name)?;
2383
2384 use arrow::array::{BooleanArray, Int32Array, StringArray};
2386 use arrow::datatypes::{DataType, Field, Schema};
2387
2388 let mut cid_values = Vec::new();
2389 let mut name_values = Vec::new();
2390 let mut type_values = Vec::new();
2391 let mut notnull_values = Vec::new();
2392 let mut dflt_value_values: Vec<Option<String>> = Vec::new();
2393 let mut pk_values = Vec::new();
2394
2395 for (idx, col) in columns.iter().enumerate() {
2396 cid_values.push(idx as i32);
2397 name_values.push(col.name.clone());
2398 type_values.push(format!("{:?}", col.data_type)); notnull_values.push(!col.nullable);
2400 dflt_value_values.push(None); pk_values.push(col.primary_key);
2402 }
2403
2404 let schema = Arc::new(Schema::new(vec![
2405 Field::new("cid", DataType::Int32, false),
2406 Field::new("name", DataType::Utf8, false),
2407 Field::new("type", DataType::Utf8, false),
2408 Field::new("notnull", DataType::Boolean, false),
2409 Field::new("dflt_value", DataType::Utf8, true),
2410 Field::new("pk", DataType::Boolean, false),
2411 ]));
2412
2413 use arrow::array::ArrayRef;
2414 let mut batch = RecordBatch::try_new(
2415 Arc::clone(&schema),
2416 vec![
2417 Arc::new(Int32Array::from(cid_values)) as ArrayRef,
2418 Arc::new(StringArray::from(name_values)) as ArrayRef,
2419 Arc::new(StringArray::from(type_values)) as ArrayRef,
2420 Arc::new(BooleanArray::from(notnull_values)) as ArrayRef,
2421 Arc::new(StringArray::from(dflt_value_values)) as ArrayRef,
2422 Arc::new(BooleanArray::from(pk_values)) as ArrayRef,
2423 ],
2424 )
2425 .map_err(|e| Error::Internal(format!("failed to create pragma_table_info batch: {}", e)))?;
2426
2427 let projection_indices: Vec<usize> = select
2429 .projection
2430 .iter()
2431 .filter_map(|item| {
2432 match item {
2433 SelectItem::UnnamedExpr(SqlExpr::Identifier(ident)) => {
2434 schema.index_of(&ident.value).ok()
2435 }
2436 SelectItem::ExprWithAlias { expr, .. } => {
2437 if let SqlExpr::Identifier(ident) = expr {
2438 schema.index_of(&ident.value).ok()
2439 } else {
2440 None
2441 }
2442 }
2443 SelectItem::Wildcard(_) => None, _ => None,
2445 }
2446 })
2447 .collect();
2448
2449 let projected_schema;
2451 if !projection_indices.is_empty() {
2452 let projected_fields: Vec<Field> = projection_indices
2453 .iter()
2454 .map(|&idx| schema.field(idx).clone())
2455 .collect();
2456 projected_schema = Arc::new(Schema::new(projected_fields));
2457
2458 let projected_columns: Vec<ArrayRef> = projection_indices
2459 .iter()
2460 .map(|&idx| Arc::clone(batch.column(idx)))
2461 .collect();
2462
2463 batch = RecordBatch::try_new(Arc::clone(&projected_schema), projected_columns)
2464 .map_err(|e| Error::Internal(format!("failed to project columns: {}", e)))?;
2465 } else {
2466 projected_schema = schema;
2468 }
2469
2470 if let Some(order_by) = &query.order_by {
2472 use arrow::compute::SortColumn;
2473 use arrow::compute::lexsort_to_indices;
2474 use sqlparser::ast::OrderByKind;
2475
2476 let exprs = match &order_by.kind {
2477 OrderByKind::Expressions(exprs) => exprs,
2478 _ => {
2479 return Err(Error::InvalidArgumentError(
2480 "unsupported ORDER BY clause".into(),
2481 ));
2482 }
2483 };
2484
2485 let mut sort_columns = Vec::new();
2486 for order_expr in exprs {
2487 if let SqlExpr::Identifier(ident) = &order_expr.expr
2488 && let Ok(col_idx) = projected_schema.index_of(&ident.value)
2489 {
2490 let options = arrow::compute::SortOptions {
2491 descending: !order_expr.options.asc.unwrap_or(true),
2492 nulls_first: order_expr.options.nulls_first.unwrap_or(false),
2493 };
2494 sort_columns.push(SortColumn {
2495 values: Arc::clone(batch.column(col_idx)),
2496 options: Some(options),
2497 });
2498 }
2499 }
2500
2501 if !sort_columns.is_empty() {
2502 let indices = lexsort_to_indices(&sort_columns, None)
2503 .map_err(|e| Error::Internal(format!("failed to sort: {}", e)))?;
2504
2505 use arrow::compute::take;
2506 let sorted_columns: Result<Vec<ArrayRef>, _> = batch
2507 .columns()
2508 .iter()
2509 .map(|col| take(col.as_ref(), &indices, None))
2510 .collect();
2511
2512 batch = RecordBatch::try_new(
2513 Arc::clone(&projected_schema),
2514 sorted_columns
2515 .map_err(|e| Error::Internal(format!("failed to apply sort: {}", e)))?,
2516 )
2517 .map_err(|e| Error::Internal(format!("failed to create sorted batch: {}", e)))?;
2518 }
2519 }
2520
2521 let execution = SelectExecution::new_single_batch(
2522 table_name.clone(),
2523 Arc::clone(&projected_schema),
2524 batch,
2525 );
2526
2527 Ok(Some(RuntimeStatementResult::Select {
2528 table_name,
2529 schema: projected_schema,
2530 execution: Box::new(execution),
2531 }))
2532 }
2533
2534 fn handle_create_table_as(
2535 &self,
2536 display_name: String,
2537 _canonical_name: String,
2538 query: Query,
2539 if_not_exists: bool,
2540 or_replace: bool,
2541 namespace: Option<String>,
2542 ) -> SqlResult<RuntimeStatementResult<P>> {
2543 if let SetExpr::Select(select) = query.body.as_ref()
2546 && let Some((rows, column_names)) = extract_values_from_derived_table(&select.from)?
2547 {
2548 return self.handle_create_table_from_values(
2550 display_name,
2551 rows,
2552 column_names,
2553 if_not_exists,
2554 or_replace,
2555 namespace,
2556 );
2557 }
2558
2559 let select_plan = self.build_select_plan(query)?;
2561
2562 if select_plan.projections.is_empty() && select_plan.aggregates.is_empty() {
2563 return Err(Error::InvalidArgumentError(
2564 "CREATE TABLE AS SELECT requires at least one projected column".into(),
2565 ));
2566 }
2567
2568 let plan = CreateTablePlan {
2569 name: display_name,
2570 if_not_exists,
2571 or_replace,
2572 columns: Vec::new(),
2573 source: Some(CreateTableSource::Select {
2574 plan: Box::new(select_plan),
2575 }),
2576 namespace,
2577 foreign_keys: Vec::new(),
2578 multi_column_uniques: Vec::new(),
2579 };
2580 self.execute_plan_statement(PlanStatement::CreateTable(plan))
2581 }
2582
2583 fn handle_create_table_from_values(
2584 &self,
2585 display_name: String,
2586 rows: Vec<Vec<PlanValue>>,
2587 column_names: Vec<String>,
2588 if_not_exists: bool,
2589 or_replace: bool,
2590 namespace: Option<String>,
2591 ) -> SqlResult<RuntimeStatementResult<P>> {
2592 use arrow::array::{ArrayRef, Float64Builder, Int64Builder, StringBuilder};
2593 use arrow::datatypes::{DataType, Field, Schema};
2594 use arrow::record_batch::RecordBatch;
2595 use std::sync::Arc;
2596
2597 if rows.is_empty() {
2598 return Err(Error::InvalidArgumentError(
2599 "VALUES must have at least one row".into(),
2600 ));
2601 }
2602
2603 let num_cols = column_names.len();
2604
2605 let first_row = &rows[0];
2607 if first_row.len() != num_cols {
2608 return Err(Error::InvalidArgumentError(
2609 "VALUES row column count mismatch".into(),
2610 ));
2611 }
2612
2613 let mut fields = Vec::with_capacity(num_cols);
2614 let mut column_types = Vec::with_capacity(num_cols);
2615
2616 for (idx, value) in first_row.iter().enumerate() {
2617 let (data_type, nullable) = match value {
2618 PlanValue::Integer(_) => (DataType::Int64, false),
2619 PlanValue::Float(_) => (DataType::Float64, false),
2620 PlanValue::String(_) => (DataType::Utf8, false),
2621 PlanValue::Null => (DataType::Utf8, true), _ => {
2623 return Err(Error::InvalidArgumentError(format!(
2624 "unsupported value type in VALUES for column '{}'",
2625 column_names.get(idx).unwrap_or(&format!("column{}", idx))
2626 )));
2627 }
2628 };
2629
2630 column_types.push(data_type.clone());
2631 fields.push(Field::new(&column_names[idx], data_type, nullable));
2632 }
2633
2634 let schema = Arc::new(Schema::new(fields));
2635
2636 let mut arrays: Vec<ArrayRef> = Vec::with_capacity(num_cols);
2638
2639 for col_idx in 0..num_cols {
2640 let col_type = &column_types[col_idx];
2641
2642 match col_type {
2643 DataType::Int64 => {
2644 let mut builder = Int64Builder::with_capacity(rows.len());
2645 for row in &rows {
2646 match &row[col_idx] {
2647 PlanValue::Integer(v) => builder.append_value(*v),
2648 PlanValue::Null => builder.append_null(),
2649 other => {
2650 return Err(Error::InvalidArgumentError(format!(
2651 "type mismatch in VALUES: expected Integer, got {:?}",
2652 other
2653 )));
2654 }
2655 }
2656 }
2657 arrays.push(Arc::new(builder.finish()) as ArrayRef);
2658 }
2659 DataType::Float64 => {
2660 let mut builder = Float64Builder::with_capacity(rows.len());
2661 for row in &rows {
2662 match &row[col_idx] {
2663 PlanValue::Float(v) => builder.append_value(*v),
2664 PlanValue::Null => builder.append_null(),
2665 other => {
2666 return Err(Error::InvalidArgumentError(format!(
2667 "type mismatch in VALUES: expected Float, got {:?}",
2668 other
2669 )));
2670 }
2671 }
2672 }
2673 arrays.push(Arc::new(builder.finish()) as ArrayRef);
2674 }
2675 DataType::Utf8 => {
2676 let mut builder = StringBuilder::with_capacity(rows.len(), 1024);
2677 for row in &rows {
2678 match &row[col_idx] {
2679 PlanValue::String(v) => builder.append_value(v),
2680 PlanValue::Null => builder.append_null(),
2681 other => {
2682 return Err(Error::InvalidArgumentError(format!(
2683 "type mismatch in VALUES: expected String, got {:?}",
2684 other
2685 )));
2686 }
2687 }
2688 }
2689 arrays.push(Arc::new(builder.finish()) as ArrayRef);
2690 }
2691 other => {
2692 return Err(Error::InvalidArgumentError(format!(
2693 "unsupported column type in VALUES: {:?}",
2694 other
2695 )));
2696 }
2697 }
2698 }
2699
2700 let batch = RecordBatch::try_new(Arc::clone(&schema), arrays).map_err(|e| {
2701 Error::Internal(format!("failed to create RecordBatch from VALUES: {}", e))
2702 })?;
2703
2704 let plan = CreateTablePlan {
2705 name: display_name.clone(),
2706 if_not_exists,
2707 or_replace,
2708 columns: Vec::new(),
2709 source: Some(CreateTableSource::Batches {
2710 schema: Arc::clone(&schema),
2711 batches: vec![batch],
2712 }),
2713 namespace,
2714 foreign_keys: Vec::new(),
2715 multi_column_uniques: Vec::new(),
2716 };
2717
2718 self.execute_plan_statement(PlanStatement::CreateTable(plan))
2719 }
2720
2721 fn handle_insert(&self, stmt: sqlparser::ast::Insert) -> SqlResult<RuntimeStatementResult<P>> {
2722 match self.prepare_insert(stmt)? {
2723 PreparedInsert::Values {
2724 table_name,
2725 columns,
2726 rows,
2727 } => {
2728 tracing::trace!(
2729 "DEBUG SQL handle_insert executing buffered-values insert for table={}",
2730 table_name
2731 );
2732 let plan = InsertPlan {
2733 table: table_name,
2734 columns,
2735 source: InsertSource::Rows(rows),
2736 };
2737 self.execute_plan_statement(PlanStatement::Insert(plan))
2738 }
2739 PreparedInsert::Immediate(plan) => {
2740 let table_name = plan.table.clone();
2741 tracing::trace!(
2742 "DEBUG SQL handle_insert executing immediate insert for table={}",
2743 table_name
2744 );
2745 self.execute_plan_statement(PlanStatement::Insert(plan))
2746 }
2747 }
2748 }
2749
2750 fn handle_update(
2751 &self,
2752 table: TableWithJoins,
2753 assignments: Vec<Assignment>,
2754 from: Option<UpdateTableFromKind>,
2755 selection: Option<SqlExpr>,
2756 returning: Option<Vec<SelectItem>>,
2757 ) -> SqlResult<RuntimeStatementResult<P>> {
2758 if from.is_some() {
2759 return Err(Error::InvalidArgumentError(
2760 "UPDATE ... FROM is not supported yet".into(),
2761 ));
2762 }
2763 if returning.is_some() {
2764 return Err(Error::InvalidArgumentError(
2765 "UPDATE ... RETURNING is not supported".into(),
2766 ));
2767 }
2768 if assignments.is_empty() {
2769 return Err(Error::InvalidArgumentError(
2770 "UPDATE requires at least one assignment".into(),
2771 ));
2772 }
2773
2774 let (display_name, canonical_name) = extract_single_table(std::slice::from_ref(&table))?;
2775
2776 if !self.engine.session().has_active_transaction()
2777 && self
2778 .engine
2779 .context()
2780 .is_table_marked_dropped(&canonical_name)
2781 {
2782 return Err(Error::TransactionContextError(
2783 DROPPED_TABLE_TRANSACTION_ERR.into(),
2784 ));
2785 }
2786
2787 let catalog = self.engine.context().table_catalog();
2788 let resolver = catalog.identifier_resolver();
2789 let table_id = catalog.table_id(&canonical_name);
2790
2791 let mut column_assignments = Vec::with_capacity(assignments.len());
2792 let mut seen: HashMap<String, ()> = HashMap::new();
2793 for assignment in assignments {
2794 let column_name = resolve_assignment_column_name(&assignment.target)?;
2795 let normalized = column_name.to_ascii_lowercase();
2796 if seen.insert(normalized, ()).is_some() {
2797 return Err(Error::InvalidArgumentError(format!(
2798 "duplicate column '{}' in UPDATE assignments",
2799 column_name
2800 )));
2801 }
2802 let value = match SqlValue::try_from_expr(&assignment.value) {
2803 Ok(literal) => AssignmentValue::Literal(PlanValue::from(literal)),
2804 Err(Error::InvalidArgumentError(msg))
2805 if msg.contains("unsupported literal expression") =>
2806 {
2807 let normalized_expr = self.materialize_in_subquery(assignment.value.clone())?;
2808 let translated = translate_scalar_with_context(
2809 &resolver,
2810 IdentifierContext::new(table_id),
2811 &normalized_expr,
2812 )?;
2813 AssignmentValue::Expression(translated)
2814 }
2815 Err(err) => return Err(err),
2816 };
2817 column_assignments.push(ColumnAssignment {
2818 column: column_name,
2819 value,
2820 });
2821 }
2822
2823 let filter = match selection {
2824 Some(expr) => {
2825 let materialized_expr = self.materialize_in_subquery(expr)?;
2826 let mut subqueries = Vec::new();
2827 let predicate = translate_condition_with_context(
2828 self,
2829 &resolver,
2830 IdentifierContext::new(table_id),
2831 &materialized_expr,
2832 &[],
2833 &mut subqueries,
2834 None,
2835 )?;
2836 if subqueries.is_empty() {
2837 Some(predicate)
2838 } else {
2839 return Err(Error::InvalidArgumentError(
2840 "EXISTS subqueries are not supported in UPDATE WHERE clauses".into(),
2841 ));
2842 }
2843 }
2844 None => None,
2845 };
2846
2847 let plan = UpdatePlan {
2848 table: display_name.clone(),
2849 assignments: column_assignments,
2850 filter,
2851 };
2852 self.execute_plan_statement(PlanStatement::Update(plan))
2853 }
2854
2855 #[allow(clippy::collapsible_if)]
2856 fn handle_delete(&self, delete: Delete) -> SqlResult<RuntimeStatementResult<P>> {
2857 let Delete {
2858 tables,
2859 from,
2860 using,
2861 selection,
2862 returning,
2863 order_by,
2864 limit,
2865 } = delete;
2866
2867 if !tables.is_empty() {
2868 return Err(Error::InvalidArgumentError(
2869 "multi-table DELETE is not supported yet".into(),
2870 ));
2871 }
2872 if let Some(using_tables) = using {
2873 if !using_tables.is_empty() {
2874 return Err(Error::InvalidArgumentError(
2875 "DELETE ... USING is not supported yet".into(),
2876 ));
2877 }
2878 }
2879 if returning.is_some() {
2880 return Err(Error::InvalidArgumentError(
2881 "DELETE ... RETURNING is not supported".into(),
2882 ));
2883 }
2884 if !order_by.is_empty() {
2885 return Err(Error::InvalidArgumentError(
2886 "DELETE ... ORDER BY is not supported yet".into(),
2887 ));
2888 }
2889 if limit.is_some() {
2890 return Err(Error::InvalidArgumentError(
2891 "DELETE ... LIMIT is not supported yet".into(),
2892 ));
2893 }
2894
2895 let from_tables = match from {
2896 FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
2897 };
2898 let (display_name, canonical_name) = extract_single_table(&from_tables)?;
2899
2900 if !self.engine.session().has_active_transaction()
2901 && self
2902 .engine
2903 .context()
2904 .is_table_marked_dropped(&canonical_name)
2905 {
2906 return Err(Error::TransactionContextError(
2907 DROPPED_TABLE_TRANSACTION_ERR.into(),
2908 ));
2909 }
2910
2911 let catalog = self.engine.context().table_catalog();
2912 let resolver = catalog.identifier_resolver();
2913 let table_id = catalog.table_id(&canonical_name);
2914
2915 let filter = if let Some(expr) = selection {
2916 let materialized_expr = self.materialize_in_subquery(expr)?;
2917 let mut subqueries = Vec::new();
2918 let predicate = translate_condition_with_context(
2919 self,
2920 &resolver,
2921 IdentifierContext::new(table_id),
2922 &materialized_expr,
2923 &[],
2924 &mut subqueries,
2925 None,
2926 )?;
2927 if !subqueries.is_empty() {
2928 return Err(Error::InvalidArgumentError(
2929 "EXISTS subqueries are not supported in DELETE WHERE clauses".into(),
2930 ));
2931 }
2932 Some(predicate)
2933 } else {
2934 None
2935 };
2936
2937 let plan = DeletePlan {
2938 table: display_name.clone(),
2939 filter,
2940 };
2941 self.execute_plan_statement(PlanStatement::Delete(plan))
2942 }
2943
2944 fn handle_truncate(
2945 &self,
2946 table_names: &[sqlparser::ast::TruncateTableTarget],
2947 partitions: &Option<Vec<SqlExpr>>,
2948 _table: bool, identity: &Option<sqlparser::ast::TruncateIdentityOption>,
2950 cascade: Option<sqlparser::ast::CascadeOption>,
2951 on_cluster: &Option<Ident>,
2952 ) -> SqlResult<RuntimeStatementResult<P>> {
2953 if table_names.len() > 1 {
2955 return Err(Error::InvalidArgumentError(
2956 "TRUNCATE with multiple tables is not supported yet".into(),
2957 ));
2958 }
2959 if partitions.is_some() {
2960 return Err(Error::InvalidArgumentError(
2961 "TRUNCATE ... PARTITION is not supported".into(),
2962 ));
2963 }
2964 if identity.is_some() {
2965 return Err(Error::InvalidArgumentError(
2966 "TRUNCATE ... RESTART/CONTINUE IDENTITY is not supported".into(),
2967 ));
2968 }
2969 use sqlparser::ast::CascadeOption;
2970 if matches!(cascade, Some(CascadeOption::Cascade)) {
2971 return Err(Error::InvalidArgumentError(
2972 "TRUNCATE ... CASCADE is not supported".into(),
2973 ));
2974 }
2975 if on_cluster.is_some() {
2976 return Err(Error::InvalidArgumentError(
2977 "TRUNCATE ... ON CLUSTER is not supported".into(),
2978 ));
2979 }
2980
2981 let table_name = if let Some(target) = table_names.first() {
2983 let table_obj = &target.name;
2985 let display_name = table_obj.to_string();
2986 let canonical_name = display_name.to_ascii_lowercase();
2987
2988 if !self.engine.session().has_active_transaction()
2990 && self
2991 .engine
2992 .context()
2993 .is_table_marked_dropped(&canonical_name)
2994 {
2995 return Err(Error::TransactionContextError(
2996 DROPPED_TABLE_TRANSACTION_ERR.into(),
2997 ));
2998 }
2999
3000 display_name
3001 } else {
3002 return Err(Error::InvalidArgumentError(
3003 "TRUNCATE requires a table name".into(),
3004 ));
3005 };
3006
3007 let plan = TruncatePlan {
3008 table: table_name.clone(),
3009 };
3010 self.execute_plan_statement(PlanStatement::Truncate(plan))
3011 }
3012
    /// Dispatch a `DROP <object-type>` statement.
    ///
    /// Supports `DROP TABLE`, `DROP INDEX`, and `DROP SCHEMA`; every other
    /// object type is rejected. The `purge` and `temporary` modifiers are
    /// never supported. Each named object is dropped independently, in order.
    #[allow(clippy::too_many_arguments)]
    fn handle_drop(
        &self,
        object_type: ObjectType,
        if_exists: bool,
        names: Vec<ObjectName>,
        cascade: bool,
        restrict: bool,
        purge: bool,
        temporary: bool,
    ) -> SqlResult<RuntimeStatementResult<P>> {
        if purge || temporary {
            return Err(Error::InvalidArgumentError(
                "DROP purge/temporary options are not supported".into(),
            ));
        }

        match object_type {
            ObjectType::Table => {
                // Neither CASCADE nor RESTRICT semantics are implemented for tables.
                if cascade || restrict {
                    return Err(Error::InvalidArgumentError(
                        "DROP TABLE CASCADE/RESTRICT is not supported".into(),
                    ));
                }

                for name in names {
                    let table_name = Self::object_name_to_string(&name)?;
                    let mut plan = llkv_plan::DropTablePlan::new(table_name.clone());
                    plan.if_exists = if_exists;

                    // Rewrap runtime errors with the table's display name.
                    self.execute_plan_statement(llkv_plan::PlanStatement::DropTable(plan))
                        .map_err(|err| Self::map_table_error(&table_name, err))?;
                }

                Ok(RuntimeStatementResult::NoOp)
            }
            ObjectType::Index => {
                if cascade || restrict {
                    return Err(Error::InvalidArgumentError(
                        "DROP INDEX CASCADE/RESTRICT is not supported".into(),
                    ));
                }

                for name in names {
                    let index_name = Self::object_name_to_string(&name)?;
                    let plan = llkv_plan::DropIndexPlan::new(index_name).if_exists(if_exists);
                    self.execute_plan_statement(llkv_plan::PlanStatement::DropIndex(plan))?;
                }

                Ok(RuntimeStatementResult::NoOp)
            }
            ObjectType::Schema => {
                // RESTRICT is rejected outright; a plain DROP SCHEMA only
                // succeeds when the schema is empty, while CASCADE drops all
                // member tables first.
                if restrict {
                    return Err(Error::InvalidArgumentError(
                        "DROP SCHEMA RESTRICT is not supported".into(),
                    ));
                }

                let catalog = self.engine.context().table_catalog();

                for name in names {
                    let (display_name, canonical_name) = canonical_object_name(&name)?;

                    if !catalog.schema_exists(&canonical_name) {
                        if if_exists {
                            continue;
                        }
                        return Err(Error::CatalogError(format!(
                            "Schema '{}' does not exist",
                            display_name
                        )));
                    }

                    if cascade {
                        // Schema membership is determined by canonical-name
                        // prefix matching against "<schema>.".
                        let all_tables = catalog.table_names();
                        let schema_prefix = format!("{}.", canonical_name);

                        for table in all_tables {
                            if table.to_ascii_lowercase().starts_with(&schema_prefix) {
                                let mut plan = llkv_plan::DropTablePlan::new(table.clone());
                                // Member tables were just enumerated, so they
                                // must exist; a failure here is a real error.
                                plan.if_exists = false;
                                self.execute_plan_statement(llkv_plan::PlanStatement::DropTable(
                                    plan,
                                ))?;
                            }
                        }
                    } else {
                        // Without CASCADE, refuse to remove a non-empty schema.
                        let all_tables = catalog.table_names();
                        let schema_prefix = format!("{}.", canonical_name);
                        let has_tables = all_tables
                            .iter()
                            .any(|t| t.to_ascii_lowercase().starts_with(&schema_prefix));

                        if has_tables {
                            return Err(Error::CatalogError(format!(
                                "Schema '{}' is not empty. Use CASCADE to drop schema and all its tables",
                                display_name
                            )));
                        }
                    }

                    // unregister_schema returns false when the schema vanished
                    // between the existence check and now; honor IF EXISTS.
                    if !catalog.unregister_schema(&canonical_name) && !if_exists {
                        return Err(Error::CatalogError(format!(
                            "Schema '{}' does not exist",
                            display_name
                        )));
                    }
                }

                Ok(RuntimeStatementResult::NoOp)
            }
            _ => Err(Error::InvalidArgumentError(format!(
                "DROP {} is not supported",
                object_type
            ))),
        }
    }
3133
3134 fn handle_alter_table(
3135 &self,
3136 name: ObjectName,
3137 if_exists: bool,
3138 only: bool,
3139 operations: Vec<AlterTableOperation>,
3140 ) -> SqlResult<RuntimeStatementResult<P>> {
3141 if only {
3142 return Err(Error::InvalidArgumentError(
3143 "ALTER TABLE ONLY is not supported yet".into(),
3144 ));
3145 }
3146
3147 if operations.is_empty() {
3148 return Ok(RuntimeStatementResult::NoOp);
3149 }
3150
3151 if operations.len() != 1 {
3152 return Err(Error::InvalidArgumentError(
3153 "ALTER TABLE currently supports exactly one operation".into(),
3154 ));
3155 }
3156
3157 let operation = operations.into_iter().next().expect("checked length");
3158 match operation {
3159 AlterTableOperation::RenameTable { table_name } => {
3160 let new_name = table_name.to_string();
3161 self.handle_alter_table_rename(name, new_name, if_exists)
3162 }
3163 AlterTableOperation::RenameColumn {
3164 old_column_name,
3165 new_column_name,
3166 } => {
3167 let plan = llkv_plan::AlterTablePlan {
3168 table_name: name.to_string(),
3169 if_exists,
3170 operation: llkv_plan::AlterTableOperation::RenameColumn {
3171 old_column_name: old_column_name.to_string(),
3172 new_column_name: new_column_name.to_string(),
3173 },
3174 };
3175 self.execute_plan_statement(PlanStatement::AlterTable(plan))
3176 }
3177 AlterTableOperation::AlterColumn { column_name, op } => {
3178 if let AlterColumnOperation::SetDataType {
3180 data_type,
3181 using,
3182 had_set: _,
3183 } = op
3184 {
3185 if using.is_some() {
3186 return Err(Error::InvalidArgumentError(
3187 "ALTER COLUMN SET DATA TYPE USING clause is not yet supported".into(),
3188 ));
3189 }
3190
3191 let plan = llkv_plan::AlterTablePlan {
3192 table_name: name.to_string(),
3193 if_exists,
3194 operation: llkv_plan::AlterTableOperation::SetColumnDataType {
3195 column_name: column_name.to_string(),
3196 new_data_type: data_type.to_string(),
3197 },
3198 };
3199 self.execute_plan_statement(PlanStatement::AlterTable(plan))
3200 } else {
3201 Err(Error::InvalidArgumentError(format!(
3202 "unsupported ALTER COLUMN operation: {:?}",
3203 op
3204 )))
3205 }
3206 }
3207 AlterTableOperation::DropColumn {
3208 has_column_keyword: _,
3209 column_names,
3210 if_exists: column_if_exists,
3211 drop_behavior,
3212 } => {
3213 if column_names.len() != 1 {
3214 return Err(Error::InvalidArgumentError(
3215 "DROP COLUMN currently supports dropping one column at a time".into(),
3216 ));
3217 }
3218
3219 let column_name = column_names.into_iter().next().unwrap().to_string();
3220 let cascade = matches!(drop_behavior, Some(sqlparser::ast::DropBehavior::Cascade));
3221
3222 let plan = llkv_plan::AlterTablePlan {
3223 table_name: name.to_string(),
3224 if_exists,
3225 operation: llkv_plan::AlterTableOperation::DropColumn {
3226 column_name,
3227 if_exists: column_if_exists,
3228 cascade,
3229 },
3230 };
3231 self.execute_plan_statement(PlanStatement::AlterTable(plan))
3232 }
3233 other => Err(Error::InvalidArgumentError(format!(
3234 "unsupported ALTER TABLE operation: {:?}",
3235 other
3236 ))),
3237 }
3238 }
3239
    /// Execute `ALTER TABLE ... RENAME TO <new_name>`.
    ///
    /// The new name may repeat the existing schema qualifier but may neither
    /// add one (when the original was unqualified) nor change it. Quoted
    /// identifiers are accepted; surrounding double quotes are stripped for
    /// validation. Delegates the actual rename to `CatalogDdl::rename_table`.
    fn handle_alter_table_rename(
        &self,
        original_name: ObjectName,
        new_table_name: String,
        if_exists: bool,
    ) -> SqlResult<RuntimeStatementResult<P>> {
        let (schema_opt, table_name) = parse_schema_qualified_name(&original_name)?;

        let new_table_name_clean = new_table_name.trim();

        if new_table_name_clean.is_empty() {
            return Err(Error::InvalidArgumentError(
                "ALTER TABLE RENAME requires a non-empty table name".into(),
            ));
        }

        // Split an optional "schema.table" qualifier off the new name.
        // NOTE(review): this splits on the first '.', so a quoted identifier
        // containing a dot would be mis-split — confirm upstream parsing
        // prevents that.
        let (raw_new_schema_opt, raw_new_table) =
            if let Some((schema_part, table_part)) = new_table_name_clean.split_once('.') {
                (
                    Some(schema_part.trim().to_string()),
                    table_part.trim().to_string(),
                )
            } else {
                (None, new_table_name_clean.to_string())
            };

        // An unqualified table cannot gain a schema qualifier via RENAME.
        if schema_opt.is_none() && raw_new_schema_opt.is_some() {
            return Err(Error::InvalidArgumentError(
                "ALTER TABLE RENAME cannot add a schema qualifier".into(),
            ));
        }

        let new_table_trimmed = raw_new_table.trim_matches('"');
        if new_table_trimmed.is_empty() {
            return Err(Error::InvalidArgumentError(
                "ALTER TABLE RENAME requires a non-empty table name".into(),
            ));
        }

        // When both sides carry a schema, they must match (case-insensitive,
        // ignoring surrounding quotes) — RENAME cannot move tables.
        if let (Some(existing_schema), Some(new_schema_raw)) =
            (schema_opt.as_ref(), raw_new_schema_opt.as_ref())
        {
            let new_schema_trimmed = new_schema_raw.trim_matches('"');
            if !existing_schema.eq_ignore_ascii_case(new_schema_trimmed) {
                return Err(Error::InvalidArgumentError(
                    "ALTER TABLE RENAME cannot change table schema".into(),
                ));
            }
        }

        let new_table_display = raw_new_table;
        let new_schema_opt = raw_new_schema_opt;

        // Build "schema.table" without the extra allocations of format!.
        fn join_schema_table(schema: &str, table: &str) -> String {
            let mut qualified = String::with_capacity(schema.len() + table.len() + 1);
            qualified.push_str(schema);
            qualified.push('.');
            qualified.push_str(table);
            qualified
        }

        let current_display = schema_opt
            .as_ref()
            .map(|schema| join_schema_table(schema, &table_name))
            .unwrap_or_else(|| table_name.clone());

        // Preserve the original schema qualifier on the new display name when
        // the caller did not restate it.
        let new_display = if let Some(new_schema_raw) = new_schema_opt.clone() {
            join_schema_table(&new_schema_raw, &new_table_display)
        } else if let Some(schema) = schema_opt.as_ref() {
            join_schema_table(schema, &new_table_display)
        } else {
            new_table_display.clone()
        };

        let plan = RenameTablePlan::new(&current_display, &new_display).if_exists(if_exists);

        match CatalogDdl::rename_table(self.engine.session(), plan) {
            Ok(()) => Ok(RuntimeStatementResult::NoOp),
            Err(err) => Err(Self::map_table_error(&current_display, err)),
        }
    }
3321
    /// Attempt to evaluate a scalar `(SELECT AVG(expr) ...)` subquery eagerly.
    ///
    /// Recognizes a plain single-projection `SELECT avg(arg)` with no
    /// DISTINCT/TOP/GROUP BY/HAVING/INTO/lateral views, window, or filter
    /// clauses. When matched, the query is cloned twice — once with the
    /// projection rewritten to `sum(arg)` and once to `count(arg)` — both are
    /// executed as scalar INT64 queries, and the average is returned as a
    /// numeric literal expression. Returns `Ok(None)` when the query does not
    /// match this shape (the caller falls back to generic handling), and a
    /// NULL literal when either aggregate yields NULL or the count is zero.
    fn try_materialize_avg_subquery(&self, query: &Query) -> SqlResult<Option<SqlExpr>> {
        use sqlparser::ast::{
            DuplicateTreatment, FunctionArg, FunctionArgExpr, FunctionArguments, ObjectName,
            ObjectNamePart, SelectItem, SetExpr,
        };

        let select = match query.body.as_ref() {
            SetExpr::Select(select) => select.as_ref(),
            _ => return Ok(None),
        };

        // Only the simplest SELECT shape qualifies for this rewrite.
        if select.projection.len() != 1
            || select.distinct.is_some()
            || select.top.is_some()
            || select.value_table_mode.is_some()
            || select.having.is_some()
            || !group_by_is_empty(&select.group_by)
            || select.into.is_some()
            || !select.lateral_views.is_empty()
        {
            return Ok(None);
        }

        let func = match &select.projection[0] {
            SelectItem::UnnamedExpr(SqlExpr::Function(func)) => func,
            _ => return Ok(None),
        };

        // Reject any function decoration that would change aggregate semantics.
        if func.uses_odbc_syntax
            || func.filter.is_some()
            || func.null_treatment.is_some()
            || func.over.is_some()
            || !func.within_group.is_empty()
        {
            return Ok(None);
        }

        let func_name = func.name.to_string().to_ascii_lowercase();
        if func_name != "avg" {
            return Ok(None);
        }

        // AVG(DISTINCT ...) cannot be decomposed into SUM/COUNT this way.
        let args = match &func.args {
            FunctionArguments::List(list) => {
                if matches!(list.duplicate_treatment, Some(DuplicateTreatment::Distinct))
                    || !list.clauses.is_empty()
                {
                    return Ok(None);
                }
                &list.args
            }
            _ => return Ok(None),
        };

        if args.len() != 1 {
            return Ok(None);
        }

        match &args[0] {
            FunctionArg::Unnamed(FunctionArgExpr::Expr(_)) => {}
            _ => return Ok(None),
        };

        let mut sum_query = query.clone();
        let mut count_query = query.clone();

        // Swap the single AVG projection for the named aggregate, keeping the
        // argument list and the rest of the query untouched.
        let build_replacement = |target_query: &mut Query, name: &str| -> SqlResult<()> {
            let select = match target_query.body.as_mut() {
                SetExpr::Select(select) => select,
                _ => {
                    return Err(Error::Internal(
                        "expected SELECT query in AVG materialization".into(),
                    ));
                }
            };

            let mut replacement_func = func.clone();
            replacement_func.name = ObjectName(vec![ObjectNamePart::Identifier(Ident {
                value: name.to_string(),
                quote_style: None,
                span: Span::empty(),
            })]);
            select.projection = vec![SelectItem::UnnamedExpr(SqlExpr::Function(replacement_func))];
            Ok(())
        };

        build_replacement(&mut sum_query, "sum")?;
        build_replacement(&mut count_query, "count")?;

        let sum_value = self.execute_scalar_int64(sum_query)?;
        let count_value = self.execute_scalar_int64(count_query)?;

        // No count (no rows) => AVG is NULL.
        let Some(count_value) = count_value else {
            return Ok(Some(SqlExpr::Value(ValueWithSpan {
                value: Value::Null,
                span: Span::empty(),
            })));
        };

        // Zero rows counted => AVG is NULL (avoid division by zero).
        if count_value == 0 {
            return Ok(Some(SqlExpr::Value(ValueWithSpan {
                value: Value::Null,
                span: Span::empty(),
            })));
        }

        // NULL sum with a nonzero count => AVG is NULL.
        let sum_value = match sum_value {
            Some(value) => value,
            None => {
                return Ok(Some(SqlExpr::Value(ValueWithSpan {
                    value: Value::Null,
                    span: Span::empty(),
                })));
            }
        };

        // NOTE(review): integer SUM/COUNT are averaged in f64, which can lose
        // precision for very large sums — confirm acceptable for this dialect.
        let avg = (sum_value as f64) / (count_value as f64);
        let value = ValueWithSpan {
            value: Value::Number(avg.to_string(), false),
            span: Span::empty(),
        };
        Ok(Some(SqlExpr::Value(value)))
    }
3452
3453 fn execute_scalar_int64(&self, query: Query) -> SqlResult<Option<i64>> {
3454 let result = self.handle_query(query)?;
3455 let execution = match result {
3456 RuntimeStatementResult::Select { execution, .. } => execution,
3457 _ => {
3458 return Err(Error::InvalidArgumentError(
3459 "scalar aggregate must be a SELECT statement".into(),
3460 ));
3461 }
3462 };
3463
3464 let batches = execution.collect()?;
3465 let mut captured: Option<Option<i64>> = None;
3466
3467 for batch in batches {
3468 if batch.num_columns() == 0 {
3469 continue;
3470 }
3471 if batch.num_columns() != 1 {
3472 return Err(Error::InvalidArgumentError(
3473 "scalar aggregate must return exactly one column".into(),
3474 ));
3475 }
3476
3477 let array = batch.column(0);
3478 let values = array
3479 .as_any()
3480 .downcast_ref::<arrow::array::Int64Array>()
3481 .ok_or_else(|| {
3482 Error::InvalidArgumentError(
3483 "scalar aggregate result must be an INT64 value".into(),
3484 )
3485 })?;
3486
3487 for idx in 0..values.len() {
3488 if captured.is_some() {
3489 return Err(Error::InvalidArgumentError(
3490 "scalar aggregate returned more than one row".into(),
3491 ));
3492 }
3493 if values.is_null(idx) {
3494 captured = Some(None);
3495 } else {
3496 captured = Some(Some(values.value(idx)));
3497 }
3498 }
3499 }
3500
3501 Ok(captured.unwrap_or(None))
3502 }
3503
    /// Evaluate a scalar subquery eagerly and return its result as a literal
    /// SQL expression.
    ///
    /// AVG subqueries take a dedicated SUM/COUNT decomposition path first.
    /// Otherwise the subquery is executed and its single cell is converted to
    /// a `Value` literal (Int64, Float64, Utf8, or Boolean). An empty result
    /// set materializes as NULL; more than one row or column is an error.
    fn materialize_scalar_subquery(&self, subquery: Query) -> SqlResult<SqlExpr> {
        if let Some(avg_literal) = self.try_materialize_avg_subquery(&subquery)? {
            return Ok(avg_literal);
        }

        let result = self.handle_query(subquery)?;
        let execution = match result {
            RuntimeStatementResult::Select { execution, .. } => execution,
            _ => {
                return Err(Error::InvalidArgumentError(
                    "scalar subquery must be a SELECT statement".into(),
                ));
            }
        };

        let batches = execution.collect()?;
        // Holds the single captured cell; a second row is an error.
        let mut captured_value: Option<ValueWithSpan> = None;

        for batch in batches {
            if batch.num_columns() == 0 {
                continue;
            }
            if batch.num_columns() != 1 {
                return Err(Error::InvalidArgumentError(
                    "scalar subquery must return exactly one column".into(),
                ));
            }

            let column = batch.column(0);
            for row_idx in 0..batch.num_rows() {
                if captured_value.is_some() {
                    return Err(Error::InvalidArgumentError(
                        "scalar subquery returned more than one row".into(),
                    ));
                }

                // Convert the Arrow cell to a sqlparser literal; numbers are
                // rendered via to_string so they round-trip through the AST.
                let value = if column.is_null(row_idx) {
                    Value::Null
                } else {
                    use arrow::array::{BooleanArray, Float64Array, Int64Array, StringArray};
                    match column.data_type() {
                        arrow::datatypes::DataType::Int64 => {
                            let array =
                                column
                                    .as_any()
                                    .downcast_ref::<Int64Array>()
                                    .ok_or_else(|| {
                                        Error::Internal(
                                            "expected Int64 array for scalar subquery".into(),
                                        )
                                    })?;
                            Value::Number(array.value(row_idx).to_string(), false)
                        }
                        arrow::datatypes::DataType::Float64 => {
                            let array = column.as_any().downcast_ref::<Float64Array>().ok_or_else(
                                || {
                                    Error::Internal(
                                        "expected Float64 array for scalar subquery".into(),
                                    )
                                },
                            )?;
                            Value::Number(array.value(row_idx).to_string(), false)
                        }
                        arrow::datatypes::DataType::Utf8 => {
                            let array =
                                column
                                    .as_any()
                                    .downcast_ref::<StringArray>()
                                    .ok_or_else(|| {
                                        Error::Internal(
                                            "expected String array for scalar subquery".into(),
                                        )
                                    })?;
                            Value::SingleQuotedString(array.value(row_idx).to_string())
                        }
                        arrow::datatypes::DataType::Boolean => {
                            let array = column.as_any().downcast_ref::<BooleanArray>().ok_or_else(
                                || {
                                    Error::Internal(
                                        "expected Boolean array for scalar subquery".into(),
                                    )
                                },
                            )?;
                            Value::Boolean(array.value(row_idx))
                        }
                        other => {
                            return Err(Error::InvalidArgumentError(format!(
                                "unsupported data type in scalar subquery result: {other:?}"
                            )));
                        }
                    }
                };

                captured_value = Some(ValueWithSpan {
                    value,
                    span: Span::empty(),
                });
            }
        }

        // No rows at all => the scalar subquery evaluates to NULL.
        let final_value = captured_value.unwrap_or(ValueWithSpan {
            value: Value::Null,
            span: Span::empty(),
        });
        Ok(SqlExpr::Value(final_value))
    }
3610
3611 fn materialize_in_subquery(&self, root_expr: SqlExpr) -> SqlResult<SqlExpr> {
3612 enum WorkItem {
3614 Process(Box<SqlExpr>),
3615 BuildBinaryOp {
3616 op: BinaryOperator,
3617 left: Box<SqlExpr>,
3618 right_done: bool,
3619 },
3620 BuildUnaryOp {
3621 op: UnaryOperator,
3622 },
3623 BuildNested,
3624 BuildIsNull,
3625 BuildIsNotNull,
3626 FinishBetween {
3627 negated: bool,
3628 },
3629 }
3630
3631 let mut work_stack: Vec<WorkItem> = vec![WorkItem::Process(Box::new(root_expr))];
3632 let mut result_stack: Vec<SqlExpr> = Vec::new();
3633
3634 while let Some(item) = work_stack.pop() {
3635 match item {
3636 WorkItem::Process(expr) => {
3637 match *expr {
3638 SqlExpr::InSubquery {
3639 expr: left_expr,
3640 subquery,
3641 negated,
3642 } => {
3643 let result = self.handle_query(*subquery)?;
3645
3646 let values = match result {
3648 RuntimeStatementResult::Select { execution, .. } => {
3649 let batches = execution.collect()?;
3650 let mut collected_values = Vec::new();
3651
3652 for batch in batches {
3653 if batch.num_columns() == 0 {
3654 continue;
3655 }
3656 let column = batch.column(0);
3657
3658 for row_idx in 0..column.len() {
3659 use arrow::datatypes::DataType;
3660 let value = if column.is_null(row_idx) {
3661 Value::Null
3662 } else {
3663 match column.data_type() {
3664 DataType::Int64 => {
3665 let arr = column
3666 .as_any()
3667 .downcast_ref::<arrow::array::Int64Array>()
3668 .unwrap();
3669 Value::Number(
3670 arr.value(row_idx).to_string(),
3671 false,
3672 )
3673 }
3674 DataType::Float64 => {
3675 let arr = column
3676 .as_any()
3677 .downcast_ref::<arrow::array::Float64Array>()
3678 .unwrap();
3679 Value::Number(
3680 arr.value(row_idx).to_string(),
3681 false,
3682 )
3683 }
3684 DataType::Utf8 => {
3685 let arr = column
3686 .as_any()
3687 .downcast_ref::<arrow::array::StringArray>()
3688 .unwrap();
3689 Value::SingleQuotedString(
3690 arr.value(row_idx).to_string(),
3691 )
3692 }
3693 DataType::Boolean => {
3694 let arr = column
3695 .as_any()
3696 .downcast_ref::<arrow::array::BooleanArray>()
3697 .unwrap();
3698 Value::Boolean(arr.value(row_idx))
3699 }
3700 other => {
3701 return Err(Error::InvalidArgumentError(
3702 format!(
3703 "unsupported data type in IN subquery: {other:?}"
3704 ),
3705 ));
3706 }
3707 }
3708 };
3709 collected_values.push(ValueWithSpan {
3710 value,
3711 span: Span::empty(),
3712 });
3713 }
3714 }
3715
3716 collected_values
3717 }
3718 _ => {
3719 return Err(Error::InvalidArgumentError(
3720 "IN subquery must be a SELECT statement".into(),
3721 ));
3722 }
3723 };
3724
3725 result_stack.push(SqlExpr::InList {
3727 expr: left_expr,
3728 list: values.into_iter().map(SqlExpr::Value).collect(),
3729 negated,
3730 });
3731 }
3732 SqlExpr::Subquery(subquery) => {
3733 let scalar_expr = self.materialize_scalar_subquery(*subquery)?;
3734 result_stack.push(scalar_expr);
3735 }
3736 SqlExpr::Case {
3737 case_token,
3738 end_token,
3739 operand,
3740 conditions,
3741 else_result,
3742 } => {
3743 let new_operand = match operand {
3744 Some(expr) => Some(Box::new(self.materialize_in_subquery(*expr)?)),
3745 None => None,
3746 };
3747 let mut new_conditions = Vec::with_capacity(conditions.len());
3748 for branch in conditions {
3749 let condition = self.materialize_in_subquery(branch.condition)?;
3750 let result = self.materialize_in_subquery(branch.result)?;
3751 new_conditions.push(sqlparser::ast::CaseWhen { condition, result });
3752 }
3753 let new_else = match else_result {
3754 Some(expr) => Some(Box::new(self.materialize_in_subquery(*expr)?)),
3755 None => None,
3756 };
3757 result_stack.push(SqlExpr::Case {
3758 case_token,
3759 end_token,
3760 operand: new_operand,
3761 conditions: new_conditions,
3762 else_result: new_else,
3763 });
3764 }
3765 SqlExpr::BinaryOp { left, op, right } => {
3766 work_stack.push(WorkItem::BuildBinaryOp {
3771 op,
3772 left: left.clone(),
3773 right_done: false,
3774 });
3775 work_stack.push(WorkItem::Process(left));
3779 work_stack.push(WorkItem::Process(right));
3780 }
3781 SqlExpr::UnaryOp { op, expr } => {
3782 work_stack.push(WorkItem::BuildUnaryOp { op });
3783 work_stack.push(WorkItem::Process(expr));
3784 }
3785 SqlExpr::Nested(inner) => {
3786 work_stack.push(WorkItem::BuildNested);
3787 work_stack.push(WorkItem::Process(inner));
3788 }
3789 SqlExpr::IsNull(inner) => {
3790 work_stack.push(WorkItem::BuildIsNull);
3791 work_stack.push(WorkItem::Process(inner));
3792 }
3793 SqlExpr::IsNotNull(inner) => {
3794 work_stack.push(WorkItem::BuildIsNotNull);
3795 work_stack.push(WorkItem::Process(inner));
3796 }
3797 SqlExpr::Between {
3798 expr,
3799 negated,
3800 low,
3801 high,
3802 } => {
3803 work_stack.push(WorkItem::FinishBetween { negated });
3804 work_stack.push(WorkItem::Process(high));
3805 work_stack.push(WorkItem::Process(low));
3806 work_stack.push(WorkItem::Process(expr));
3807 }
3808 other => {
3810 result_stack.push(other);
3811 }
3812 }
3813 }
3814 WorkItem::BuildBinaryOp {
3815 op,
3816 left,
3817 right_done,
3818 } => {
3819 if !right_done {
3820 let left_result = result_stack.pop().unwrap();
3822 work_stack.push(WorkItem::BuildBinaryOp {
3823 op,
3824 left: Box::new(left_result),
3825 right_done: true,
3826 });
3827 } else {
3828 let right_result = result_stack.pop().unwrap();
3830 let left_result = *left;
3831 result_stack.push(SqlExpr::BinaryOp {
3832 left: Box::new(left_result),
3833 op,
3834 right: Box::new(right_result),
3835 });
3836 }
3837 }
3838 WorkItem::BuildUnaryOp { op } => {
3839 let inner = result_stack.pop().unwrap();
3840 result_stack.push(SqlExpr::UnaryOp {
3841 op,
3842 expr: Box::new(inner),
3843 });
3844 }
3845 WorkItem::BuildNested => {
3846 let inner = result_stack.pop().unwrap();
3847 result_stack.push(SqlExpr::Nested(Box::new(inner)));
3848 }
3849 WorkItem::BuildIsNull => {
3850 let inner = result_stack.pop().unwrap();
3851 result_stack.push(SqlExpr::IsNull(Box::new(inner)));
3852 }
3853 WorkItem::BuildIsNotNull => {
3854 let inner = result_stack.pop().unwrap();
3855 result_stack.push(SqlExpr::IsNotNull(Box::new(inner)));
3856 }
3857 WorkItem::FinishBetween { negated } => {
3858 let high_result = result_stack.pop().unwrap();
3859 let low_result = result_stack.pop().unwrap();
3860 let expr_result = result_stack.pop().unwrap();
3861 result_stack.push(SqlExpr::Between {
3862 expr: Box::new(expr_result),
3863 negated,
3864 low: Box::new(low_result),
3865 high: Box::new(high_result),
3866 });
3867 }
3868 }
3869 }
3870
3871 Ok(result_stack
3873 .pop()
3874 .expect("result stack should have exactly one item"))
3875 }
3876
3877 fn handle_query(&self, query: Query) -> SqlResult<RuntimeStatementResult<P>> {
3878 if let Some(result) = self.try_handle_pragma_table_info(&query)? {
3880 return Ok(result);
3881 }
3882
3883 let select_plan = self.build_select_plan(query)?;
3884 self.execute_plan_statement(PlanStatement::Select(Box::new(select_plan)))
3885 }
3886
3887 fn build_select_plan(&self, query: Query) -> SqlResult<SelectPlan> {
3888 if self.engine.session().has_active_transaction() && self.engine.session().is_aborted() {
3889 return Err(Error::TransactionContextError(
3890 "TransactionContext Error: transaction is aborted".into(),
3891 ));
3892 }
3893
3894 validate_simple_query(&query)?;
3895 let catalog = self.engine.context().table_catalog();
3896 let resolver = catalog.identifier_resolver();
3897
3898 let (mut select_plan, select_context) =
3899 self.translate_query_body(query.body.as_ref(), &resolver)?;
3900 if let Some(order_by) = &query.order_by {
3901 if !select_plan.aggregates.is_empty() {
3902 return Err(Error::InvalidArgumentError(
3903 "ORDER BY is not supported for aggregate queries".into(),
3904 ));
3905 }
3906 let order_plan = self.translate_order_by(&resolver, select_context, order_by)?;
3907 select_plan = select_plan.with_order_by(order_plan);
3908 }
3909 Ok(select_plan)
3910 }
3911
3912 fn build_select_plan_internal(
3921 &self,
3922 query: Query,
3923 resolver: &IdentifierResolver<'_>,
3924 outer_scopes: &[IdentifierContext],
3925 subqueries: &mut Vec<llkv_plan::FilterSubquery>,
3926 correlated_tracker: Option<&mut SubqueryCorrelatedColumnTracker>,
3927 ) -> SqlResult<SelectPlan> {
3928 if self.engine.session().has_active_transaction() && self.engine.session().is_aborted() {
3929 return Err(Error::TransactionContextError(
3930 "TransactionContext Error: transaction is aborted".into(),
3931 ));
3932 }
3933
3934 validate_simple_query(&query)?;
3935
3936 let (mut select_plan, select_context) = self.translate_query_body_internal(
3937 query.body.as_ref(),
3938 resolver,
3939 outer_scopes,
3940 subqueries,
3941 correlated_tracker,
3942 )?;
3943 if let Some(order_by) = &query.order_by {
3944 if !select_plan.aggregates.is_empty() {
3945 return Err(Error::InvalidArgumentError(
3946 "ORDER BY is not supported for aggregate queries".into(),
3947 ));
3948 }
3949 let order_plan = self.translate_order_by(resolver, select_context, order_by)?;
3950 select_plan = select_plan.with_order_by(order_plan);
3951 }
3952 Ok(select_plan)
3953 }
3954
3955 fn translate_select(
3956 &self,
3957 select: &Select,
3958 resolver: &IdentifierResolver<'_>,
3959 ) -> SqlResult<(SelectPlan, IdentifierContext)> {
3960 let mut subqueries = Vec::new();
3961 let result =
3962 self.translate_select_internal(select, resolver, &[], &mut subqueries, None)?;
3963 if !subqueries.is_empty() {
3964 return Err(Error::Internal(
3965 "translate_select: unexpected subqueries from non-correlated translation".into(),
3966 ));
3967 }
3968 Ok(result)
3969 }
3970
3971 fn translate_query_body(
3972 &self,
3973 body: &SetExpr,
3974 resolver: &IdentifierResolver<'_>,
3975 ) -> SqlResult<(SelectPlan, IdentifierContext)> {
3976 let mut subqueries = Vec::new();
3977 let result =
3978 self.translate_query_body_internal(body, resolver, &[], &mut subqueries, None)?;
3979 if !subqueries.is_empty() {
3980 return Err(Error::Internal(
3981 "translate_query_body: unexpected subqueries from non-correlated translation"
3982 .into(),
3983 ));
3984 }
3985 Ok(result)
3986 }
3987
3988 fn translate_query_body_internal(
3989 &self,
3990 body: &SetExpr,
3991 resolver: &IdentifierResolver<'_>,
3992 outer_scopes: &[IdentifierContext],
3993 subqueries: &mut Vec<llkv_plan::FilterSubquery>,
3994 mut correlated_tracker: Option<&mut SubqueryCorrelatedColumnTracker>,
3995 ) -> SqlResult<(SelectPlan, IdentifierContext)> {
3996 match body {
3997 SetExpr::Select(select) => self.translate_select_internal(
3998 select.as_ref(),
3999 resolver,
4000 outer_scopes,
4001 subqueries,
4002 correlated_tracker,
4003 ),
4004 SetExpr::Query(query) => self.translate_query_body_internal(
4005 &query.body,
4006 resolver,
4007 outer_scopes,
4008 subqueries,
4009 correlated_tracker,
4010 ),
4011 SetExpr::SetOperation {
4012 left,
4013 right,
4014 op,
4015 set_quantifier,
4016 } => {
4017 let left_tracker = correlated_tracker.reborrow();
4018 let (left_plan, left_context) = self.translate_query_body_internal(
4019 left.as_ref(),
4020 resolver,
4021 outer_scopes,
4022 subqueries,
4023 left_tracker,
4024 )?;
4025
4026 let right_tracker = correlated_tracker.reborrow();
4027 let (right_plan, _) = self.translate_query_body_internal(
4028 right.as_ref(),
4029 resolver,
4030 outer_scopes,
4031 subqueries,
4032 right_tracker,
4033 )?;
4034
4035 let operator = match op {
4036 sqlparser::ast::SetOperator::Union => llkv_plan::CompoundOperator::Union,
4037 sqlparser::ast::SetOperator::Intersect => {
4038 llkv_plan::CompoundOperator::Intersect
4039 }
4040 sqlparser::ast::SetOperator::Except | sqlparser::ast::SetOperator::Minus => {
4041 llkv_plan::CompoundOperator::Except
4042 }
4043 };
4044
4045 let quantifier = match set_quantifier {
4046 SetQuantifier::All => llkv_plan::CompoundQuantifier::All,
4047 _ => llkv_plan::CompoundQuantifier::Distinct,
4048 };
4049
4050 let mut compound = if let Some(existing) = left_plan.compound {
4051 existing
4052 } else {
4053 llkv_plan::CompoundSelectPlan::new(left_plan)
4054 };
4055 compound.push_operation(operator, quantifier, right_plan);
4056
4057 let result_plan = SelectPlan::new("").with_compound(compound);
4058
4059 Ok((result_plan, left_context))
4060 }
4061 other => Err(Error::InvalidArgumentError(format!(
4062 "unsupported query expression: {other:?}"
4063 ))),
4064 }
4065 }
4066
    /// Core SELECT translation: validates unsupported clauses, builds the
    /// table/join/projection skeleton, then attaches WHERE/JOIN/HAVING
    /// filters, DISTINCT, GROUP BY, and value-table mode.
    ///
    /// Subqueries discovered while translating predicates are appended to
    /// `subqueries`; correlated column usage is recorded through
    /// `correlated_tracker` when one is supplied.
    fn translate_select_internal(
        &self,
        select: &Select,
        resolver: &IdentifierResolver<'_>,
        outer_scopes: &[IdentifierContext],
        subqueries: &mut Vec<llkv_plan::FilterSubquery>,
        mut correlated_tracker: Option<&mut SubqueryCorrelatedColumnTracker>,
    ) -> SqlResult<(SelectPlan, IdentifierContext)> {
        let mut distinct = match &select.distinct {
            None => false,
            Some(Distinct::Distinct) => true,
            Some(Distinct::On(_)) => {
                return Err(Error::InvalidArgumentError(
                    "SELECT DISTINCT ON is not supported".into(),
                ));
            }
        };
        // DISTINCT value-table modes imply DISTINCT semantics.
        if matches!(
            select.value_table_mode,
            Some(
                sqlparser::ast::ValueTableMode::DistinctAsStruct
                    | sqlparser::ast::ValueTableMode::DistinctAsValue
            )
        ) {
            distinct = true;
        }
        // Reject clauses this engine does not implement, each with a
        // targeted error message.
        if select.top.is_some() {
            return Err(Error::InvalidArgumentError(
                "SELECT TOP is not supported".into(),
            ));
        }
        if select.exclude.is_some() {
            return Err(Error::InvalidArgumentError(
                "SELECT EXCLUDE is not supported".into(),
            ));
        }
        if select.into.is_some() {
            return Err(Error::InvalidArgumentError(
                "SELECT INTO is not supported".into(),
            ));
        }
        if !select.lateral_views.is_empty() {
            return Err(Error::InvalidArgumentError(
                "LATERAL VIEW is not supported".into(),
            ));
        }
        if select.prewhere.is_some() {
            return Err(Error::InvalidArgumentError(
                "PREWHERE is not supported".into(),
            ));
        }
        if !select.cluster_by.is_empty()
            || !select.distribute_by.is_empty()
            || !select.sort_by.is_empty()
        {
            return Err(Error::InvalidArgumentError(
                "CLUSTER/DISTRIBUTE/SORT BY clauses are not supported".into(),
            ));
        }
        if !select.named_window.is_empty()
            || select.qualify.is_some()
            || select.connect_by.is_some()
        {
            return Err(Error::InvalidArgumentError(
                "advanced SELECT clauses are not supported".into(),
            ));
        }

        // Alias of the first FROM table, used to qualify identifiers in the
        // single-table case.
        let table_alias = select
            .from
            .first()
            .and_then(|table_with_joins| match &table_with_joins.relation {
                TableFactor::Table { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
                _ => None,
            });

        let has_joins = select
            .from
            .iter()
            .any(|table_with_joins| !table_with_joins.joins.is_empty());
        let mut join_conditions: Vec<SqlExpr> = Vec::new();
        let mut scalar_subqueries: Vec<llkv_plan::ScalarSubquery> = Vec::new();
        let catalog = self.engine.context().table_catalog();
        let has_group_by = !group_by_is_empty(&select.group_by);
        // Three skeleton shapes: no FROM, single table (fast paths for
        // simple aggregates), or multi-table/join.
        let (mut plan, id_context) = if select.from.is_empty() {
            if has_group_by {
                return Err(Error::InvalidArgumentError(
                    "GROUP BY requires a FROM clause".into(),
                ));
            }
            let mut p = SelectPlan::new("");
            let projections = self.build_projection_list(
                resolver,
                IdentifierContext::new(None),
                &select.projection,
                outer_scopes,
                &mut scalar_subqueries,
                correlated_tracker.reborrow(),
            )?;
            p = p.with_projections(projections);
            (p, IdentifierContext::new(None))
        } else if select.from.len() == 1 && !has_joins {
            let (display_name, canonical_name) = extract_single_table(&select.from)?;
            let table_id = catalog.table_id(&canonical_name);
            let mut p = SelectPlan::new(display_name.clone());
            let single_table_context =
                IdentifierContext::new(table_id).with_table_alias(table_alias.clone());
            if let Some(alias) = table_alias.as_ref() {
                validate_projection_alias_qualifiers(&select.projection, alias)?;
            }
            // Without GROUP BY, a projection that is purely simple
            // aggregates takes the dedicated aggregate path.
            if !has_group_by
                && let Some(aggregates) = self.detect_simple_aggregates(&select.projection)?
            {
                p = p.with_aggregates(aggregates);
            } else {
                let projections = self.build_projection_list(
                    resolver,
                    single_table_context.clone(),
                    &select.projection,
                    outer_scopes,
                    &mut scalar_subqueries,
                    correlated_tracker.reborrow(),
                )?;
                p = p.with_projections(projections);
            }
            (p, single_table_context)
        } else {
            // Join case: ON conditions are extracted here and folded into
            // the filter below.
            let (tables, join_metadata, extracted_filters) = extract_tables(&select.from)?;
            join_conditions = extracted_filters;
            let mut p = SelectPlan::with_tables(tables).with_joins(join_metadata);
            let projections = self.build_projection_list(
                resolver,
                IdentifierContext::new(None),
                &select.projection,
                outer_scopes,
                &mut scalar_subqueries,
                correlated_tracker.reborrow(),
            )?;
            p = p.with_projections(projections);
            (p, IdentifierContext::new(None))
        };

        // Filter predicates accumulated from WHERE plus non-LEFT join
        // conditions; combined with AND at the end.
        let mut filter_components: Vec<llkv_expr::expr::Expr<'static, String>> = Vec::new();
        let mut all_subqueries = Vec::new();

        if let Some(expr) = &select.selection {
            // IN/scalar subqueries are executed eagerly before translation.
            let materialized_expr = self.materialize_in_subquery(expr.clone())?;
            filter_components.push(translate_condition_with_context(
                self,
                resolver,
                id_context.clone(),
                &materialized_expr,
                outer_scopes,
                &mut all_subqueries,
                correlated_tracker.reborrow(),
            )?);
        }

        for (idx, join_expr) in join_conditions.iter().enumerate() {
            // LEFT JOIN ON conditions must stay with the join itself;
            // folding them into the WHERE filter would drop unmatched rows.
            let is_left_join_condition = plan
                .joins
                .get(idx)
                .map(|j| j.join_type == llkv_plan::JoinPlan::Left)
                .unwrap_or(false);

            if !is_left_join_condition {
                let materialized_expr = self.materialize_in_subquery(join_expr.clone())?;
                filter_components.push(translate_condition_with_context(
                    self,
                    resolver,
                    id_context.clone(),
                    &materialized_expr,
                    outer_scopes,
                    &mut all_subqueries,
                    correlated_tracker.reborrow(),
                )?);
            }
        }

        let having_expr = if let Some(having) = &select.having {
            let materialized_expr = self.materialize_in_subquery(having.clone())?;
            let translated = translate_condition_with_context(
                self,
                resolver,
                id_context.clone(),
                &materialized_expr,
                outer_scopes,
                &mut all_subqueries,
                correlated_tracker.reborrow(),
            )?;
            Some(translated)
        } else {
            None
        };

        // Hand collected subqueries to the caller; when a filter exists they
        // are reclaimed via mem::take below and travel with the filter.
        subqueries.append(&mut all_subqueries);

        let filter = match filter_components.len() {
            0 => None,
            1 if subqueries.is_empty() => Some(llkv_plan::SelectFilter {
                predicate: filter_components.into_iter().next().unwrap(),
                subqueries: Vec::new(),
            }),
            1 => Some(llkv_plan::SelectFilter {
                predicate: filter_components.into_iter().next().unwrap(),
                subqueries: std::mem::take(subqueries),
            }),
            _ => Some(llkv_plan::SelectFilter {
                predicate: llkv_expr::expr::Expr::And(filter_components),
                subqueries: std::mem::take(subqueries),
            }),
        };
        plan = plan.with_filter(filter);
        plan = plan.with_having(having_expr);
        plan = plan.with_scalar_subqueries(std::mem::take(&mut scalar_subqueries));
        plan = plan.with_distinct(distinct);

        let group_by_columns = if has_group_by {
            self.translate_group_by_columns(resolver, id_context.clone(), &select.group_by)?
        } else {
            Vec::new()
        };
        plan = plan.with_group_by(group_by_columns);

        let value_mode = select.value_table_mode.map(convert_value_table_mode);
        plan = plan.with_value_table_mode(value_mode);
        Ok((plan, id_context))
    }
4311
    /// Translate an ORDER BY clause into `OrderByPlan`s.
    ///
    /// Supported targets: plain (possibly qualified) column references,
    /// `CAST(col AS <integer type>)` (text-as-integer sort), 1-based ordinal
    /// positions, and the bare keyword `ALL`. NULLS placement defaults to
    /// the session's `default_nulls_first` setting, mirrored for DESC.
    fn translate_order_by(
        &self,
        resolver: &IdentifierResolver<'_>,
        id_context: IdentifierContext,
        order_by: &OrderBy,
    ) -> SqlResult<Vec<OrderByPlan>> {
        let exprs = match &order_by.kind {
            OrderByKind::Expressions(exprs) => exprs,
            _ => {
                return Err(Error::InvalidArgumentError(
                    "unsupported ORDER BY clause".into(),
                ));
            }
        };

        // Session-level default for NULLS FIRST, stated for ascending order.
        let base_nulls_first = self.default_nulls_first.load(AtomicOrdering::Relaxed);

        let mut plans = Vec::with_capacity(exprs.len());
        for order_expr in exprs {
            let ascending = order_expr.options.asc.unwrap_or(true);
            // Descending order flips the default NULLS placement so NULLs
            // keep the same end of the output.
            let default_nulls_first_for_direction = if ascending {
                base_nulls_first
            } else {
                !base_nulls_first
            };
            // An explicit NULLS FIRST/LAST always wins over the default.
            let nulls_first = order_expr
                .options
                .nulls_first
                .unwrap_or(default_nulls_first_for_direction);

            // Unquoted `ALL` (any case) sorts by every output column.
            if let SqlExpr::Identifier(ident) = &order_expr.expr
                && ident.value.eq_ignore_ascii_case("ALL")
                && ident.quote_style.is_none()
            {
                plans.push(OrderByPlan {
                    target: OrderTarget::All,
                    sort_type: OrderSortType::Native,
                    ascending,
                    nulls_first,
                });
                continue;
            }

            let (target, sort_type) = match &order_expr.expr {
                SqlExpr::Identifier(_) | SqlExpr::CompoundIdentifier(_) => (
                    OrderTarget::Column(self.resolve_simple_column_expr(
                        resolver,
                        id_context.clone(),
                        &order_expr.expr,
                    )?),
                    OrderSortType::Native,
                ),
                // CAST to an integer type requests numeric ordering of the
                // column's text representation.
                SqlExpr::Cast {
                    expr,
                    data_type:
                        SqlDataType::Int(_)
                        | SqlDataType::Integer(_)
                        | SqlDataType::BigInt(_)
                        | SqlDataType::SmallInt(_)
                        | SqlDataType::TinyInt(_),
                    ..
                } => (
                    OrderTarget::Column(self.resolve_simple_column_expr(
                        resolver,
                        id_context.clone(),
                        expr,
                    )?),
                    OrderSortType::CastTextToInteger,
                ),
                SqlExpr::Cast { data_type, .. } => {
                    return Err(Error::InvalidArgumentError(format!(
                        "ORDER BY CAST target type {:?} is not supported",
                        data_type
                    )));
                }
                // Numeric literal: 1-based ordinal into the projection.
                SqlExpr::Value(value_with_span) => match &value_with_span.value {
                    Value::Number(raw, _) => {
                        let position: usize = raw.parse().map_err(|_| {
                            Error::InvalidArgumentError(format!(
                                "ORDER BY position '{}' is not a valid positive integer",
                                raw
                            ))
                        })?;
                        if position == 0 {
                            return Err(Error::InvalidArgumentError(
                                "ORDER BY position must be at least 1".into(),
                            ));
                        }
                        (OrderTarget::Index(position - 1), OrderSortType::Native)
                    }
                    other => {
                        return Err(Error::InvalidArgumentError(format!(
                            "unsupported ORDER BY literal expression: {other:?}"
                        )));
                    }
                },
                other => {
                    return Err(Error::InvalidArgumentError(format!(
                        "unsupported ORDER BY expression: {other:?}"
                    )));
                }
            };

            plans.push(OrderByPlan {
                target,
                sort_type,
                ascending,
                nulls_first,
            });
        }

        Ok(plans)
    }
4425
4426 fn translate_group_by_columns(
4427 &self,
4428 resolver: &IdentifierResolver<'_>,
4429 id_context: IdentifierContext,
4430 group_by: &GroupByExpr,
4431 ) -> SqlResult<Vec<String>> {
4432 use sqlparser::ast::Expr as SqlExpr;
4433
4434 match group_by {
4435 GroupByExpr::All(_) => Err(Error::InvalidArgumentError(
4436 "GROUP BY ALL is not supported".into(),
4437 )),
4438 GroupByExpr::Expressions(exprs, modifiers) => {
4439 if !modifiers.is_empty() {
4440 return Err(Error::InvalidArgumentError(
4441 "GROUP BY modifiers are not supported".into(),
4442 ));
4443 }
4444 let mut columns = Vec::with_capacity(exprs.len());
4445 for expr in exprs {
4446 let parts: Vec<String> = match expr {
4447 SqlExpr::Identifier(ident) => vec![ident.value.clone()],
4448 SqlExpr::CompoundIdentifier(idents) => {
4449 idents.iter().map(|id| id.value.clone()).collect()
4450 }
4451 _ => {
4452 return Err(Error::InvalidArgumentError(
4453 "GROUP BY expressions must be simple column references".into(),
4454 ));
4455 }
4456 };
4457 let resolution = resolver.resolve(&parts, id_context.clone())?;
4458 if !resolution.is_simple() {
4459 return Err(Error::InvalidArgumentError(
4460 "GROUP BY nested field references are not supported".into(),
4461 ));
4462 }
4463 columns.push(resolution.column().to_string());
4464 }
4465 Ok(columns)
4466 }
4467 }
4468 }
4469
4470 fn resolve_simple_column_expr(
4471 &self,
4472 resolver: &IdentifierResolver<'_>,
4473 context: IdentifierContext,
4474 expr: &SqlExpr,
4475 ) -> SqlResult<String> {
4476 let normalized_expr = self.materialize_in_subquery(expr.clone())?;
4477 let scalar = translate_scalar_with_context(resolver, context, &normalized_expr)?;
4478 match scalar {
4479 llkv_expr::expr::ScalarExpr::Column(column) => Ok(column),
4480 other => Err(Error::InvalidArgumentError(format!(
4481 "ORDER BY expression must reference a simple column, found {other:?}"
4482 ))),
4483 }
4484 }
4485
    /// Detect projections composed entirely of simple aggregate calls
    /// (COUNT/SUM/MIN/MAX over a column, or COUNT(*)).
    ///
    /// Returns `Ok(None)` when any projection item is not such a call (the
    /// caller then takes the general projection path); returns an error when
    /// an aggregate is recognized but uses an unsupported form.
    fn detect_simple_aggregates(
        &self,
        projection_items: &[SelectItem],
    ) -> SqlResult<Option<Vec<AggregateExpr>>> {
        if projection_items.is_empty() {
            return Ok(None);
        }

        let mut specs: Vec<AggregateExpr> = Vec::with_capacity(projection_items.len());
        for (idx, item) in projection_items.iter().enumerate() {
            let (expr, alias_opt) = match item {
                SelectItem::UnnamedExpr(expr) => (expr, None),
                SelectItem::ExprWithAlias { expr, alias } => (expr, Some(alias.value.clone())),
                // Wildcards etc. disqualify the whole fast path.
                _ => return Ok(None),
            };

            // Unaliased aggregates get positional names: col1, col2, ...
            let alias = alias_opt.unwrap_or_else(|| format!("col{}", idx + 1));
            let SqlExpr::Function(func) = expr else {
                return Ok(None);
            };

            // Reject function forms the aggregate fast path cannot handle.
            if func.uses_odbc_syntax {
                return Err(Error::InvalidArgumentError(
                    "ODBC function syntax is not supported in aggregate queries".into(),
                ));
            }
            if !matches!(func.parameters, FunctionArguments::None) {
                return Err(Error::InvalidArgumentError(
                    "parameterized aggregate functions are not supported".into(),
                ));
            }
            if func.filter.is_some()
                || func.null_treatment.is_some()
                || func.over.is_some()
                || !func.within_group.is_empty()
            {
                return Err(Error::InvalidArgumentError(
                    "advanced aggregate clauses are not supported".into(),
                ));
            }

            // Extract the argument list, noting DISTINCT along the way.
            let mut is_distinct = false;
            let args_slice: &[FunctionArg] = match &func.args {
                FunctionArguments::List(list) => {
                    if let Some(dup) = &list.duplicate_treatment {
                        use sqlparser::ast::DuplicateTreatment;
                        match dup {
                            DuplicateTreatment::All => {}
                            DuplicateTreatment::Distinct => is_distinct = true,
                        }
                    }
                    if !list.clauses.is_empty() {
                        return Err(Error::InvalidArgumentError(
                            "aggregate argument clauses are not supported".into(),
                        ));
                    }
                    &list.args
                }
                FunctionArguments::None => &[],
                FunctionArguments::Subquery(_) => {
                    return Err(Error::InvalidArgumentError(
                        "aggregate subquery arguments are not supported".into(),
                    ));
                }
            };

            // Only unqualified single-part function names are accepted.
            let func_name = if func.name.0.len() == 1 {
                match &func.name.0[0] {
                    ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
                    _ => {
                        return Err(Error::InvalidArgumentError(
                            "unsupported aggregate function name".into(),
                        ));
                    }
                }
            } else {
                return Err(Error::InvalidArgumentError(
                    "qualified aggregate function names are not supported".into(),
                ));
            };

            let aggregate = match func_name.as_str() {
                "count" => {
                    if args_slice.len() != 1 {
                        return Err(Error::InvalidArgumentError(
                            "COUNT accepts exactly one argument".into(),
                        ));
                    }
                    match &args_slice[0] {
                        FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
                            if is_distinct {
                                return Err(Error::InvalidArgumentError(
                                    "COUNT(DISTINCT *) is not supported".into(),
                                ));
                            }
                            AggregateExpr::count_star(alias)
                        }
                        FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
                            let column = resolve_column_name(arg_expr)?;
                            if is_distinct {
                                AggregateExpr::count_distinct_column(column, alias)
                            } else {
                                AggregateExpr::count_column(column, alias)
                            }
                        }
                        FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
                            return Err(Error::InvalidArgumentError(
                                "named COUNT arguments are not supported".into(),
                            ));
                        }
                        FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
                            return Err(Error::InvalidArgumentError(
                                "COUNT does not support qualified wildcards".into(),
                            ));
                        }
                    }
                }
                "sum" | "min" | "max" => {
                    if is_distinct {
                        return Err(Error::InvalidArgumentError(
                            "DISTINCT is not supported for this aggregate".into(),
                        ));
                    }
                    if args_slice.len() != 1 {
                        return Err(Error::InvalidArgumentError(format!(
                            "{} accepts exactly one argument",
                            func_name.to_uppercase()
                        )));
                    }
                    let arg_expr = match &args_slice[0] {
                        FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => arg_expr,
                        FunctionArg::Unnamed(FunctionArgExpr::Wildcard)
                        | FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
                            return Err(Error::InvalidArgumentError(format!(
                                "{} does not support wildcard arguments",
                                func_name.to_uppercase()
                            )));
                        }
                        FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
                            return Err(Error::InvalidArgumentError(format!(
                                "{} arguments must be column references",
                                func_name.to_uppercase()
                            )));
                        }
                    };

                    if func_name == "sum" {
                        // SUM(CASE WHEN col IS NULL THEN 1 ELSE 0 END) is
                        // recognized as a null-count aggregate.
                        if let Some(column) = parse_count_nulls_case(arg_expr)? {
                            AggregateExpr::count_nulls(column, alias)
                        } else {
                            let column = resolve_column_name(arg_expr)?;
                            AggregateExpr::sum_int64(column, alias)
                        }
                    } else {
                        let column = resolve_column_name(arg_expr)?;
                        if func_name == "min" {
                            AggregateExpr::min_int64(column, alias)
                        } else {
                            AggregateExpr::max_int64(column, alias)
                        }
                    }
                }
                // Unknown function name: not an aggregate fast path.
                _ => return Ok(None),
            };

            specs.push(aggregate);
        }

        if specs.is_empty() {
            return Ok(None);
        }
        Ok(Some(specs))
    }
4659
    /// Lowers a SELECT projection list into planner [`SelectProjection`]s.
    ///
    /// Supported item shapes:
    /// * `*`, optionally with `EXCLUDE (col, ...)`;
    /// * qualified wildcards such as `t.*` (carried through as a column
    ///   projection named by the qualifier);
    /// * bare and aliased column references (simple resolutions stay named
    ///   columns, non-simple ones become computed expressions);
    /// * arbitrary expressions, translated via `translate_scalar_internal`.
    ///
    /// Expression translation may append to `scalar_subqueries` (subquery
    /// plans discovered inside the expression) and record correlated column
    /// uses in `correlated_tracker`. `outer_scopes` carries the enclosing
    /// query contexts so correlated identifiers inside subqueries resolve.
    ///
    /// Errors if the projection list is empty or contains an unsupported
    /// item (e.g. an expression-qualified wildcard).
    fn build_projection_list(
        &self,
        resolver: &IdentifierResolver<'_>,
        id_context: IdentifierContext,
        projection_items: &[SelectItem],
        outer_scopes: &[IdentifierContext],
        scalar_subqueries: &mut Vec<llkv_plan::ScalarSubquery>,
        mut correlated_tracker: Option<&mut SubqueryCorrelatedColumnTracker>,
    ) -> SqlResult<Vec<SelectProjection>> {
        if projection_items.is_empty() {
            return Err(Error::InvalidArgumentError(
                "SELECT projection must include at least one column".into(),
            ));
        }

        let mut projections = Vec::with_capacity(projection_items.len());
        for (idx, item) in projection_items.iter().enumerate() {
            match item {
                // `SELECT *` — optionally narrowed with EXCLUDE.
                SelectItem::Wildcard(options) => {
                    if let Some(exclude) = &options.opt_exclude {
                        use sqlparser::ast::ExcludeSelectItem;
                        let exclude_cols = match exclude {
                            ExcludeSelectItem::Single(ident) => vec![ident.value.clone()],
                            ExcludeSelectItem::Multiple(idents) => {
                                idents.iter().map(|id| id.value.clone()).collect()
                            }
                        };
                        projections.push(SelectProjection::AllColumnsExcept {
                            exclude: exclude_cols,
                        });
                    } else {
                        projections.push(SelectProjection::AllColumns);
                    }
                }
                // `SELECT t.*` — the qualifier is passed along as a column
                // name for downstream expansion; expression-qualified
                // wildcards are rejected.
                SelectItem::QualifiedWildcard(kind, _) => match kind {
                    SelectItemQualifiedWildcardKind::ObjectName(name) => {
                        projections.push(SelectProjection::Column {
                            name: name.to_string(),
                            alias: None,
                        });
                    }
                    SelectItemQualifiedWildcardKind::Expr(_) => {
                        return Err(Error::InvalidArgumentError(
                            "expression-qualified wildcards are not supported".into(),
                        ));
                    }
                },
                SelectItem::UnnamedExpr(expr) => match expr {
                    // Bare identifier: simple resolutions stay named
                    // columns; otherwise (e.g. a resolution carrying a
                    // field path) it becomes a computed expression with a
                    // positional `colN` alias.
                    SqlExpr::Identifier(ident) => {
                        let parts = vec![ident.value.clone()];
                        let resolution = resolver.resolve(&parts, id_context.clone())?;
                        if resolution.is_simple() {
                            projections.push(SelectProjection::Column {
                                name: resolution.column().to_string(),
                                alias: None,
                            });
                        } else {
                            // 1-based positional fallback alias.
                            let alias = format!("col{}", idx + 1);
                            projections.push(SelectProjection::Computed {
                                expr: resolution.into_scalar_expr(),
                                alias,
                            });
                        }
                    }
                    // Qualified identifier: same split as above.
                    SqlExpr::CompoundIdentifier(parts) => {
                        let name_parts: Vec<String> =
                            parts.iter().map(|part| part.value.clone()).collect();
                        let resolution = resolver.resolve(&name_parts, id_context.clone())?;
                        if resolution.is_simple() {
                            projections.push(SelectProjection::Column {
                                name: resolution.column().to_string(),
                                alias: None,
                            });
                        } else {
                            let alias = format!("col{}", idx + 1);
                            projections.push(SelectProjection::Computed {
                                expr: resolution.into_scalar_expr(),
                                alias,
                            });
                        }
                    }
                    // General expression projection. Plain subqueries are
                    // kept as-is; other expressions first have their
                    // IN (...) subqueries materialized.
                    _ => {
                        let alias = format!("col{}", idx + 1);
                        let normalized_expr = if matches!(expr, SqlExpr::Subquery(_)) {
                            expr.clone()
                        } else {
                            self.materialize_in_subquery(expr.clone())?
                        };
                        let scalar = {
                            // Reborrow the optional tracker so it stays
                            // usable for subsequent projection items.
                            let tracker_view = correlated_tracker.reborrow();
                            let mut builder = ScalarSubqueryPlanner {
                                engine: self,
                                scalar_subqueries,
                            };
                            let mut tracker_wrapper =
                                SubqueryCorrelatedTracker::from_option(tracker_view);
                            translate_scalar_internal(
                                &normalized_expr,
                                Some(resolver),
                                Some(&id_context),
                                outer_scopes,
                                &mut tracker_wrapper,
                                Some(&mut builder),
                            )?
                        };
                        projections.push(SelectProjection::Computed {
                            expr: scalar,
                            alias,
                        });
                    }
                },
                // Same three cases as UnnamedExpr, but the user-supplied
                // alias replaces the positional `colN` fallback.
                SelectItem::ExprWithAlias { expr, alias } => match expr {
                    SqlExpr::Identifier(ident) => {
                        let parts = vec![ident.value.clone()];
                        let resolution = resolver.resolve(&parts, id_context.clone())?;
                        if resolution.is_simple() {
                            projections.push(SelectProjection::Column {
                                name: resolution.column().to_string(),
                                alias: Some(alias.value.clone()),
                            });
                        } else {
                            projections.push(SelectProjection::Computed {
                                expr: resolution.into_scalar_expr(),
                                alias: alias.value.clone(),
                            });
                        }
                    }
                    SqlExpr::CompoundIdentifier(parts) => {
                        let name_parts: Vec<String> =
                            parts.iter().map(|part| part.value.clone()).collect();
                        let resolution = resolver.resolve(&name_parts, id_context.clone())?;
                        if resolution.is_simple() {
                            projections.push(SelectProjection::Column {
                                name: resolution.column().to_string(),
                                alias: Some(alias.value.clone()),
                            });
                        } else {
                            projections.push(SelectProjection::Computed {
                                expr: resolution.into_scalar_expr(),
                                alias: alias.value.clone(),
                            });
                        }
                    }
                    _ => {
                        let normalized_expr = if matches!(expr, SqlExpr::Subquery(_)) {
                            expr.clone()
                        } else {
                            self.materialize_in_subquery(expr.clone())?
                        };
                        let scalar = {
                            let tracker_view = correlated_tracker.reborrow();
                            let mut builder = ScalarSubqueryPlanner {
                                engine: self,
                                scalar_subqueries,
                            };
                            let mut tracker_wrapper =
                                SubqueryCorrelatedTracker::from_option(tracker_view);
                            translate_scalar_internal(
                                &normalized_expr,
                                Some(resolver),
                                Some(&id_context),
                                outer_scopes,
                                &mut tracker_wrapper,
                                Some(&mut builder),
                            )?
                        };
                        projections.push(SelectProjection::Computed {
                            expr: scalar,
                            alias: alias.value.clone(),
                        });
                    }
                },
            }
        }
        Ok(projections)
    }
4836
4837 #[allow(clippy::too_many_arguments)] fn handle_start_transaction(
4839 &self,
4840 modes: Vec<TransactionMode>,
4841 begin: bool,
4842 transaction: Option<BeginTransactionKind>,
4843 modifier: Option<TransactionModifier>,
4844 statements: Vec<Statement>,
4845 exception: Option<Vec<ExceptionWhen>>,
4846 has_end_keyword: bool,
4847 ) -> SqlResult<RuntimeStatementResult<P>> {
4848 if !modes.is_empty() {
4849 return Err(Error::InvalidArgumentError(
4850 "transaction modes are not supported".into(),
4851 ));
4852 }
4853 if modifier.is_some() {
4854 return Err(Error::InvalidArgumentError(
4855 "transaction modifiers are not supported".into(),
4856 ));
4857 }
4858 if !statements.is_empty() || exception.is_some() || has_end_keyword {
4859 return Err(Error::InvalidArgumentError(
4860 "BEGIN blocks with inline statements or exceptions are not supported".into(),
4861 ));
4862 }
4863 if let Some(kind) = transaction {
4864 match kind {
4865 BeginTransactionKind::Transaction | BeginTransactionKind::Work => {}
4866 }
4867 }
4868 if !begin {
4869 tracing::warn!("Currently treat `START TRANSACTION` same as `BEGIN`")
4871 }
4872
4873 self.execute_plan_statement(PlanStatement::BeginTransaction)
4874 }
4875
4876 fn handle_commit(
4877 &self,
4878 chain: bool,
4879 end: bool,
4880 modifier: Option<TransactionModifier>,
4881 ) -> SqlResult<RuntimeStatementResult<P>> {
4882 if chain {
4883 return Err(Error::InvalidArgumentError(
4884 "COMMIT AND [NO] CHAIN is not supported".into(),
4885 ));
4886 }
4887 if end {
4888 return Err(Error::InvalidArgumentError(
4889 "END blocks are not supported".into(),
4890 ));
4891 }
4892 if modifier.is_some() {
4893 return Err(Error::InvalidArgumentError(
4894 "transaction modifiers are not supported".into(),
4895 ));
4896 }
4897
4898 self.execute_plan_statement(PlanStatement::CommitTransaction)
4899 }
4900
4901 fn handle_rollback(
4902 &self,
4903 chain: bool,
4904 savepoint: Option<Ident>,
4905 ) -> SqlResult<RuntimeStatementResult<P>> {
4906 if chain {
4907 return Err(Error::InvalidArgumentError(
4908 "ROLLBACK AND [NO] CHAIN is not supported".into(),
4909 ));
4910 }
4911 if savepoint.is_some() {
4912 return Err(Error::InvalidArgumentError(
4913 "ROLLBACK TO SAVEPOINT is not supported".into(),
4914 ));
4915 }
4916
4917 self.execute_plan_statement(PlanStatement::RollbackTransaction)
4918 }
4919
    /// Executes a SQL `SET` statement.
    ///
    /// Only two session variables are recognized:
    /// * `default_null_order` — accepts `nulls_first` / `nulls_last` (as a
    ///   string literal or bare identifier) and stores the choice in the
    ///   engine's `default_nulls_first` atomic flag;
    /// * `immediate_transaction_mode` — accepts boolean-ish values but only
    ///   logs a warning when disabled; it never changes behavior.
    ///
    /// Scoped/hive-style assignments, other variables, and other SET forms
    /// are rejected with a descriptive error.
    fn handle_set(&self, set_stmt: Set) -> SqlResult<RuntimeStatementResult<P>> {
        match set_stmt {
            Set::SingleAssignment {
                scope,
                hivevar,
                variable,
                values,
            } => {
                // SESSION/GLOBAL scopes and hive-style `SET hivevar:` are
                // outside the supported surface.
                if scope.is_some() || hivevar {
                    return Err(Error::InvalidArgumentError(
                        "SET modifiers are not supported".into(),
                    ));
                }

                // Keep the raw spelling for error messages; dispatch on the
                // lowercase form.
                let variable_name_raw = variable.to_string();
                let variable_name = variable_name_raw.to_ascii_lowercase();

                match variable_name.as_str() {
                    "default_null_order" => {
                        if values.len() != 1 {
                            return Err(Error::InvalidArgumentError(
                                "SET default_null_order expects exactly one value".into(),
                            ));
                        }

                        // Accept both `SET default_null_order = 'nulls_first'`
                        // and the bare-identifier spelling; anything else
                        // normalizes to None and is rejected below.
                        let value_expr = &values[0];
                        let normalized = match value_expr {
                            SqlExpr::Value(value_with_span) => value_with_span
                                .value
                                .clone()
                                .into_string()
                                .map(|s| s.to_ascii_lowercase()),
                            SqlExpr::Identifier(ident) => Some(ident.value.to_ascii_lowercase()),
                            _ => None,
                        };

                        if !matches!(normalized.as_deref(), Some("nulls_first" | "nulls_last")) {
                            return Err(Error::InvalidArgumentError(format!(
                                "unsupported value for SET default_null_order: {value_expr:?}"
                            )));
                        }

                        let use_nulls_first = matches!(normalized.as_deref(), Some("nulls_first"));
                        // Relaxed ordering suffices: this is an isolated
                        // session flag with no dependent memory operations.
                        self.default_nulls_first
                            .store(use_nulls_first, AtomicOrdering::Relaxed);

                        Ok(RuntimeStatementResult::NoOp)
                    }
                    "immediate_transaction_mode" => {
                        if values.len() != 1 {
                            return Err(Error::InvalidArgumentError(
                                "SET immediate_transaction_mode expects exactly one value".into(),
                            ));
                        }
                        // Stringify the value expression and accept the usual
                        // boolean spellings.
                        let normalized = values[0].to_string().to_ascii_lowercase();
                        let enabled = match normalized.as_str() {
                            "true" | "on" | "1" => true,
                            "false" | "off" | "0" => false,
                            _ => {
                                return Err(Error::InvalidArgumentError(format!(
                                    "unsupported value for SET immediate_transaction_mode: {}",
                                    values[0]
                                )));
                            }
                        };
                        // Disabling is accepted but has no effect; warn so the
                        // mismatch is visible to the caller.
                        if !enabled {
                            tracing::warn!(
                                "SET immediate_transaction_mode=false has no effect; continuing with auto mode"
                            );
                        }
                        Ok(RuntimeStatementResult::NoOp)
                    }
                    _ => Err(Error::InvalidArgumentError(format!(
                        "unsupported SET variable: {variable_name_raw}"
                    ))),
                }
            }
            other => Err(Error::InvalidArgumentError(format!(
                "unsupported SQL SET statement: {other:?}",
            ))),
        }
    }
5002
5003 fn handle_pragma(
5004 &self,
5005 name: ObjectName,
5006 value: Option<Value>,
5007 is_eq: bool,
5008 ) -> SqlResult<RuntimeStatementResult<P>> {
5009 let (display, canonical) = canonical_object_name(&name)?;
5010 if value.is_some() || is_eq {
5011 return Err(Error::InvalidArgumentError(format!(
5012 "PRAGMA '{display}' does not accept a value"
5013 )));
5014 }
5015
5016 match canonical.as_str() {
5017 "enable_verification" | "disable_verification" => Ok(RuntimeStatementResult::NoOp),
5018 _ => Err(Error::InvalidArgumentError(format!(
5019 "unsupported PRAGMA '{}'",
5020 display
5021 ))),
5022 }
5023 }
5024}
5025
5026fn canonical_object_name(name: &ObjectName) -> SqlResult<(String, String)> {
5027 if name.0.is_empty() {
5028 return Err(Error::InvalidArgumentError(
5029 "object name must not be empty".into(),
5030 ));
5031 }
5032 let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
5033 for part in &name.0 {
5034 let ident = match part {
5035 ObjectNamePart::Identifier(ident) => ident,
5036 _ => {
5037 return Err(Error::InvalidArgumentError(
5038 "object names using functions are not supported".into(),
5039 ));
5040 }
5041 };
5042 parts.push(ident.value.clone());
5043 }
5044 let display = parts.join(".");
5045 let canonical = display.to_ascii_lowercase();
5046 Ok((display, canonical))
5047}
5048
5049fn parse_schema_qualified_name(name: &ObjectName) -> SqlResult<(Option<String>, String)> {
5058 if name.0.is_empty() {
5059 return Err(Error::InvalidArgumentError(
5060 "object name must not be empty".into(),
5061 ));
5062 }
5063
5064 let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
5065 for part in &name.0 {
5066 let ident = match part {
5067 ObjectNamePart::Identifier(ident) => ident,
5068 _ => {
5069 return Err(Error::InvalidArgumentError(
5070 "object names using functions are not supported".into(),
5071 ));
5072 }
5073 };
5074 parts.push(ident.value.clone());
5075 }
5076
5077 match parts.len() {
5078 1 => Ok((None, parts[0].clone())),
5079 2 => Ok((Some(parts[0].clone()), parts[1].clone())),
5080 _ => Err(Error::InvalidArgumentError(format!(
5081 "table name has too many parts: {}",
5082 name
5083 ))),
5084 }
5085}
5086
5087fn extract_index_column_name(
5101 index_col: &sqlparser::ast::IndexColumn,
5102 context: &str,
5103 allow_sort_options: bool,
5104 allow_compound: bool,
5105) -> SqlResult<String> {
5106 use sqlparser::ast::Expr as SqlExpr;
5107
5108 if index_col.operator_class.is_some() {
5110 return Err(Error::InvalidArgumentError(format!(
5111 "{} operator classes are not supported",
5112 context
5113 )));
5114 }
5115
5116 let order_expr = &index_col.column;
5117
5118 if allow_sort_options {
5120 let _ascending = order_expr.options.asc.unwrap_or(true);
5122 let _nulls_first = order_expr.options.nulls_first.unwrap_or(false);
5123 } else {
5125 if order_expr.options.asc.is_some()
5127 || order_expr.options.nulls_first.is_some()
5128 || order_expr.with_fill.is_some()
5129 {
5130 return Err(Error::InvalidArgumentError(format!(
5131 "{} columns must be simple identifiers",
5132 context
5133 )));
5134 }
5135 }
5136
5137 let column_name = match &order_expr.expr {
5139 SqlExpr::Identifier(ident) => ident.value.clone(),
5140 SqlExpr::CompoundIdentifier(parts) => {
5141 if allow_compound {
5142 parts
5144 .last()
5145 .map(|ident| ident.value.clone())
5146 .ok_or_else(|| {
5147 Error::InvalidArgumentError(format!(
5148 "invalid column reference in {}",
5149 context
5150 ))
5151 })?
5152 } else if parts.len() == 1 {
5153 parts[0].value.clone()
5155 } else {
5156 return Err(Error::InvalidArgumentError(format!(
5157 "{} columns must be column identifiers",
5158 context
5159 )));
5160 }
5161 }
5162 other => {
5163 return Err(Error::InvalidArgumentError(format!(
5164 "{} only supports column references, found {:?}",
5165 context, other
5166 )));
5167 }
5168 };
5169
5170 Ok(column_name)
5171}
5172
5173fn validate_create_table_common(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
5174 if stmt.clone.is_some() || stmt.like.is_some() {
5175 return Err(Error::InvalidArgumentError(
5176 "CREATE TABLE LIKE/CLONE is not supported".into(),
5177 ));
5178 }
5179 if stmt.or_replace && stmt.if_not_exists {
5180 return Err(Error::InvalidArgumentError(
5181 "CREATE TABLE cannot combine OR REPLACE with IF NOT EXISTS".into(),
5182 ));
5183 }
5184 use sqlparser::ast::TableConstraint;
5185
5186 let mut seen_primary_key = false;
5187 for constraint in &stmt.constraints {
5188 match constraint {
5189 TableConstraint::PrimaryKey { .. } => {
5190 if seen_primary_key {
5191 return Err(Error::InvalidArgumentError(
5192 "multiple PRIMARY KEY constraints are not supported".into(),
5193 ));
5194 }
5195 seen_primary_key = true;
5196 }
5197 TableConstraint::Unique { .. } => {
5198 }
5200 TableConstraint::ForeignKey { .. } => {
5201 }
5203 other => {
5204 return Err(Error::InvalidArgumentError(format!(
5205 "table-level constraint {:?} is not supported",
5206 other
5207 )));
5208 }
5209 }
5210 }
5211 Ok(())
5212}
5213
/// Validates a CHECK constraint expression against `table_name`'s columns.
///
/// Walks the expression tree iteratively and rejects:
/// * subqueries,
/// * aggregate function calls (SUM/AVG/COUNT/MIN/MAX),
/// * references to columns not present in `column_names`,
/// * qualified references whose table prefix names a different table.
///
/// Column matching is ASCII case-insensitive. Node kinds not listed below
/// are accepted without descending into their children.
fn validate_check_constraint(
    check_expr: &sqlparser::ast::Expr,
    table_name: &str,
    column_names: &[&str],
) -> SqlResult<()> {
    use sqlparser::ast::Expr as SqlExpr;

    // Lowercased column set for case-insensitive membership tests.
    let column_names_lower: HashSet<String> = column_names
        .iter()
        .map(|name| name.to_ascii_lowercase())
        .collect();

    // Explicit work stack instead of recursion, so deeply nested
    // expressions cannot overflow the call stack.
    let mut stack: Vec<&SqlExpr> = vec![check_expr];

    while let Some(expr) = stack.pop() {
        match expr {
            SqlExpr::Subquery(_) => {
                return Err(Error::InvalidArgumentError(
                    "Subqueries are not allowed in CHECK constraints".into(),
                ));
            }
            SqlExpr::Function(func) => {
                let func_name = func.name.to_string().to_uppercase();
                if matches!(func_name.as_str(), "SUM" | "AVG" | "COUNT" | "MIN" | "MAX") {
                    return Err(Error::InvalidArgumentError(
                        "Aggregate functions are not allowed in CHECK constraints".into(),
                    ));
                }

                // Non-aggregate calls are allowed, but their unnamed
                // expression arguments still need validation.
                if let sqlparser::ast::FunctionArguments::List(list) = &func.args {
                    for arg in &list.args {
                        if let sqlparser::ast::FunctionArg::Unnamed(
                            sqlparser::ast::FunctionArgExpr::Expr(expr),
                        ) = arg
                        {
                            stack.push(expr);
                        }
                    }
                }
            }
            // Bare column reference: must exist on this table.
            SqlExpr::Identifier(ident) => {
                if !column_names_lower.contains(&ident.value.to_ascii_lowercase()) {
                    return Err(Error::InvalidArgumentError(format!(
                        "Column '{}' referenced in CHECK constraint does not exist",
                        ident.value
                    )));
                }
            }
            SqlExpr::CompoundIdentifier(idents) => {
                if idents.len() == 2 {
                    let first = idents[0].value.as_str();
                    let second = &idents[1].value;

                    // If the first part is itself a known column, accept the
                    // reference as-is (presumably nested field access on that
                    // column — TODO confirm against the planner).
                    if column_names_lower.contains(&first.to_ascii_lowercase()) {
                        continue;
                    }

                    // Otherwise treat it as `table.column`: the qualifier
                    // must be this table, and the column must exist.
                    if !first.eq_ignore_ascii_case(table_name) {
                        return Err(Error::InvalidArgumentError(format!(
                            "CHECK constraint references column from different table '{}'",
                            first
                        )));
                    }

                    if !column_names_lower.contains(&second.to_ascii_lowercase()) {
                        return Err(Error::InvalidArgumentError(format!(
                            "Column '{}' referenced in CHECK constraint does not exist",
                            second
                        )));
                    }
                } else if idents.len() == 3 {
                    // Three-part reference: either `table.column.<part>` or
                    // `<qualifier>.table.column`, depending on which segment
                    // names this table.
                    let first = &idents[0].value;
                    let second = &idents[1].value;
                    let third = &idents[2].value;

                    if first.eq_ignore_ascii_case(table_name) {
                        if !column_names_lower.contains(&second.to_ascii_lowercase()) {
                            return Err(Error::InvalidArgumentError(format!(
                                "Column '{}' referenced in CHECK constraint does not exist",
                                second
                            )));
                        }
                    } else if second.eq_ignore_ascii_case(table_name) {
                        if !column_names_lower.contains(&third.to_ascii_lowercase()) {
                            return Err(Error::InvalidArgumentError(format!(
                                "Column '{}' referenced in CHECK constraint does not exist",
                                third
                            )));
                        }
                    } else {
                        return Err(Error::InvalidArgumentError(format!(
                            "CHECK constraint references column from different table '{}'",
                            second
                        )));
                    }
                }
                // Identifiers with more than three parts fall through
                // unchecked.
            }
            SqlExpr::BinaryOp { left, right, .. } => {
                stack.push(left);
                stack.push(right);
            }
            SqlExpr::UnaryOp { expr, .. } | SqlExpr::Nested(expr) => {
                stack.push(expr);
            }
            // Literals need no validation.
            SqlExpr::Value(_) | SqlExpr::TypedString { .. } => {}
            _ => {}
        }
    }

    Ok(())
}
5325
5326fn validate_create_table_definition(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
5327 for column in &stmt.columns {
5328 for ColumnOptionDef { option, .. } in &column.options {
5329 match option {
5330 ColumnOption::Null
5331 | ColumnOption::NotNull
5332 | ColumnOption::Unique { .. }
5333 | ColumnOption::Check(_)
5334 | ColumnOption::ForeignKey { .. } => {}
5335 ColumnOption::Default(_) => {
5336 return Err(Error::InvalidArgumentError(format!(
5337 "DEFAULT values are not supported for column '{}'",
5338 column.name
5339 )));
5340 }
5341 other => {
5342 return Err(Error::InvalidArgumentError(format!(
5343 "unsupported column option {:?} on '{}'",
5344 other, column.name
5345 )));
5346 }
5347 }
5348 }
5349 }
5350 Ok(())
5351}
5352
5353fn validate_create_table_as(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
5354 if !stmt.columns.is_empty() {
5355 return Err(Error::InvalidArgumentError(
5356 "CREATE TABLE AS SELECT does not support column definitions yet".into(),
5357 ));
5358 }
5359 Ok(())
5360}
5361
5362fn validate_simple_query(query: &Query) -> SqlResult<()> {
5363 if query.with.is_some() {
5364 return Err(Error::InvalidArgumentError(
5365 "WITH clauses are not supported".into(),
5366 ));
5367 }
5368 if let Some(limit_clause) = &query.limit_clause {
5369 match limit_clause {
5370 LimitClause::LimitOffset {
5371 offset: Some(_), ..
5372 }
5373 | LimitClause::OffsetCommaLimit { .. } => {
5374 return Err(Error::InvalidArgumentError(
5375 "OFFSET clauses are not supported".into(),
5376 ));
5377 }
5378 LimitClause::LimitOffset { limit_by, .. } if !limit_by.is_empty() => {
5379 return Err(Error::InvalidArgumentError(
5380 "LIMIT BY clauses are not supported".into(),
5381 ));
5382 }
5383 _ => {}
5384 }
5385 }
5386 if query.fetch.is_some() {
5387 return Err(Error::InvalidArgumentError(
5388 "FETCH clauses are not supported".into(),
5389 ));
5390 }
5391 Ok(())
5392}
5393
5394fn resolve_column_name(expr: &SqlExpr) -> SqlResult<String> {
5395 match expr {
5396 SqlExpr::Identifier(ident) => Ok(ident.value.clone()),
5397 SqlExpr::CompoundIdentifier(parts) => {
5398 if let Some(last) = parts.last() {
5399 Ok(last.value.clone())
5400 } else {
5401 Err(Error::InvalidArgumentError(
5402 "empty column identifier".into(),
5403 ))
5404 }
5405 }
5406 SqlExpr::UnaryOp {
5408 op: UnaryOperator::Plus | UnaryOperator::Minus,
5409 expr,
5410 } => resolve_column_name(expr),
5411 _ => Err(Error::InvalidArgumentError(
5412 "aggregate arguments must be plain column identifiers".into(),
5413 )),
5414 }
5415}
5416
5417fn validate_projection_alias_qualifiers(
5418 projection_items: &[SelectItem],
5419 alias: &str,
5420) -> SqlResult<()> {
5421 let alias_lower = alias.to_ascii_lowercase();
5422 for item in projection_items {
5423 match item {
5424 SelectItem::UnnamedExpr(expr) | SelectItem::ExprWithAlias { expr, .. } => {
5425 if let SqlExpr::CompoundIdentifier(parts) = expr
5426 && parts.len() >= 2
5427 && let Some(first) = parts.first()
5428 && !first.value.eq_ignore_ascii_case(&alias_lower)
5429 {
5430 return Err(Error::InvalidArgumentError(format!(
5431 "Binder Error: table '{}' not found",
5432 first.value
5433 )));
5434 }
5435 }
5436 _ => {}
5437 }
5438 }
5439 Ok(())
5440}
5441
5442#[allow(dead_code)] fn expr_contains_aggregate(expr: &llkv_expr::expr::ScalarExpr<String>) -> bool {
5446 match expr {
5447 llkv_expr::expr::ScalarExpr::Aggregate(_) => true,
5448 llkv_expr::expr::ScalarExpr::Binary { left, right, .. } => {
5449 expr_contains_aggregate(left) || expr_contains_aggregate(right)
5450 }
5451 llkv_expr::expr::ScalarExpr::Compare { left, right, .. } => {
5452 expr_contains_aggregate(left) || expr_contains_aggregate(right)
5453 }
5454 llkv_expr::expr::ScalarExpr::Not(inner) => expr_contains_aggregate(inner),
5455 llkv_expr::expr::ScalarExpr::IsNull { expr, .. } => expr_contains_aggregate(expr),
5456 llkv_expr::expr::ScalarExpr::GetField { base, .. } => expr_contains_aggregate(base),
5457 llkv_expr::expr::ScalarExpr::Cast { expr, .. } => expr_contains_aggregate(expr),
5458 llkv_expr::expr::ScalarExpr::Case {
5459 operand,
5460 branches,
5461 else_expr,
5462 } => {
5463 operand
5464 .as_deref()
5465 .map(expr_contains_aggregate)
5466 .unwrap_or(false)
5467 || branches.iter().any(|(when_expr, then_expr)| {
5468 expr_contains_aggregate(when_expr) || expr_contains_aggregate(then_expr)
5469 })
5470 || else_expr
5471 .as_deref()
5472 .map(expr_contains_aggregate)
5473 .unwrap_or(false)
5474 }
5475 llkv_expr::expr::ScalarExpr::Coalesce(items) => items.iter().any(expr_contains_aggregate),
5476 llkv_expr::expr::ScalarExpr::Column(_) | llkv_expr::expr::ScalarExpr::Literal(_) => false,
5477 llkv_expr::expr::ScalarExpr::ScalarSubquery(_) => false,
5478 }
5479}
5480
5481fn try_parse_aggregate_function(
5482 func: &sqlparser::ast::Function,
5483 resolver: Option<&IdentifierResolver<'_>>,
5484 context: Option<&IdentifierContext>,
5485 outer_scopes: &[IdentifierContext],
5486 tracker: &mut SubqueryCorrelatedTracker<'_>,
5487) -> SqlResult<Option<llkv_expr::expr::AggregateCall<String>>> {
5488 use sqlparser::ast::{
5489 DuplicateTreatment, FunctionArg, FunctionArgExpr, FunctionArguments, ObjectNamePart,
5490 };
5491
5492 if func.uses_odbc_syntax {
5493 return Ok(None);
5494 }
5495 if !matches!(func.parameters, FunctionArguments::None) {
5496 return Ok(None);
5497 }
5498 if func.filter.is_some()
5499 || func.null_treatment.is_some()
5500 || func.over.is_some()
5501 || !func.within_group.is_empty()
5502 {
5503 return Ok(None);
5504 }
5505
5506 let func_name = if func.name.0.len() == 1 {
5507 match &func.name.0[0] {
5508 ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
5509 _ => return Ok(None),
5510 }
5511 } else {
5512 return Ok(None);
5513 };
5514
5515 let distinct = match &func.args {
5517 FunctionArguments::List(list) => {
5518 if !list.clauses.is_empty() {
5519 return Ok(None);
5520 }
5521 matches!(list.duplicate_treatment, Some(DuplicateTreatment::Distinct))
5522 }
5523 _ => false,
5524 };
5525
5526 let args_slice: &[FunctionArg] = match &func.args {
5527 FunctionArguments::List(list) => &list.args,
5528 FunctionArguments::None => &[],
5529 FunctionArguments::Subquery(_) => return Ok(None),
5530 };
5531
5532 let agg_call = match func_name.as_str() {
5533 "count" => {
5534 if args_slice.len() != 1 {
5535 return Err(Error::InvalidArgumentError(
5536 "COUNT accepts exactly one argument".into(),
5537 ));
5538 }
5539 match &args_slice[0] {
5540 FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
5541 if distinct {
5542 return Err(Error::InvalidArgumentError(
5543 "COUNT(DISTINCT *) is not supported".into(),
5544 ));
5545 }
5546 llkv_expr::expr::AggregateCall::CountStar
5547 }
5548 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
5549 let expr = translate_scalar_internal(
5550 arg_expr,
5551 resolver,
5552 context,
5553 outer_scopes,
5554 tracker,
5555 None,
5556 )?;
5557 llkv_expr::expr::AggregateCall::Count {
5558 expr: Box::new(expr),
5559 distinct,
5560 }
5561 }
5562 _ => {
5563 return Err(Error::InvalidArgumentError(
5564 "unsupported COUNT argument".into(),
5565 ));
5566 }
5567 }
5568 }
5569 "sum" => {
5570 if args_slice.len() != 1 {
5571 return Err(Error::InvalidArgumentError(
5572 "SUM accepts exactly one argument".into(),
5573 ));
5574 }
5575 let arg_expr = match &args_slice[0] {
5576 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
5577 _ => {
5578 return Err(Error::InvalidArgumentError(
5579 "SUM requires a column argument".into(),
5580 ));
5581 }
5582 };
5583
5584 if let Some(column) = parse_count_nulls_case(arg_expr)? {
5586 if distinct {
5587 return Err(Error::InvalidArgumentError(
5588 "DISTINCT not supported for COUNT(CASE ...) pattern".into(),
5589 ));
5590 }
5591 llkv_expr::expr::AggregateCall::CountNulls(Box::new(
5592 llkv_expr::expr::ScalarExpr::column(column),
5593 ))
5594 } else {
5595 let expr = translate_scalar_internal(
5596 arg_expr,
5597 resolver,
5598 context,
5599 outer_scopes,
5600 tracker,
5601 None,
5602 )?;
5603 llkv_expr::expr::AggregateCall::Sum {
5604 expr: Box::new(expr),
5605 distinct,
5606 }
5607 }
5608 }
5609 "min" => {
5610 if args_slice.len() != 1 {
5611 return Err(Error::InvalidArgumentError(
5612 "MIN accepts exactly one argument".into(),
5613 ));
5614 }
5615 let arg_expr = match &args_slice[0] {
5616 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
5617 _ => {
5618 return Err(Error::InvalidArgumentError(
5619 "MIN requires a column argument".into(),
5620 ));
5621 }
5622 };
5623 let expr = translate_scalar_internal(
5624 arg_expr,
5625 resolver,
5626 context,
5627 outer_scopes,
5628 tracker,
5629 None,
5630 )?;
5631 llkv_expr::expr::AggregateCall::Min(Box::new(expr))
5632 }
5633 "max" => {
5634 if args_slice.len() != 1 {
5635 return Err(Error::InvalidArgumentError(
5636 "MAX accepts exactly one argument".into(),
5637 ));
5638 }
5639 let arg_expr = match &args_slice[0] {
5640 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
5641 _ => {
5642 return Err(Error::InvalidArgumentError(
5643 "MAX requires a column argument".into(),
5644 ));
5645 }
5646 };
5647 let expr = translate_scalar_internal(
5648 arg_expr,
5649 resolver,
5650 context,
5651 outer_scopes,
5652 tracker,
5653 None,
5654 )?;
5655 llkv_expr::expr::AggregateCall::Max(Box::new(expr))
5656 }
5657 "avg" => {
5658 if args_slice.len() != 1 {
5659 return Err(Error::InvalidArgumentError(
5660 "AVG accepts exactly one argument".into(),
5661 ));
5662 }
5663 let arg_expr = match &args_slice[0] {
5664 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
5665 _ => {
5666 return Err(Error::InvalidArgumentError(
5667 "AVG requires a column argument".into(),
5668 ));
5669 }
5670 };
5671 let expr = translate_scalar_internal(
5672 arg_expr,
5673 resolver,
5674 context,
5675 outer_scopes,
5676 tracker,
5677 None,
5678 )?;
5679 llkv_expr::expr::AggregateCall::Avg {
5680 expr: Box::new(expr),
5681 distinct,
5682 }
5683 }
5684 _ => return Ok(None),
5685 };
5686
5687 Ok(Some(agg_call))
5688}
5689
/// Detects the pattern `CASE WHEN <col> IS NULL THEN 1 ELSE 0 END`, which
/// callers rewrite into a dedicated null-counting aggregate.
///
/// Returns the matched column name, `Ok(None)` when `expr` has any other
/// shape; errors only bubble up from resolving the inner column reference.
fn parse_count_nulls_case(expr: &SqlExpr) -> SqlResult<Option<String>> {
    // Only a CASE expression can match; anything else opts out silently.
    let SqlExpr::Case {
        operand,
        conditions,
        else_result,
        ..
    } = expr
    else {
        return Ok(None);
    };

    // Require the searched form (no operand) with exactly one WHEN branch.
    if operand.is_some() || conditions.len() != 1 {
        return Ok(None);
    }

    // The THEN result must be the literal 1.
    let case_when = &conditions[0];
    if !is_integer_literal(&case_when.result, 1) {
        return Ok(None);
    }

    // An ELSE branch is mandatory and must be the literal 0.
    let else_expr = match else_result {
        Some(expr) => expr.as_ref(),
        None => return Ok(None),
    };
    if !is_integer_literal(else_expr, 0) {
        return Ok(None);
    }

    // The WHEN condition must be `<something> IS NULL`.
    let inner = match &case_when.condition {
        SqlExpr::IsNull(inner) => inner.as_ref(),
        _ => return Ok(None),
    };

    resolve_column_name(inner).map(Some)
}
5725
5726fn is_integer_literal(expr: &SqlExpr, expected: i64) -> bool {
5727 match expr {
5728 SqlExpr::Value(ValueWithSpan {
5729 value: Value::Number(text, _),
5730 ..
5731 }) => text.parse::<i64>() == Ok(expected),
5732 _ => false,
5733 }
5734}
5735
5736fn strip_sql_expr_nesting(expr: &SqlExpr) -> &SqlExpr {
5737 match expr {
5738 SqlExpr::Nested(inner) => strip_sql_expr_nesting(inner),
5739 other => other,
5740 }
5741}
5742
/// The low/high operands of a SQL `BETWEEN` predicate, borrowed from the
/// surrounding AST so each bound can be translated independently.
struct BetweenBounds<'a> {
    // `low` in `x BETWEEN low AND high`.
    lower: &'a SqlExpr,
    // `high` in `x BETWEEN low AND high`.
    upper: &'a SqlExpr,
}
5747
5748fn translate_between_expr(
5749 resolver: &IdentifierResolver<'_>,
5750 context: IdentifierContext,
5751 between_expr: &SqlExpr,
5752 bounds: BetweenBounds<'_>,
5753 negated: bool,
5754 outer_scopes: &[IdentifierContext],
5755 mut correlated_tracker: Option<&mut SubqueryCorrelatedColumnTracker>,
5756) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
5757 let lower_op = if negated {
5758 BinaryOperator::Lt
5759 } else {
5760 BinaryOperator::GtEq
5761 };
5762 let upper_op = if negated {
5763 BinaryOperator::Gt
5764 } else {
5765 BinaryOperator::LtEq
5766 };
5767
5768 let lower_bound = translate_comparison_with_context(
5769 resolver,
5770 context.clone(),
5771 between_expr,
5772 lower_op,
5773 bounds.lower,
5774 outer_scopes,
5775 correlated_tracker.reborrow(),
5776 )?;
5777 let upper_bound = translate_comparison_with_context(
5778 resolver,
5779 context,
5780 between_expr,
5781 upper_op,
5782 bounds.upper,
5783 outer_scopes,
5784 correlated_tracker,
5785 )?;
5786
5787 if negated {
5788 Ok(llkv_expr::expr::Expr::Or(vec![lower_bound, upper_bound]))
5789 } else {
5790 Ok(llkv_expr::expr::Expr::And(vec![lower_bound, upper_bound]))
5791 }
5792}
5793
5794fn correlated_scalar_from_resolution(
5795 placeholder: String,
5796 resolution: &ColumnResolution,
5797) -> llkv_expr::expr::ScalarExpr<String> {
5798 let mut expr = llkv_expr::expr::ScalarExpr::column(placeholder);
5799 for field in resolution.field_path() {
5800 expr = llkv_expr::expr::ScalarExpr::get_field(expr, field.clone());
5801 }
5802 expr
5803}
5804
5805fn resolve_correlated_identifier(
5806 resolver: &IdentifierResolver<'_>,
5807 parts: &[String],
5808 outer_scopes: &[IdentifierContext],
5809 mut tracker: SubqueryCorrelatedTracker<'_>,
5810) -> SqlResult<Option<llkv_expr::expr::ScalarExpr<String>>> {
5811 if !tracker.is_active() {
5812 return Ok(None);
5813 }
5814
5815 for scope in outer_scopes.iter().rev() {
5816 match resolver.resolve(parts, scope.clone()) {
5817 Ok(resolution) => {
5818 if let Some(placeholder) = tracker.placeholder_for_resolution(&resolution) {
5819 let expr = correlated_scalar_from_resolution(placeholder, &resolution);
5820 return Ok(Some(expr));
5821 }
5822 }
5823 Err(_) => continue,
5824 }
5825 }
5826
5827 Ok(None)
5828}
5829
5830fn resolve_identifier_expr(
5831 resolver: &IdentifierResolver<'_>,
5832 context: &IdentifierContext,
5833 parts: Vec<String>,
5834 outer_scopes: &[IdentifierContext],
5835 tracker: SubqueryCorrelatedTracker<'_>,
5836) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
5837 match resolver.resolve(&parts, context.clone()) {
5838 Ok(resolution) => Ok(resolution.into_scalar_expr()),
5839 Err(err) => {
5840 if let Some(expr) =
5841 resolve_correlated_identifier(resolver, &parts, outer_scopes, tracker)?
5842 {
5843 Ok(expr)
5844 } else {
5845 Err(err)
5846 }
5847 }
5848 }
5849}
5850
/// Translate a SQL boolean predicate (e.g. a WHERE clause) into an
/// `llkv_expr` boolean expression tree.
///
/// The traversal is iterative: `work_stack` holds AST nodes still to visit
/// plus `Exit` markers, and `result_stack` accumulates translated
/// sub-expressions, so arbitrarily deep predicates never recurse on the call
/// stack.
///
/// Side channels:
/// * `subqueries` — receives one `FilterSubquery` per EXISTS subquery found;
///   the subquery's id indexes into this vector.
/// * `correlated_tracker` — when `Some`, records outer-scope columns used by
///   correlated sub-expressions; it is reborrowed per child translation.
///
/// Returns `Error::InvalidArgumentError` for constructs this translator does
/// not support (notably `IN (SELECT ...)`, which callers must materialize
/// before translation).
fn translate_condition_with_context<P>(
    engine: &SqlEngine<P>,
    resolver: &IdentifierResolver<'_>,
    context: IdentifierContext,
    expr: &SqlExpr,
    outer_scopes: &[IdentifierContext],
    subqueries: &mut Vec<llkv_plan::FilterSubquery>,
    mut correlated_tracker: Option<&mut SubqueryCorrelatedColumnTracker>,
) -> SqlResult<llkv_expr::expr::Expr<'static, String>>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Marker pushed *before* a node's children so the Exit arm knows how to
    // combine the child results popped from `result_stack`.
    enum ConditionExitContext {
        And,
        Or,
        Not,
        Nested,
    }

    type ConditionFrame<'a> = llkv_plan::TransformFrame<
        'a,
        SqlExpr,
        llkv_expr::expr::Expr<'static, String>,
        ConditionExitContext,
    >;

    let mut work_stack: Vec<ConditionFrame> = vec![ConditionFrame::Enter(expr)];
    let mut result_stack: Vec<llkv_expr::expr::Expr<'static, String>> = Vec::new();

    while let Some(frame) = work_stack.pop() {
        match frame {
            ConditionFrame::Enter(node) => match node {
                SqlExpr::BinaryOp { left, op, right } => match op {
                    BinaryOperator::And => {
                        // Exit marker first, then right, then left: the stack
                        // pops left first, so results land left-then-right.
                        work_stack.push(ConditionFrame::Exit(ConditionExitContext::And));
                        work_stack.push(ConditionFrame::Enter(right));
                        work_stack.push(ConditionFrame::Enter(left));
                    }
                    BinaryOperator::Or => {
                        work_stack.push(ConditionFrame::Exit(ConditionExitContext::Or));
                        work_stack.push(ConditionFrame::Enter(right));
                        work_stack.push(ConditionFrame::Enter(left));
                    }
                    BinaryOperator::Eq
                    | BinaryOperator::NotEq
                    | BinaryOperator::Lt
                    | BinaryOperator::LtEq
                    | BinaryOperator::Gt
                    | BinaryOperator::GtEq => {
                        // Comparisons are translated eagerly (not via the
                        // stack) since both operands are scalar expressions.
                        let result = translate_comparison_with_context(
                            resolver,
                            context.clone(),
                            left,
                            op.clone(),
                            right,
                            outer_scopes,
                            correlated_tracker.reborrow(),
                        )?;
                        work_stack.push(ConditionFrame::Leaf(result));
                    }
                    other => {
                        return Err(Error::InvalidArgumentError(format!(
                            "unsupported binary operator in WHERE clause: {other:?}"
                        )));
                    }
                },
                SqlExpr::UnaryOp {
                    op: UnaryOperator::Not,
                    expr: inner,
                } => {
                    // Special-case NOT (x BETWEEN a AND b): fold the NOT into
                    // the BETWEEN's negation flag instead of wrapping the
                    // translated result in an Expr::Not.
                    let inner_stripped = strip_sql_expr_nesting(inner);
                    if let SqlExpr::Between {
                        expr: between_expr,
                        negated: inner_negated,
                        low,
                        high,
                    } = inner_stripped
                    {
                        let negated_mode = !*inner_negated;
                        let between_expr_result = translate_between_expr(
                            resolver,
                            context.clone(),
                            between_expr,
                            BetweenBounds {
                                lower: low,
                                upper: high,
                            },
                            negated_mode,
                            outer_scopes,
                            correlated_tracker.reborrow(),
                        )?;
                        work_stack.push(ConditionFrame::Leaf(between_expr_result));
                        continue;
                    }
                    work_stack.push(ConditionFrame::Exit(ConditionExitContext::Not));
                    work_stack.push(ConditionFrame::Enter(inner));
                }
                SqlExpr::Nested(inner) => {
                    // Parentheses: translate the inner expression; the Exit
                    // arm is a no-op (grouping has no runtime effect).
                    work_stack.push(ConditionFrame::Exit(ConditionExitContext::Nested));
                    work_stack.push(ConditionFrame::Enter(inner));
                }
                SqlExpr::IsNull(inner) => {
                    let scalar = translate_scalar_with_context_scoped(
                        resolver,
                        context.clone(),
                        inner,
                        outer_scopes,
                        correlated_tracker.reborrow(),
                    )?;
                    match scalar {
                        // Bare column: lower to a storage-level IsNull filter.
                        llkv_expr::expr::ScalarExpr::Column(column) => {
                            work_stack.push(ConditionFrame::Leaf(llkv_expr::expr::Expr::Pred(
                                llkv_expr::expr::Filter {
                                    field_id: column,
                                    op: llkv_expr::expr::Operator::IsNull,
                                },
                            )));
                        }
                        // General expression: evaluate then test for NULL.
                        other => {
                            work_stack.push(ConditionFrame::Leaf(llkv_expr::expr::Expr::IsNull {
                                expr: other,
                                negated: false,
                            }));
                        }
                    }
                }
                SqlExpr::IsNotNull(inner) => {
                    let scalar = translate_scalar_with_context_scoped(
                        resolver,
                        context.clone(),
                        inner,
                        outer_scopes,
                        correlated_tracker.reborrow(),
                    )?;
                    match scalar {
                        // Bare column: lower to a storage-level IsNotNull filter.
                        llkv_expr::expr::ScalarExpr::Column(column) => {
                            work_stack.push(ConditionFrame::Leaf(llkv_expr::expr::Expr::Pred(
                                llkv_expr::expr::Filter {
                                    field_id: column,
                                    op: llkv_expr::expr::Operator::IsNotNull,
                                },
                            )));
                        }
                        other => {
                            work_stack.push(ConditionFrame::Leaf(llkv_expr::expr::Expr::IsNull {
                                expr: other,
                                negated: true,
                            }));
                        }
                    }
                }
                SqlExpr::InList {
                    expr: in_expr,
                    list,
                    negated,
                } => {
                    if list.is_empty() {
                        // `x IN ()` is always false; `x NOT IN ()` always true.
                        let result = if *negated {
                            llkv_expr::expr::Expr::Literal(true)
                        } else {
                            llkv_expr::expr::Expr::Literal(false)
                        };
                        work_stack.push(ConditionFrame::Leaf(result));
                    } else {
                        let target = translate_scalar_with_context_scoped(
                            resolver,
                            context.clone(),
                            in_expr,
                            outer_scopes,
                            correlated_tracker.reborrow(),
                        )?;
                        let mut values = Vec::with_capacity(list.len());
                        for value_expr in list {
                            let scalar = translate_scalar_with_context_scoped(
                                resolver,
                                context.clone(),
                                value_expr,
                                outer_scopes,
                                correlated_tracker.reborrow(),
                            )?;
                            values.push(scalar);
                        }

                        work_stack.push(ConditionFrame::Leaf(llkv_expr::expr::Expr::InList {
                            expr: target,
                            list: values,
                            negated: *negated,
                        }));
                    }
                }
                SqlExpr::InSubquery { .. } => {
                    return Err(Error::InvalidArgumentError(
                        "IN (SELECT ...) subqueries must be materialized before translation".into(),
                    ));
                }
                SqlExpr::Between {
                    expr: between_expr,
                    negated,
                    low,
                    high,
                } => {
                    let between_expr_result = translate_between_expr(
                        resolver,
                        context.clone(),
                        between_expr,
                        BetweenBounds {
                            lower: low,
                            upper: high,
                        },
                        *negated,
                        outer_scopes,
                        correlated_tracker.reborrow(),
                    )?;
                    work_stack.push(ConditionFrame::Leaf(between_expr_result));
                }
                SqlExpr::Exists { subquery, negated } => {
                    // The subquery may reference columns of the current
                    // query, so the current context becomes an outer scope
                    // for the nested plan.
                    let mut nested_scopes = outer_scopes.to_vec();
                    nested_scopes.push(context.clone());

                    // Fresh tracker: collect the correlated columns that the
                    // subquery references from any enclosing scope.
                    let mut tracker = SubqueryCorrelatedColumnTracker::new();
                    let mut nested_subqueries = Vec::new();

                    let subquery_plan = engine.build_select_plan_internal(
                        (**subquery).clone(),
                        resolver,
                        &nested_scopes,
                        &mut nested_subqueries,
                        Some(&mut tracker),
                    )?;

                    // The id is the subquery's index in `subqueries`; the
                    // Exists node refers back to it by this id.
                    let subquery_id = llkv_expr::SubqueryId(subqueries.len() as u32);
                    let filter_subquery = llkv_plan::FilterSubquery {
                        id: subquery_id,
                        plan: Box::new(subquery_plan),
                        correlated_columns: tracker.into_columns(),
                    };
                    subqueries.push(filter_subquery);

                    work_stack.push(ConditionFrame::Leaf(llkv_expr::expr::Expr::Exists(
                        llkv_expr::SubqueryExpr {
                            id: subquery_id,
                            negated: *negated,
                        },
                    )));
                }
                other => {
                    return Err(Error::InvalidArgumentError(format!(
                        "unsupported WHERE clause: {other:?}"
                    )));
                }
            },
            ConditionFrame::Leaf(translated) => {
                result_stack.push(translated);
            }
            ConditionFrame::Exit(exit_context) => match exit_context {
                ConditionExitContext::And => {
                    // Children were pushed left-then-right, so pop order is
                    // right first, then left.
                    let right = result_stack.pop().ok_or_else(|| {
                        Error::Internal(
                            "translate_condition: result stack underflow for And right".into(),
                        )
                    })?;
                    let left = result_stack.pop().ok_or_else(|| {
                        Error::Internal(
                            "translate_condition: result stack underflow for And left".into(),
                        )
                    })?;
                    result_stack.push(flatten_and(left, right));
                }
                ConditionExitContext::Or => {
                    let right = result_stack.pop().ok_or_else(|| {
                        Error::Internal(
                            "translate_condition: result stack underflow for Or right".into(),
                        )
                    })?;
                    let left = result_stack.pop().ok_or_else(|| {
                        Error::Internal(
                            "translate_condition: result stack underflow for Or left".into(),
                        )
                    })?;
                    result_stack.push(flatten_or(left, right));
                }
                ConditionExitContext::Not => {
                    let inner = result_stack.pop().ok_or_else(|| {
                        Error::Internal(
                            "translate_condition: result stack underflow for Not".into(),
                        )
                    })?;
                    match inner {
                        // NOT (x IS [NOT] NULL) flips the negation flag
                        // instead of stacking an Expr::Not on top.
                        llkv_expr::expr::Expr::IsNull { expr, negated } => {
                            result_stack.push(llkv_expr::expr::Expr::IsNull {
                                expr,
                                negated: !negated,
                            });
                        }
                        other => {
                            result_stack.push(llkv_expr::expr::Expr::not(other));
                        }
                    }
                }
                ConditionExitContext::Nested => {
                    // Parentheses are purely syntactic; nothing to combine.
                }
            },
        }
    }

    // Exactly one result remains for a well-formed input expression.
    result_stack.pop().ok_or_else(|| {
        Error::Internal("translate_condition_with_context: empty result stack".into())
    })
}
6185
6186fn flatten_and(
6187 left: llkv_expr::expr::Expr<'static, String>,
6188 right: llkv_expr::expr::Expr<'static, String>,
6189) -> llkv_expr::expr::Expr<'static, String> {
6190 let mut children: Vec<llkv_expr::expr::Expr<'static, String>> = Vec::new();
6191 match left {
6192 llkv_expr::expr::Expr::And(mut left_children) => children.append(&mut left_children),
6193 other => children.push(other),
6194 }
6195 match right {
6196 llkv_expr::expr::Expr::And(mut right_children) => children.append(&mut right_children),
6197 other => children.push(other),
6198 }
6199 if children.len() == 1 {
6200 children.into_iter().next().unwrap()
6201 } else {
6202 llkv_expr::expr::Expr::And(children)
6203 }
6204}
6205
6206fn flatten_or(
6207 left: llkv_expr::expr::Expr<'static, String>,
6208 right: llkv_expr::expr::Expr<'static, String>,
6209) -> llkv_expr::expr::Expr<'static, String> {
6210 let mut children: Vec<llkv_expr::expr::Expr<'static, String>> = Vec::new();
6211 match left {
6212 llkv_expr::expr::Expr::Or(mut left_children) => children.append(&mut left_children),
6213 other => children.push(other),
6214 }
6215 match right {
6216 llkv_expr::expr::Expr::Or(mut right_children) => children.append(&mut right_children),
6217 other => children.push(other),
6218 }
6219 if children.len() == 1 {
6220 children.into_iter().next().unwrap()
6221 } else {
6222 llkv_expr::expr::Expr::Or(children)
6223 }
6224}
6225
6226fn peel_unparenthesized_not_chain(expr: &SqlExpr) -> (usize, &SqlExpr) {
6227 let mut count: usize = 0;
6228 let mut current = expr;
6229 while let SqlExpr::UnaryOp {
6230 op: UnaryOperator::Not,
6231 expr: inner,
6232 } = current
6233 {
6234 if matches!(inner.as_ref(), SqlExpr::Nested(_)) {
6235 break;
6236 }
6237 count += 1;
6238 current = inner.as_ref();
6239 }
6240 (count, current)
6241}
6242
/// Translate a SQL binary comparison (`=`, `!=`, `<`, `<=`, `>`, `>=`) into
/// an `llkv_expr` boolean expression.
///
/// Any run of unparenthesized `NOT`s on the left operand is peeled off
/// first, translated as part of the comparison, and re-applied as
/// `Expr::Not` wrappers at the end. When one side is a bare column and the
/// other a non-NULL literal, the generic `Compare` node is lowered to a
/// direct storage-level `Pred` filter (flipping the operator when the
/// literal is on the left).
fn translate_comparison_with_context(
    resolver: &IdentifierResolver<'_>,
    context: IdentifierContext,
    left: &SqlExpr,
    op: BinaryOperator,
    right: &SqlExpr,
    outer_scopes: &[IdentifierContext],
    mut correlated_tracker: Option<&mut SubqueryCorrelatedColumnTracker>,
) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
    // `NOT NOT x = y` parses as NOT applied to the whole comparison chain;
    // strip the NOTs here and re-wrap after translation.
    let (not_count, comparison_left) = peel_unparenthesized_not_chain(left);

    let left_scalar = {
        // Reborrow so the tracker can be reused for the right operand.
        let tracker = correlated_tracker.reborrow();
        translate_scalar_with_context_scoped(
            resolver,
            context.clone(),
            comparison_left,
            outer_scopes,
            tracker,
        )?
    };
    let right_scalar = {
        let tracker = correlated_tracker.reborrow();
        translate_scalar_with_context_scoped(resolver, context, right, outer_scopes, tracker)?
    };
    let compare_op = match op {
        BinaryOperator::Eq => llkv_expr::expr::CompareOp::Eq,
        BinaryOperator::NotEq => llkv_expr::expr::CompareOp::NotEq,
        BinaryOperator::Lt => llkv_expr::expr::CompareOp::Lt,
        BinaryOperator::LtEq => llkv_expr::expr::CompareOp::LtEq,
        BinaryOperator::Gt => llkv_expr::expr::CompareOp::Gt,
        BinaryOperator::GtEq => llkv_expr::expr::CompareOp::GtEq,
        other => {
            return Err(Error::InvalidArgumentError(format!(
                "unsupported comparison operator: {other:?}"
            )));
        }
    };

    // Default form: a generic Compare node; may be replaced by a direct
    // predicate below when a column/literal pattern is detected.
    let mut expr = llkv_expr::expr::Expr::Compare {
        left: left_scalar.clone(),
        op: compare_op,
        right: right_scalar.clone(),
    };

    // column OP literal — lower to a storage filter when the operator and
    // literal support it (NULL and != have no filter form).
    if let (
        llkv_expr::expr::ScalarExpr::Column(column),
        llkv_expr::expr::ScalarExpr::Literal(literal),
    ) = (&left_scalar, &right_scalar)
        && let Some(op) = compare_op_to_filter_operator(compare_op, literal)
    {
        tracing::debug!(
            column = ?column,
            literal = ?literal,
            ?compare_op,
            "translate_comparison direct"
        );
        expr = llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
            field_id: column.clone(),
            op,
        });
    // literal OP column — mirror the operator first, then lower.
    } else if let (
        llkv_expr::expr::ScalarExpr::Literal(literal),
        llkv_expr::expr::ScalarExpr::Column(column),
    ) = (&left_scalar, &right_scalar)
        && let Some(flipped) = flip_compare_op(compare_op)
        && let Some(op) = compare_op_to_filter_operator(flipped, literal)
    {
        tracing::debug!(
            column = ?column,
            literal = ?literal,
            original_op = ?compare_op,
            flipped_op = ?flipped,
            "translate_comparison flipped"
        );
        expr = llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
            field_id: column.clone(),
            op,
        });
    }

    // Re-apply the NOTs peeled from the left operand.
    let mut wrapped = expr;
    for _ in 0..not_count {
        wrapped = llkv_expr::expr::Expr::Not(Box::new(wrapped));
    }

    Ok(wrapped)
}
6331
6332fn compare_op_to_filter_operator(
6333 op: llkv_expr::expr::CompareOp,
6334 literal: &Literal,
6335) -> Option<llkv_expr::expr::Operator<'static>> {
6336 if matches!(literal, Literal::Null) {
6337 return None;
6338 }
6339 let lit = literal.clone();
6340 tracing::debug!(?op, literal = ?literal, "compare_op_to_filter_operator input");
6341 match op {
6342 llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::Operator::Equals(lit)),
6343 llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::Operator::LessThan(lit)),
6344 llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::Operator::LessThanOrEquals(lit)),
6345 llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::Operator::GreaterThan(lit)),
6346 llkv_expr::expr::CompareOp::GtEq => {
6347 Some(llkv_expr::expr::Operator::GreaterThanOrEquals(lit))
6348 }
6349 llkv_expr::expr::CompareOp::NotEq => None,
6350 }
6351}
6352
6353fn flip_compare_op(op: llkv_expr::expr::CompareOp) -> Option<llkv_expr::expr::CompareOp> {
6354 match op {
6355 llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::CompareOp::Eq),
6356 llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::CompareOp::Gt),
6357 llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::CompareOp::GtEq),
6358 llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::CompareOp::Lt),
6359 llkv_expr::expr::CompareOp::GtEq => Some(llkv_expr::expr::CompareOp::LtEq),
6360 llkv_expr::expr::CompareOp::NotEq => None,
6361 }
6362}
6363fn translate_scalar_with_context(
6366 resolver: &IdentifierResolver<'_>,
6367 context: IdentifierContext,
6368 expr: &SqlExpr,
6369) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
6370 let mut tracker = SubqueryCorrelatedTracker::from_option(None);
6371 translate_scalar_internal(
6372 expr,
6373 Some(resolver),
6374 Some(&context),
6375 &[],
6376 &mut tracker,
6377 None,
6378 )
6379}
6380
6381fn translate_scalar_with_context_scoped(
6382 resolver: &IdentifierResolver<'_>,
6383 context: IdentifierContext,
6384 expr: &SqlExpr,
6385 outer_scopes: &[IdentifierContext],
6386 correlated_tracker: Option<&mut SubqueryCorrelatedColumnTracker>,
6387) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
6388 let mut tracker = SubqueryCorrelatedTracker::from_option(correlated_tracker);
6389 translate_scalar_internal(
6390 expr,
6391 Some(resolver),
6392 Some(&context),
6393 outer_scopes,
6394 &mut tracker,
6395 None,
6396 )
6397}
6398
6399#[allow(dead_code)]
6400fn translate_scalar(expr: &SqlExpr) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
6401 let mut tracker = SubqueryCorrelatedTracker::from_option(None);
6402 translate_scalar_internal(expr, None, None, &[], &mut tracker, None)
6403}
6404
6405fn translate_scalar_internal(
6406 expr: &SqlExpr,
6407 resolver: Option<&IdentifierResolver<'_>>,
6408 context: Option<&IdentifierContext>,
6409 outer_scopes: &[IdentifierContext],
6410 tracker: &mut SubqueryCorrelatedTracker<'_>,
6411 mut subquery_resolver: Option<&mut dyn ScalarSubqueryResolver>,
6412) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
6413 enum ScalarExitContext {
6421 BinaryOp {
6422 op: BinaryOperator,
6423 },
6424 Compare {
6425 op: llkv_expr::expr::CompareOp,
6426 },
6427 UnaryNot,
6428 UnaryMinus,
6429 UnaryPlus,
6430 Nested,
6431 Cast(DataType),
6432 IsNull {
6433 negated: bool,
6434 },
6435 Between {
6436 negated: bool,
6437 },
6438 InList {
6439 list_len: usize,
6440 negated: bool,
6441 },
6442 Case {
6443 branch_count: usize,
6444 has_operand: bool,
6445 has_else: bool,
6446 },
6447 BuiltinFunction {
6448 func: BuiltinScalarFunction,
6449 arg_count: usize,
6450 },
6451 }
6452
6453 #[derive(Clone, Copy)]
6454 enum BuiltinScalarFunction {
6455 Abs,
6456 Coalesce,
6457 NullIf,
6458 }
6459
6460 type ScalarFrame<'a> =
6461 TransformFrame<'a, SqlExpr, llkv_expr::expr::ScalarExpr<String>, ScalarExitContext>;
6462
6463 let mut work_stack: Vec<ScalarFrame> = vec![ScalarFrame::Enter(expr)];
6464 let mut result_stack: Vec<llkv_expr::expr::ScalarExpr<String>> = Vec::new();
6465
6466 while let Some(frame) = work_stack.pop() {
6467 match frame {
6468 ScalarFrame::Enter(node) => match node {
6469 SqlExpr::Identifier(ident) => {
6470 if let (Some(resolver), Some(ctx)) = (resolver, context) {
6471 let parts = vec![ident.value.clone()];
6472 let tracker_view = tracker.reborrow();
6473 let expr = resolve_identifier_expr(
6474 resolver,
6475 ctx,
6476 parts,
6477 outer_scopes,
6478 tracker_view,
6479 )?;
6480 work_stack.push(ScalarFrame::Leaf(expr));
6481 } else {
6482 work_stack.push(ScalarFrame::Leaf(llkv_expr::expr::ScalarExpr::column(
6483 ident.value.clone(),
6484 )));
6485 }
6486 }
6487 SqlExpr::CompoundIdentifier(idents) => {
6488 if idents.is_empty() {
6489 return Err(Error::InvalidArgumentError(
6490 "invalid compound identifier".into(),
6491 ));
6492 }
6493
6494 if let (Some(resolver), Some(ctx)) = (resolver, context) {
6495 let parts: Vec<String> =
6496 idents.iter().map(|ident| ident.value.clone()).collect();
6497 let tracker_view = tracker.reborrow();
6498 let expr = resolve_identifier_expr(
6499 resolver,
6500 ctx,
6501 parts,
6502 outer_scopes,
6503 tracker_view,
6504 )?;
6505 work_stack.push(ScalarFrame::Leaf(expr));
6506 } else {
6507 let column_name = idents[0].value.clone();
6508 let mut result = llkv_expr::expr::ScalarExpr::column(column_name);
6509
6510 for part in &idents[1..] {
6511 let field_name = part.value.clone();
6512 result = llkv_expr::expr::ScalarExpr::get_field(result, field_name);
6513 }
6514
6515 work_stack.push(ScalarFrame::Leaf(result));
6516 }
6517 }
6518 SqlExpr::Value(value) => {
6519 let result = literal_from_value(value)?;
6520 work_stack.push(ScalarFrame::Leaf(result));
6521 }
6522 SqlExpr::BinaryOp { left, op, right } => match op {
6523 BinaryOperator::Plus
6524 | BinaryOperator::Minus
6525 | BinaryOperator::Multiply
6526 | BinaryOperator::Divide
6527 | BinaryOperator::Modulo => {
6528 work_stack.push(ScalarFrame::Exit(ScalarExitContext::BinaryOp {
6529 op: op.clone(),
6530 }));
6531 work_stack.push(ScalarFrame::Enter(right));
6532 work_stack.push(ScalarFrame::Enter(left));
6533 }
6534 BinaryOperator::Eq
6535 | BinaryOperator::NotEq
6536 | BinaryOperator::Lt
6537 | BinaryOperator::LtEq
6538 | BinaryOperator::Gt
6539 | BinaryOperator::GtEq => {
6540 let compare_op = match op {
6541 BinaryOperator::Eq => llkv_expr::expr::CompareOp::Eq,
6542 BinaryOperator::NotEq => llkv_expr::expr::CompareOp::NotEq,
6543 BinaryOperator::Lt => llkv_expr::expr::CompareOp::Lt,
6544 BinaryOperator::LtEq => llkv_expr::expr::CompareOp::LtEq,
6545 BinaryOperator::Gt => llkv_expr::expr::CompareOp::Gt,
6546 BinaryOperator::GtEq => llkv_expr::expr::CompareOp::GtEq,
6547 _ => unreachable!(),
6548 };
6549 work_stack.push(ScalarFrame::Exit(ScalarExitContext::Compare {
6550 op: compare_op,
6551 }));
6552 work_stack.push(ScalarFrame::Enter(right));
6553 work_stack.push(ScalarFrame::Enter(left));
6554 }
6555 other => {
6556 return Err(Error::InvalidArgumentError(format!(
6557 "unsupported scalar binary operator: {other:?}"
6558 )));
6559 }
6560 },
6561 SqlExpr::UnaryOp {
6562 op: UnaryOperator::Not,
6563 expr: inner,
6564 } => {
6565 work_stack.push(ScalarFrame::Exit(ScalarExitContext::UnaryNot));
6566 work_stack.push(ScalarFrame::Enter(inner));
6567 }
6568 SqlExpr::UnaryOp {
6569 op: UnaryOperator::Minus,
6570 expr: inner,
6571 } => {
6572 work_stack.push(ScalarFrame::Exit(ScalarExitContext::UnaryMinus));
6573 work_stack.push(ScalarFrame::Enter(inner));
6574 }
6575 SqlExpr::UnaryOp {
6576 op: UnaryOperator::Plus,
6577 expr: inner,
6578 } => {
6579 work_stack.push(ScalarFrame::Exit(ScalarExitContext::UnaryPlus));
6580 work_stack.push(ScalarFrame::Enter(inner));
6581 }
6582 SqlExpr::Nested(inner) => {
6583 work_stack.push(ScalarFrame::Exit(ScalarExitContext::Nested));
6584 work_stack.push(ScalarFrame::Enter(inner));
6585 }
6586 SqlExpr::Cast {
6587 expr: inner,
6588 data_type,
6589 ..
6590 } => {
6591 let target_type = arrow_type_from_sql(data_type)?;
6592 work_stack.push(ScalarFrame::Exit(ScalarExitContext::Cast(target_type)));
6593 work_stack.push(ScalarFrame::Enter(inner));
6594 }
6595 SqlExpr::Case {
6596 operand,
6597 conditions,
6598 else_result,
6599 ..
6600 } => {
6601 work_stack.push(ScalarFrame::Exit(ScalarExitContext::Case {
6602 branch_count: conditions.len(),
6603 has_operand: operand.is_some(),
6604 has_else: else_result.is_some(),
6605 }));
6606 if let Some(else_expr) = else_result.as_deref() {
6607 work_stack.push(ScalarFrame::Enter(else_expr));
6608 }
6609 for case_when in conditions.iter().rev() {
6610 work_stack.push(ScalarFrame::Enter(&case_when.result));
6611 work_stack.push(ScalarFrame::Enter(&case_when.condition));
6612 }
6613 if let Some(opnd) = operand.as_deref() {
6614 work_stack.push(ScalarFrame::Enter(opnd));
6615 }
6616 }
6617 SqlExpr::InList {
6618 expr: in_expr,
6619 list,
6620 negated,
6621 } => {
6622 if list.is_empty() {
6623 let literal_value = if *negated {
6624 llkv_expr::expr::ScalarExpr::literal(Literal::Integer(1))
6625 } else {
6626 llkv_expr::expr::ScalarExpr::literal(Literal::Integer(0))
6627 };
6628 work_stack.push(ScalarFrame::Leaf(literal_value));
6629 } else {
6630 work_stack.push(ScalarFrame::Exit(ScalarExitContext::InList {
6631 list_len: list.len(),
6632 negated: *negated,
6633 }));
6634 for value_expr in list.iter().rev() {
6635 work_stack.push(ScalarFrame::Enter(value_expr));
6636 }
6637 work_stack.push(ScalarFrame::Enter(in_expr));
6638 }
6639 }
6640 SqlExpr::IsNull(inner) => {
6641 work_stack.push(ScalarFrame::Exit(ScalarExitContext::IsNull {
6642 negated: false,
6643 }));
6644 work_stack.push(ScalarFrame::Enter(inner));
6645 }
6646 SqlExpr::IsNotNull(inner) => {
6647 work_stack.push(ScalarFrame::Exit(ScalarExitContext::IsNull {
6648 negated: true,
6649 }));
6650 work_stack.push(ScalarFrame::Enter(inner));
6651 }
6652 SqlExpr::Between {
6653 expr: between_expr,
6654 negated,
6655 low,
6656 high,
6657 } => {
6658 work_stack.push(ScalarFrame::Exit(ScalarExitContext::Between {
6659 negated: *negated,
6660 }));
6661 work_stack.push(ScalarFrame::Enter(high));
6662 work_stack.push(ScalarFrame::Enter(low));
6663 work_stack.push(ScalarFrame::Enter(between_expr));
6664 }
6665 SqlExpr::Function(func) => {
6666 if let Some(agg_call) = try_parse_aggregate_function(
6667 func,
6668 resolver,
6669 context,
6670 outer_scopes,
6671 tracker,
6672 )? {
6673 work_stack.push(ScalarFrame::Leaf(llkv_expr::expr::ScalarExpr::aggregate(
6674 agg_call,
6675 )));
6676 } else {
6677 use sqlparser::ast::{
6678 FunctionArg, FunctionArgExpr, FunctionArguments, ObjectNamePart,
6679 };
6680
6681 if func.uses_odbc_syntax
6682 || !matches!(func.parameters, FunctionArguments::None)
6683 || func.filter.is_some()
6684 || func.null_treatment.is_some()
6685 || func.over.is_some()
6686 || !func.within_group.is_empty()
6687 {
6688 return Err(Error::InvalidArgumentError(format!(
6689 "unsupported function in scalar expression: {:?}",
6690 func.name
6691 )));
6692 }
6693
6694 let func_name = if func.name.0.len() == 1 {
6695 match &func.name.0[0] {
6696 ObjectNamePart::Identifier(ident) => {
6697 ident.value.to_ascii_lowercase()
6698 }
6699 _ => {
6700 return Err(Error::InvalidArgumentError(format!(
6701 "unsupported function in scalar expression: {:?}",
6702 func.name
6703 )));
6704 }
6705 }
6706 } else {
6707 return Err(Error::InvalidArgumentError(format!(
6708 "unsupported function in scalar expression: {:?}",
6709 func.name
6710 )));
6711 };
6712
6713 match func_name.as_str() {
6714 "abs" => {
6715 let args_slice: &[FunctionArg] = match &func.args {
6716 FunctionArguments::List(list) => {
6717 if list.duplicate_treatment.is_some()
6718 || !list.clauses.is_empty()
6719 {
6720 return Err(Error::InvalidArgumentError(
6721 "ABS does not support qualifiers".into(),
6722 ));
6723 }
6724 &list.args
6725 }
6726 _ => {
6727 return Err(Error::InvalidArgumentError(
6728 "ABS requires exactly one argument".into(),
6729 ));
6730 }
6731 };
6732
6733 if args_slice.len() != 1 {
6734 return Err(Error::InvalidArgumentError(
6735 "ABS requires exactly one argument".into(),
6736 ));
6737 }
6738
6739 let arg_expr = match &args_slice[0] {
6740 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
6741 _ => {
6742 return Err(Error::InvalidArgumentError(
6743 "ABS argument must be an expression".into(),
6744 ));
6745 }
6746 };
6747
6748 work_stack.push(ScalarFrame::Exit(
6749 ScalarExitContext::BuiltinFunction {
6750 func: BuiltinScalarFunction::Abs,
6751 arg_count: 1,
6752 },
6753 ));
6754 work_stack.push(ScalarFrame::Enter(arg_expr));
6755 continue;
6756 }
6757 "coalesce" => {
6758 let args_slice: &[FunctionArg] = match &func.args {
6759 FunctionArguments::List(list) => {
6760 if list.duplicate_treatment.is_some()
6761 || !list.clauses.is_empty()
6762 {
6763 return Err(Error::InvalidArgumentError(
6764 "COALESCE does not support qualifiers".into(),
6765 ));
6766 }
6767 &list.args
6768 }
6769 _ => {
6770 return Err(Error::InvalidArgumentError(
6771 "COALESCE requires at least one argument".into(),
6772 ));
6773 }
6774 };
6775
6776 if args_slice.is_empty() {
6777 return Err(Error::InvalidArgumentError(
6778 "COALESCE requires at least one argument".into(),
6779 ));
6780 }
6781
6782 work_stack.push(ScalarFrame::Exit(
6783 ScalarExitContext::BuiltinFunction {
6784 func: BuiltinScalarFunction::Coalesce,
6785 arg_count: args_slice.len(),
6786 },
6787 ));
6788
6789 for arg in args_slice.iter().rev() {
6790 let arg_expr = match arg {
6791 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
6792 _ => {
6793 return Err(Error::InvalidArgumentError(
6794 "COALESCE arguments must be expressions".into(),
6795 ));
6796 }
6797 };
6798 work_stack.push(ScalarFrame::Enter(arg_expr));
6799 }
6800 continue;
6801 }
6802 "nullif" => {
6803 let args_slice: &[FunctionArg] = match &func.args {
6804 FunctionArguments::List(list) => {
6805 if list.duplicate_treatment.is_some()
6806 || !list.clauses.is_empty()
6807 {
6808 return Err(Error::InvalidArgumentError(
6809 "NULLIF does not support qualifiers".into(),
6810 ));
6811 }
6812 &list.args
6813 }
6814 _ => {
6815 return Err(Error::InvalidArgumentError(
6816 "NULLIF requires exactly two arguments".into(),
6817 ));
6818 }
6819 };
6820
6821 if args_slice.len() != 2 {
6822 return Err(Error::InvalidArgumentError(
6823 "NULLIF requires exactly two arguments".into(),
6824 ));
6825 }
6826
6827 work_stack.push(ScalarFrame::Exit(
6828 ScalarExitContext::BuiltinFunction {
6829 func: BuiltinScalarFunction::NullIf,
6830 arg_count: 2,
6831 },
6832 ));
6833
6834 for arg in args_slice.iter().rev() {
6835 let arg_expr = match arg {
6836 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
6837 _ => {
6838 return Err(Error::InvalidArgumentError(
6839 "NULLIF arguments must be expressions".into(),
6840 ));
6841 }
6842 };
6843 work_stack.push(ScalarFrame::Enter(arg_expr));
6844 }
6845 continue;
6846 }
6847 _ => {
6848 return Err(Error::InvalidArgumentError(format!(
6849 "unsupported function in scalar expression: {:?}",
6850 func.name
6851 )));
6852 }
6853 }
6854 }
6855 }
6856 SqlExpr::Dictionary(fields) => {
6857 let mut struct_fields = Vec::new();
6859 for entry in fields {
6860 let key = entry.key.value.clone();
6861 let mut tracker_view = tracker.reborrow();
6864 let value_expr = translate_scalar_internal(
6865 &entry.value,
6866 resolver,
6867 context,
6868 outer_scopes,
6869 &mut tracker_view,
6870 None,
6871 )?;
6872 match value_expr {
6873 llkv_expr::expr::ScalarExpr::Literal(lit) => {
6874 struct_fields.push((key, Box::new(lit)));
6875 }
6876 _ => {
6877 return Err(Error::InvalidArgumentError(
6878 "Dictionary values must be literals".to_string(),
6879 ));
6880 }
6881 }
6882 }
6883 work_stack.push(ScalarFrame::Leaf(llkv_expr::expr::ScalarExpr::literal(
6884 Literal::Struct(struct_fields),
6885 )));
6886 }
6887 SqlExpr::Subquery(subquery) => {
6888 let handler = subquery_resolver.as_mut().ok_or_else(|| {
6889 Error::InvalidArgumentError(
6890 "Correlated scalar subqueries not yet fully implemented - requires plan-level support".
6891 to_string(),
6892 )
6893 })?;
6894 let resolver_ref = resolver.ok_or_else(|| {
6895 Error::InvalidArgumentError(
6896 "scalar subquery translation requires identifier resolver".into(),
6897 )
6898 })?;
6899 let context_ref = context.ok_or_else(|| {
6900 Error::InvalidArgumentError(
6901 "scalar subquery translation requires identifier context".into(),
6902 )
6903 })?;
6904 let translated = handler.handle_scalar_subquery(
6905 subquery.as_ref(),
6906 resolver_ref,
6907 context_ref,
6908 outer_scopes,
6909 )?;
6910 work_stack.push(ScalarFrame::Leaf(translated));
6911 }
6912 other => {
6913 return Err(Error::InvalidArgumentError(format!(
6914 "unsupported scalar expression: {other:?}"
6915 )));
6916 }
6917 },
6918 ScalarFrame::Leaf(translated) => {
6919 result_stack.push(translated);
6920 }
6921 ScalarFrame::Exit(exit_context) => match exit_context {
6922 ScalarExitContext::BinaryOp { op } => {
6923 let right_expr = result_stack.pop().ok_or_else(|| {
6924 Error::Internal(
6925 "translate_scalar: result stack underflow for BinaryOp right".into(),
6926 )
6927 })?;
6928 let left_expr = result_stack.pop().ok_or_else(|| {
6929 Error::Internal(
6930 "translate_scalar: result stack underflow for BinaryOp left".into(),
6931 )
6932 })?;
6933 let binary_op = match op {
6934 BinaryOperator::Plus => llkv_expr::expr::BinaryOp::Add,
6935 BinaryOperator::Minus => llkv_expr::expr::BinaryOp::Subtract,
6936 BinaryOperator::Multiply => llkv_expr::expr::BinaryOp::Multiply,
6937 BinaryOperator::Divide => llkv_expr::expr::BinaryOp::Divide,
6938 BinaryOperator::Modulo => llkv_expr::expr::BinaryOp::Modulo,
6939 other => {
6940 return Err(Error::InvalidArgumentError(format!(
6941 "unsupported scalar binary operator: {other:?}"
6942 )));
6943 }
6944 };
6945 result_stack.push(llkv_expr::expr::ScalarExpr::binary(
6946 left_expr, binary_op, right_expr,
6947 ));
6948 }
6949 ScalarExitContext::Compare { op } => {
6950 let right_expr = result_stack.pop().ok_or_else(|| {
6951 Error::Internal(
6952 "translate_scalar: result stack underflow for Compare right".into(),
6953 )
6954 })?;
6955 let left_expr = result_stack.pop().ok_or_else(|| {
6956 Error::Internal(
6957 "translate_scalar: result stack underflow for Compare left".into(),
6958 )
6959 })?;
6960 result_stack.push(llkv_expr::expr::ScalarExpr::compare(
6961 left_expr, op, right_expr,
6962 ));
6963 }
6964 ScalarExitContext::BuiltinFunction { func, arg_count } => {
6965 if result_stack.len() < arg_count {
6966 return Err(Error::Internal(
6967 "translate_scalar: result stack underflow for builtin function".into(),
6968 ));
6969 }
6970
6971 let mut args: Vec<llkv_expr::expr::ScalarExpr<String>> =
6972 Vec::with_capacity(arg_count);
6973 for _ in 0..arg_count {
6974 if let Some(expr) = result_stack.pop() {
6975 args.push(expr);
6976 }
6977 }
6978 args.reverse();
6979
6980 let result_expr = match func {
6981 BuiltinScalarFunction::Abs => {
6982 debug_assert_eq!(args.len(), 1);
6983 build_abs_case_expr(args.pop().expect("ABS expects one argument"))
6984 }
6985 BuiltinScalarFunction::Coalesce => {
6986 llkv_expr::expr::ScalarExpr::coalesce(args)
6987 }
6988 BuiltinScalarFunction::NullIf => {
6989 debug_assert_eq!(args.len(), 2);
6990 let left = args.remove(0);
6991 let right = args.remove(0);
6992 let condition = llkv_expr::expr::ScalarExpr::compare(
6993 left.clone(),
6994 llkv_expr::expr::CompareOp::Eq,
6995 right,
6996 );
6997 llkv_expr::expr::ScalarExpr::Case {
6998 operand: None,
6999 branches: vec![(
7000 condition,
7001 llkv_expr::expr::ScalarExpr::literal(Literal::Null),
7002 )],
7003 else_expr: Some(Box::new(left)),
7004 }
7005 }
7006 };
7007
7008 result_stack.push(result_expr);
7009 }
7010 ScalarExitContext::UnaryMinus => {
7011 let inner = result_stack.pop().ok_or_else(|| {
7012 Error::Internal(
7013 "translate_scalar: result stack underflow for UnaryMinus".into(),
7014 )
7015 })?;
7016 match inner {
7017 llkv_expr::expr::ScalarExpr::Literal(lit) => match lit {
7018 Literal::Integer(v) => {
7019 result_stack.push(llkv_expr::expr::ScalarExpr::literal(
7020 Literal::Integer(-v),
7021 ));
7022 }
7023 Literal::Float(v) => {
7024 result_stack
7025 .push(llkv_expr::expr::ScalarExpr::literal(Literal::Float(-v)));
7026 }
7027 Literal::Boolean(_) => {
7028 return Err(Error::InvalidArgumentError(
7029 "cannot negate boolean literal".into(),
7030 ));
7031 }
7032 Literal::String(_) => {
7033 return Err(Error::InvalidArgumentError(
7034 "cannot negate string literal".into(),
7035 ));
7036 }
7037 Literal::Struct(_) => {
7038 return Err(Error::InvalidArgumentError(
7039 "cannot negate struct literal".into(),
7040 ));
7041 }
7042 Literal::Null => {
7043 result_stack
7044 .push(llkv_expr::expr::ScalarExpr::literal(Literal::Null));
7045 }
7046 },
7047 other => {
7048 let zero = llkv_expr::expr::ScalarExpr::literal(Literal::Integer(0));
7049 result_stack.push(llkv_expr::expr::ScalarExpr::binary(
7050 zero,
7051 llkv_expr::expr::BinaryOp::Subtract,
7052 other,
7053 ));
7054 }
7055 }
7056 }
7057 ScalarExitContext::UnaryNot => {
7058 let inner = result_stack.pop().ok_or_else(|| {
7059 Error::Internal(
7060 "translate_scalar: result stack underflow for UnaryNot".into(),
7061 )
7062 })?;
7063 result_stack.push(llkv_expr::expr::ScalarExpr::logical_not(inner));
7064 }
7065 ScalarExitContext::UnaryPlus => {
7066 }
7068 ScalarExitContext::Nested => {
7069 }
7071 ScalarExitContext::Cast(target_type) => {
7072 let inner = result_stack.pop().ok_or_else(|| {
7073 Error::Internal("translate_scalar: result stack underflow for CAST".into())
7074 })?;
7075 result_stack.push(llkv_expr::expr::ScalarExpr::cast(inner, target_type));
7076 }
7077 ScalarExitContext::InList { list_len, negated } => {
7078 let mut list_exprs = Vec::with_capacity(list_len);
7079 for _ in 0..list_len {
7080 let value_expr = result_stack.pop().ok_or_else(|| {
7081 Error::Internal(
7082 "translate_scalar: result stack underflow for IN list value".into(),
7083 )
7084 })?;
7085 list_exprs.push(value_expr);
7086 }
7087 list_exprs.reverse();
7088
7089 let target_expr = result_stack.pop().ok_or_else(|| {
7090 Error::Internal(
7091 "translate_scalar: result stack underflow for IN list target".into(),
7092 )
7093 })?;
7094
7095 let mut comparisons: Vec<llkv_expr::expr::ScalarExpr<String>> =
7096 Vec::with_capacity(list_len);
7097 for value in &list_exprs {
7098 comparisons.push(llkv_expr::expr::ScalarExpr::compare(
7099 target_expr.clone(),
7100 llkv_expr::expr::CompareOp::Eq,
7101 value.clone(),
7102 ));
7103 }
7104
7105 let mut branches: Vec<(
7106 llkv_expr::expr::ScalarExpr<String>,
7107 llkv_expr::expr::ScalarExpr<String>,
7108 )> = Vec::with_capacity(list_len.saturating_mul(2));
7109
7110 for comparison in &comparisons {
7111 branches.push((
7112 comparison.clone(),
7113 llkv_expr::expr::ScalarExpr::literal(Literal::Integer(1)),
7114 ));
7115 }
7116
7117 for comparison in comparisons {
7118 let comparison_is_null =
7119 llkv_expr::expr::ScalarExpr::is_null(comparison, false);
7120 branches.push((
7121 comparison_is_null,
7122 llkv_expr::expr::ScalarExpr::literal(Literal::Null),
7123 ));
7124 }
7125
7126 let else_expr = Some(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(0)));
7127 let in_result = llkv_expr::expr::ScalarExpr::case(None, branches, else_expr);
7128 let final_expr = if negated {
7129 llkv_expr::expr::ScalarExpr::logical_not(in_result)
7130 } else {
7131 in_result
7132 };
7133
7134 result_stack.push(final_expr);
7135 }
7136 ScalarExitContext::Case {
7137 branch_count,
7138 has_operand,
7139 has_else,
7140 } => {
7141 let else_expr = if has_else {
7142 Some(result_stack.pop().ok_or_else(|| {
7143 Error::Internal(
7144 "translate_scalar: result stack underflow for CASE ELSE".into(),
7145 )
7146 })?)
7147 } else {
7148 None
7149 };
7150
7151 let mut branches_rev = Vec::with_capacity(branch_count);
7152 for _ in 0..branch_count {
7153 let then_expr = result_stack.pop().ok_or_else(|| {
7154 Error::Internal(
7155 "translate_scalar: result stack underflow for CASE THEN".into(),
7156 )
7157 })?;
7158 let when_expr = result_stack.pop().ok_or_else(|| {
7159 Error::Internal(
7160 "translate_scalar: result stack underflow for CASE WHEN".into(),
7161 )
7162 })?;
7163 branches_rev.push((when_expr, then_expr));
7164 }
7165 branches_rev.reverse();
7166
7167 let operand_expr = if has_operand {
7168 Some(result_stack.pop().ok_or_else(|| {
7169 Error::Internal(
7170 "translate_scalar: result stack underflow for CASE operand".into(),
7171 )
7172 })?)
7173 } else {
7174 None
7175 };
7176
7177 let case_expr =
7178 llkv_expr::expr::ScalarExpr::case(operand_expr, branches_rev, else_expr);
7179 result_stack.push(case_expr);
7180 }
7181 ScalarExitContext::IsNull { negated } => {
7182 let inner = result_stack.pop().ok_or_else(|| {
7183 Error::Internal(
7184 "translate_scalar: result stack underflow for IS NULL operand".into(),
7185 )
7186 })?;
7187 result_stack.push(llkv_expr::expr::ScalarExpr::is_null(inner, negated));
7188 }
7189 ScalarExitContext::Between { negated } => {
7190 let high = result_stack.pop().ok_or_else(|| {
7191 Error::Internal(
7192 "translate_scalar: result stack underflow for BETWEEN upper".into(),
7193 )
7194 })?;
7195 let low = result_stack.pop().ok_or_else(|| {
7196 Error::Internal(
7197 "translate_scalar: result stack underflow for BETWEEN lower".into(),
7198 )
7199 })?;
7200 let expr_value = result_stack.pop().ok_or_else(|| {
7201 Error::Internal(
7202 "translate_scalar: result stack underflow for BETWEEN operand".into(),
7203 )
7204 })?;
7205
7206 let lower_cmp = llkv_expr::expr::ScalarExpr::compare(
7207 expr_value.clone(),
7208 llkv_expr::expr::CompareOp::GtEq,
7209 low,
7210 );
7211 let upper_cmp = llkv_expr::expr::ScalarExpr::compare(
7212 expr_value,
7213 llkv_expr::expr::CompareOp::LtEq,
7214 high,
7215 );
7216 let between_expr = llkv_expr::expr::ScalarExpr::binary(
7217 lower_cmp,
7218 llkv_expr::expr::BinaryOp::Multiply,
7219 upper_cmp,
7220 );
7221 if negated {
7222 result_stack.push(llkv_expr::expr::ScalarExpr::logical_not(between_expr));
7223 } else {
7224 result_stack.push(between_expr);
7225 }
7226 }
7227 },
7228 }
7229 }
7230
7231 result_stack
7232 .pop()
7233 .ok_or_else(|| Error::Internal("translate_scalar: empty result stack".into()))
7234}
7235
/// Collects scalar subqueries discovered while translating expressions.
///
/// Each subquery handed to [`ScalarSubqueryResolver::handle_scalar_subquery`]
/// is planned eagerly and appended to `scalar_subqueries`; its position in
/// that vector becomes its `SubqueryId`.
struct ScalarSubqueryPlanner<'engine, 'vec, P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Engine used to build a `SelectPlan` for each encountered subquery.
    engine: &'engine SqlEngine<P>,
    /// Output accumulator; vector index order defines subquery ids.
    scalar_subqueries: &'vec mut Vec<llkv_plan::ScalarSubquery>,
}
7243
7244impl<'engine, 'vec, P> ScalarSubqueryResolver for ScalarSubqueryPlanner<'engine, 'vec, P>
7245where
7246 P: Pager<Blob = EntryHandle> + Send + Sync,
7247{
7248 fn handle_scalar_subquery(
7249 &mut self,
7250 subquery: &Query,
7251 resolver: &IdentifierResolver<'_>,
7252 context: &IdentifierContext,
7253 outer_scopes: &[IdentifierContext],
7254 ) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
7255 let mut nested_scopes = outer_scopes.to_vec();
7256 nested_scopes.push(context.clone());
7257
7258 let mut tracker = SubqueryCorrelatedColumnTracker::new();
7259 let mut nested_filter_subqueries = Vec::new();
7260
7261 let plan = self.engine.build_select_plan_internal(
7262 subquery.clone(),
7263 resolver,
7264 &nested_scopes,
7265 &mut nested_filter_subqueries,
7266 Some(&mut tracker),
7267 )?;
7268
7269 debug_assert!(nested_filter_subqueries.is_empty());
7270
7271 let id = u32::try_from(self.scalar_subqueries.len()).map_err(|_| {
7272 Error::InvalidArgumentError(
7273 "scalar subquery limit exceeded for current query".to_string(),
7274 )
7275 })?;
7276 let subquery_id = llkv_expr::SubqueryId(id);
7277 self.scalar_subqueries.push(llkv_plan::ScalarSubquery {
7278 id: subquery_id,
7279 plan: Box::new(plan),
7280 correlated_columns: tracker.into_columns(),
7281 });
7282
7283 Ok(llkv_expr::expr::ScalarExpr::scalar_subquery(subquery_id))
7284 }
7285}
7286
7287fn build_abs_case_expr(
7288 arg: llkv_expr::expr::ScalarExpr<String>,
7289) -> llkv_expr::expr::ScalarExpr<String> {
7290 use llkv_expr::expr::{BinaryOp, CompareOp, ScalarExpr};
7291
7292 let zero = ScalarExpr::literal(Literal::Integer(0));
7293 let condition = ScalarExpr::compare(arg.clone(), CompareOp::Lt, zero.clone());
7294 let negated = ScalarExpr::binary(zero.clone(), BinaryOp::Subtract, arg.clone());
7295
7296 ScalarExpr::case(None, vec![(condition, negated)], Some(arg))
7297}
7298
7299fn literal_from_value(value: &ValueWithSpan) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
7300 match &value.value {
7301 Value::Number(text, _) => {
7302 if text.contains(['.', 'e', 'E']) {
7303 let parsed = text.parse::<f64>().map_err(|err| {
7304 Error::InvalidArgumentError(format!("invalid float literal: {err}"))
7305 })?;
7306 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(parsed)))
7307 } else {
7308 let parsed = text.parse::<i128>().map_err(|err| {
7309 Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
7310 })?;
7311 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(
7312 parsed,
7313 )))
7314 }
7315 }
7316 Value::Boolean(value) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Boolean(
7317 *value,
7318 ))),
7319 Value::Null => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Null)),
7320 other => {
7321 if let Some(text) = other.clone().into_string() {
7322 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::String(text)))
7323 } else {
7324 Err(Error::InvalidArgumentError(format!(
7325 "unsupported literal: {other:?}"
7326 )))
7327 }
7328 }
7329 }
7330}
7331
7332fn resolve_assignment_column_name(target: &AssignmentTarget) -> SqlResult<String> {
7333 match target {
7334 AssignmentTarget::ColumnName(name) => {
7335 if name.0.len() != 1 {
7336 return Err(Error::InvalidArgumentError(
7337 "qualified column names in UPDATE assignments are not supported yet".into(),
7338 ));
7339 }
7340 match &name.0[0] {
7341 ObjectNamePart::Identifier(ident) => Ok(ident.value.clone()),
7342 other => Err(Error::InvalidArgumentError(format!(
7343 "unsupported column reference in UPDATE assignment: {other:?}"
7344 ))),
7345 }
7346 }
7347 AssignmentTarget::Tuple(_) => Err(Error::InvalidArgumentError(
7348 "tuple assignments are not supported yet".into(),
7349 )),
7350 }
7351}
7352
7353fn arrow_type_from_sql(data_type: &SqlDataType) -> SqlResult<arrow::datatypes::DataType> {
7354 use arrow::datatypes::DataType;
7355 match data_type {
7356 SqlDataType::Int(_)
7357 | SqlDataType::Integer(_)
7358 | SqlDataType::BigInt(_)
7359 | SqlDataType::SmallInt(_)
7360 | SqlDataType::TinyInt(_) => Ok(DataType::Int64),
7361 SqlDataType::Float(_)
7362 | SqlDataType::Real
7363 | SqlDataType::Double(_)
7364 | SqlDataType::DoublePrecision => Ok(DataType::Float64),
7365 SqlDataType::Text
7366 | SqlDataType::String(_)
7367 | SqlDataType::Varchar(_)
7368 | SqlDataType::Char(_)
7369 | SqlDataType::Uuid => Ok(DataType::Utf8),
7370 SqlDataType::Date => Ok(DataType::Date32),
7371 SqlDataType::Decimal(_) | SqlDataType::Numeric(_) => Ok(DataType::Float64),
7372 SqlDataType::Boolean => Ok(DataType::Boolean),
7373 SqlDataType::Custom(name, args) => {
7374 if name.0.len() == 1
7375 && let ObjectNamePart::Identifier(ident) = &name.0[0]
7376 && ident.value.eq_ignore_ascii_case("row")
7377 {
7378 return row_type_to_arrow(data_type, args);
7379 }
7380 Err(Error::InvalidArgumentError(format!(
7381 "unsupported SQL data type: {data_type:?}"
7382 )))
7383 }
7384 other => Err(Error::InvalidArgumentError(format!(
7385 "unsupported SQL data type: {other:?}"
7386 ))),
7387 }
7388}
7389
7390fn row_type_to_arrow(
7391 data_type: &SqlDataType,
7392 tokens: &[String],
7393) -> SqlResult<arrow::datatypes::DataType> {
7394 use arrow::datatypes::{DataType, Field, FieldRef, Fields};
7395
7396 let row_str = data_type.to_string();
7397 if tokens.is_empty() {
7398 return Err(Error::InvalidArgumentError(
7399 "ROW type must define at least one field".into(),
7400 ));
7401 }
7402
7403 let dialect = GenericDialect {};
7404 let field_definitions = resolve_row_field_types(tokens, &dialect).map_err(|err| {
7405 Error::InvalidArgumentError(format!("unable to parse ROW type '{row_str}': {err}"))
7406 })?;
7407
7408 let mut fields: Vec<FieldRef> = Vec::with_capacity(field_definitions.len());
7409 for (field_name, field_type) in field_definitions {
7410 let arrow_field_type = arrow_type_from_sql(&field_type)?;
7411 fields.push(Arc::new(Field::new(field_name, arrow_field_type, true)));
7412 }
7413
7414 let struct_fields: Fields = fields.into();
7415 Ok(DataType::Struct(struct_fields))
7416}
7417
/// Parses the raw token stream of a `ROW(...)` type into `(name, type)`
/// pairs.
///
/// Tokens arrive as individual strings (identifiers, punctuation, type
/// keywords). An optional outer `( ... )` wrapper is stripped first. Field
/// types may span several tokens (e.g. `DOUBLE PRECISION`), so each type is
/// found by greedy longest-match: grow the candidate token window and keep
/// the last window that parses as a valid SQL type, stopping early at a `,`
/// once at least one parse has succeeded.
///
/// # Errors
/// Returns `InvalidArgumentError` for empty/unbalanced token streams, a field
/// with no type, or a field type that never parses.
fn resolve_row_field_types(
    tokens: &[String],
    dialect: &GenericDialect,
) -> SqlResult<Vec<(String, SqlDataType)>> {
    if tokens.is_empty() {
        return Err(Error::InvalidArgumentError(
            "ROW type must define at least one field".into(),
        ));
    }

    // Strip a balanced outer parenthesis pair, if present; a trailing ')'
    // without a leading '(' is malformed.
    let mut start = 0;
    let mut end = tokens.len();
    if tokens[start] == "(" {
        if end == 0 || tokens[end - 1] != ")" {
            return Err(Error::InvalidArgumentError(
                "ROW type is missing closing ')'".into(),
            ));
        }
        start += 1;
        end -= 1;
    } else if tokens[end - 1] == ")" {
        return Err(Error::InvalidArgumentError(
            "ROW type contains unmatched ')'".into(),
        ));
    }

    let slice = &tokens[start..end];
    if slice.is_empty() {
        return Err(Error::InvalidArgumentError(
            "ROW type did not provide any field definitions".into(),
        ));
    }

    let mut fields = Vec::new();
    let mut index = 0;

    while index < slice.len() {
        // Skip separators between field definitions.
        if slice[index] == "," {
            index += 1;
            continue;
        }

        // First token of a definition is the field name (possibly quoted).
        let field_name = normalize_row_field_name(&slice[index])?;
        index += 1;

        if index >= slice.len() {
            return Err(Error::InvalidArgumentError(format!(
                "ROW field '{field_name}' is missing a type specification"
            )));
        }

        // Greedy longest-match scan: widen the candidate window
        // slice[index..type_end] and remember the last window that parses as
        // a SQL type, so multi-token types like `DOUBLE PRECISION` win over
        // their shorter prefixes.
        let mut last_success: Option<(usize, SqlDataType)> = None;
        let mut type_end = index;

        while type_end <= slice.len() {
            let candidate = slice[index..type_end].join(" ");
            if candidate.trim().is_empty() {
                type_end += 1;
                continue;
            }

            if let Ok(parsed_type) = parse_sql_data_type(&candidate, dialect) {
                last_success = Some((type_end, parsed_type));
            }

            if type_end == slice.len() {
                break;
            }

            // A comma ends the field — but only once something has parsed,
            // since commas may legitimately appear inside type arguments.
            if slice[type_end] == "," && last_success.is_some() {
                break;
            }

            type_end += 1;
        }

        let Some((next_index, data_type)) = last_success else {
            return Err(Error::InvalidArgumentError(format!(
                "failed to parse ROW field type for '{field_name}'"
            )));
        };

        fields.push((field_name, data_type));
        // Resume after the consumed type tokens, skipping a trailing comma.
        index = next_index;

        if index < slice.len() && slice[index] == "," {
            index += 1;
        }
    }

    if fields.is_empty() {
        return Err(Error::InvalidArgumentError(
            "ROW type did not provide any field definitions".into(),
        ));
    }

    Ok(fields)
}
7516
7517fn parse_sql_with_recursion_limit(
7531 dialect: &GenericDialect,
7532 sql: &str,
7533) -> Result<Vec<Statement>, sqlparser::parser::ParserError> {
7534 Parser::new(dialect)
7535 .with_recursion_limit(PARSER_RECURSION_LIMIT)
7536 .try_with_sql(sql)?
7537 .parse_statements()
7538}
7539
7540fn normalize_row_field_name(raw: &str) -> SqlResult<String> {
7541 let trimmed = raw.trim();
7542 if trimmed.is_empty() {
7543 return Err(Error::InvalidArgumentError(
7544 "ROW field name must not be empty".into(),
7545 ));
7546 }
7547
7548 if let Some(stripped) = trimmed.strip_prefix('"') {
7549 let without_end = stripped.strip_suffix('"').ok_or_else(|| {
7550 Error::InvalidArgumentError(format!("unterminated quoted ROW field name: {trimmed}"))
7551 })?;
7552 let name = without_end.replace("\"\"", "\"");
7553 return Ok(name);
7554 }
7555
7556 Ok(trimmed.to_string())
7557}
7558
7559fn parse_sql_data_type(type_str: &str, dialect: &GenericDialect) -> SqlResult<SqlDataType> {
7560 let trimmed = type_str.trim();
7561 let sql = format!("CREATE TABLE __row(__field {trimmed});");
7562 let statements = parse_sql_with_recursion_limit(dialect, &sql).map_err(|err| {
7563 Error::InvalidArgumentError(format!("failed to parse ROW field type '{trimmed}': {err}"))
7564 })?;
7565
7566 let stmt = statements.into_iter().next().ok_or_else(|| {
7567 Error::InvalidArgumentError(format!(
7568 "ROW field type '{trimmed}' did not produce a statement"
7569 ))
7570 })?;
7571
7572 match stmt {
7573 Statement::CreateTable(table) => table
7574 .columns
7575 .first()
7576 .map(|col| col.data_type.clone())
7577 .ok_or_else(|| {
7578 Error::InvalidArgumentError(format!(
7579 "ROW field type '{trimmed}' missing column definition"
7580 ))
7581 }),
7582 other => Err(Error::InvalidArgumentError(format!(
7583 "unexpected statement while parsing ROW field type: {other:?}"
7584 ))),
7585 }
7586}
7587
7588type ExtractValuesResult = Option<(Vec<Vec<PlanValue>>, Vec<String>)>;
7591
7592#[allow(clippy::type_complexity)]
7593fn extract_values_from_derived_table(from: &[TableWithJoins]) -> SqlResult<ExtractValuesResult> {
7594 if from.len() != 1 {
7595 return Ok(None);
7596 }
7597
7598 let table_with_joins = &from[0];
7599 if !table_with_joins.joins.is_empty() {
7600 return Ok(None);
7601 }
7602
7603 match &table_with_joins.relation {
7604 TableFactor::Derived {
7605 subquery, alias, ..
7606 } => {
7607 let values = match subquery.body.as_ref() {
7609 SetExpr::Values(v) => v,
7610 _ => return Ok(None),
7611 };
7612
7613 let column_names = if let Some(alias) = alias {
7615 alias
7616 .columns
7617 .iter()
7618 .map(|col_def| col_def.name.value.clone())
7619 .collect::<Vec<_>>()
7620 } else {
7621 if values.rows.is_empty() {
7623 return Err(Error::InvalidArgumentError(
7624 "VALUES expression must have at least one row".into(),
7625 ));
7626 }
7627 let first_row = &values.rows[0];
7628 (0..first_row.len())
7629 .map(|i| format!("column{}", i))
7630 .collect()
7631 };
7632
7633 if values.rows.is_empty() {
7635 return Err(Error::InvalidArgumentError(
7636 "VALUES expression must have at least one row".into(),
7637 ));
7638 }
7639
7640 let mut rows = Vec::with_capacity(values.rows.len());
7641 for row in &values.rows {
7642 if row.len() != column_names.len() {
7643 return Err(Error::InvalidArgumentError(format!(
7644 "VALUES row has {} columns but table alias specifies {} columns",
7645 row.len(),
7646 column_names.len()
7647 )));
7648 }
7649
7650 let mut converted_row = Vec::with_capacity(row.len());
7651 for expr in row {
7652 let value = SqlValue::try_from_expr(expr)?;
7653 converted_row.push(PlanValue::from(value));
7654 }
7655 rows.push(converted_row);
7656 }
7657
7658 Ok(Some((rows, column_names)))
7659 }
7660 _ => Ok(None),
7661 }
7662}
7663
7664fn extract_constant_select_rows(select: &Select) -> SqlResult<Option<Vec<Vec<PlanValue>>>> {
7665 if !select.from.is_empty() {
7666 return Ok(None);
7667 }
7668
7669 if select.selection.is_some()
7670 || select.having.is_some()
7671 || !select.named_window.is_empty()
7672 || select.qualify.is_some()
7673 || select.distinct.is_some()
7674 || select.top.is_some()
7675 || select.into.is_some()
7676 || select.prewhere.is_some()
7677 || !select.lateral_views.is_empty()
7678 || select.value_table_mode.is_some()
7679 || !group_by_is_empty(&select.group_by)
7680 {
7681 return Err(Error::InvalidArgumentError(
7682 "constant SELECT statements do not support advanced clauses".into(),
7683 ));
7684 }
7685
7686 if select.projection.is_empty() {
7687 return Err(Error::InvalidArgumentError(
7688 "constant SELECT requires at least one projection".into(),
7689 ));
7690 }
7691
7692 let mut row: Vec<PlanValue> = Vec::with_capacity(select.projection.len());
7693 for item in &select.projection {
7694 let expr = match item {
7695 SelectItem::UnnamedExpr(expr) => expr,
7696 SelectItem::ExprWithAlias { expr, .. } => expr,
7697 other => {
7698 return Err(Error::InvalidArgumentError(format!(
7699 "unsupported projection in constant SELECT: {other:?}"
7700 )));
7701 }
7702 };
7703
7704 let value = SqlValue::try_from_expr(expr)?;
7705 row.push(PlanValue::from(value));
7706 }
7707
7708 Ok(Some(vec![row]))
7709}
7710
7711fn extract_single_table(from: &[TableWithJoins]) -> SqlResult<(String, String)> {
7712 if from.len() != 1 {
7713 return Err(Error::InvalidArgumentError(
7714 "queries over multiple tables are not supported yet".into(),
7715 ));
7716 }
7717 let item = &from[0];
7718
7719 if !item.joins.is_empty() {
7721 return Err(Error::InvalidArgumentError(
7722 "JOIN clauses are not supported yet".into(),
7723 ));
7724 }
7725 match &item.relation {
7726 TableFactor::Table { name, .. } => canonical_object_name(name),
7727 TableFactor::Derived { alias, .. } => {
7728 let table_name = alias
7731 .as_ref()
7732 .map(|a| a.name.value.clone())
7733 .unwrap_or_else(|| "derived".to_string());
7734 let canonical = table_name.to_ascii_lowercase();
7735 Ok((table_name, canonical))
7736 }
7737 _ => Err(Error::InvalidArgumentError(
7738 "queries require a plain table name or derived table".into(),
7739 )),
7740 }
7741}
7742
/// Flattens a FROM clause into the table list, join metadata, and join filter
/// expressions consumed by the planner.
///
/// Only INNER JOIN (including bare `JOIN` and constraint-free CROSS JOIN) and
/// LEFT [OUTER] JOIN are supported. `ON` conditions are NOT stored in the
/// `JoinMetadata` (`on_condition` stays `None`); they are collected into the
/// returned `join_filters` vector and applied as filters by the caller.
///
/// # Errors
/// Returns `InvalidArgumentError` for `USING`/`NATURAL` constraints, right/
/// full/semi/anti joins, applies, and any other unsupported join form.
fn extract_tables(
    from: &[TableWithJoins],
) -> SqlResult<(
    Vec<llkv_plan::TableRef>,
    Vec<llkv_plan::JoinMetadata>,
    Vec<SqlExpr>,
)> {
    let mut tables = Vec::new();
    let mut join_metadata = Vec::new();
    let mut join_filters = Vec::new();

    for item in from {
        push_table_factor(&item.relation, &mut tables)?;

        for join in &item.joins {
            // The join's left side is the most recently pushed table.
            let left_table_index = tables.len() - 1;

            match &join.join_operator {
                // Constraint-free inner/cross joins: plain cartesian inner join.
                JoinOperator::CrossJoin(JoinConstraint::None)
                | JoinOperator::Join(JoinConstraint::None)
                | JoinOperator::Inner(JoinConstraint::None) => {
                    push_table_factor(&join.relation, &mut tables)?;
                    join_metadata.push(llkv_plan::JoinMetadata {
                        left_table_index,
                        join_type: llkv_plan::JoinPlan::Inner,
                        on_condition: None,
                    });
                }
                // INNER JOIN ... ON: condition becomes a join filter.
                JoinOperator::Join(JoinConstraint::On(condition))
                | JoinOperator::Inner(JoinConstraint::On(condition)) => {
                    push_table_factor(&join.relation, &mut tables)?;
                    join_filters.push(condition.clone());
                    join_metadata.push(llkv_plan::JoinMetadata {
                        left_table_index,
                        join_type: llkv_plan::JoinPlan::Inner,
                        on_condition: None, });
                }
                // LEFT JOIN ... ON: condition becomes a join filter.
                JoinOperator::Left(JoinConstraint::On(condition))
                | JoinOperator::LeftOuter(JoinConstraint::On(condition)) => {
                    push_table_factor(&join.relation, &mut tables)?;
                    join_filters.push(condition.clone());
                    join_metadata.push(llkv_plan::JoinMetadata {
                        left_table_index,
                        join_type: llkv_plan::JoinPlan::Left,
                        on_condition: None,
                    });
                }
                // LEFT JOIN without a constraint.
                JoinOperator::Left(JoinConstraint::None)
                | JoinOperator::LeftOuter(JoinConstraint::None) => {
                    push_table_factor(&join.relation, &mut tables)?;
                    join_metadata.push(llkv_plan::JoinMetadata {
                        left_table_index,
                        join_type: llkv_plan::JoinPlan::Left,
                        on_condition: None,
                    });
                }
                // CROSS JOIN with any constraint is contradictory input.
                JoinOperator::CrossJoin(_) => {
                    return Err(Error::InvalidArgumentError(
                        "CROSS JOIN with constraints is not supported".into(),
                    ));
                }
                JoinOperator::Join(JoinConstraint::Using(_))
                | JoinOperator::Inner(JoinConstraint::Using(_))
                | JoinOperator::Left(JoinConstraint::Using(_))
                | JoinOperator::LeftOuter(JoinConstraint::Using(_)) => {
                    return Err(Error::InvalidArgumentError(
                        "JOIN ... USING (...) is not supported yet".into(),
                    ));
                }
                // Everything else (natural, right/full/semi/anti, applies) is
                // explicitly unsupported.
                JoinOperator::Join(JoinConstraint::Natural)
                | JoinOperator::Inner(JoinConstraint::Natural)
                | JoinOperator::Left(JoinConstraint::Natural)
                | JoinOperator::LeftOuter(JoinConstraint::Natural)
                | JoinOperator::Right(_)
                | JoinOperator::RightOuter(_)
                | JoinOperator::FullOuter(_)
                | JoinOperator::Semi(_)
                | JoinOperator::LeftSemi(_)
                | JoinOperator::LeftAnti(_)
                | JoinOperator::RightSemi(_)
                | JoinOperator::RightAnti(_)
                | JoinOperator::CrossApply
                | JoinOperator::OuterApply
                | JoinOperator::Anti(_)
                | JoinOperator::StraightJoin(_) => {
                    return Err(Error::InvalidArgumentError(
                        "only INNER JOIN and LEFT JOIN with optional ON constraints are supported"
                            .into(),
                    ));
                }
                other => {
                    return Err(Error::InvalidArgumentError(format!(
                        "unsupported JOIN clause: {other:?}"
                    )));
                }
            }
        }
    }

    Ok((tables, join_metadata, join_filters))
}
7853
7854fn push_table_factor(factor: &TableFactor, tables: &mut Vec<llkv_plan::TableRef>) -> SqlResult<()> {
7855 match factor {
7856 TableFactor::Table { name, alias, .. } => {
7857 let (schema_opt, table) = parse_schema_qualified_name(name)?;
7858 let schema = schema_opt.unwrap_or_default();
7859 let alias_name = alias.as_ref().map(|a| a.name.value.clone());
7860 tables.push(llkv_plan::TableRef::with_alias(schema, table, alias_name));
7861 Ok(())
7862 }
7863 TableFactor::Derived { .. } => Err(Error::InvalidArgumentError(
7864 "JOIN clauses require base tables; derived tables are not supported".into(),
7865 )),
7866 _ => Err(Error::InvalidArgumentError(
7867 "queries require a plain table name".into(),
7868 )),
7869 }
7870}
7871
7872fn group_by_is_empty(expr: &GroupByExpr) -> bool {
7873 matches!(
7874 expr,
7875 GroupByExpr::Expressions(exprs, modifiers)
7876 if exprs.is_empty() && modifiers.is_empty()
7877 )
7878}
7879
7880fn convert_value_table_mode(mode: sqlparser::ast::ValueTableMode) -> llkv_plan::ValueTableMode {
7881 use llkv_plan::ValueTableMode as PlanMode;
7882 match mode {
7883 sqlparser::ast::ValueTableMode::AsStruct => PlanMode::AsStruct,
7884 sqlparser::ast::ValueTableMode::AsValue => PlanMode::AsValue,
7885 sqlparser::ast::ValueTableMode::DistinctAsStruct => PlanMode::DistinctAsStruct,
7886 sqlparser::ast::ValueTableMode::DistinctAsValue => PlanMode::DistinctAsValue,
7887 }
7888}
7889#[cfg(test)]
7890mod tests {
7891 use super::*;
7892 use arrow::array::{Array, Float64Array, Int32Array, Int64Array, StringArray};
7893 use arrow::record_batch::RecordBatch;
7894 use llkv_storage::pager::MemPager;
7895
7896 fn extract_string_options(batches: &[RecordBatch]) -> Vec<Option<String>> {
7897 let mut values: Vec<Option<String>> = Vec::new();
7898 for batch in batches {
7899 let column = batch
7900 .column(0)
7901 .as_any()
7902 .downcast_ref::<StringArray>()
7903 .expect("string column");
7904 for idx in 0..column.len() {
7905 if column.is_null(idx) {
7906 values.push(None);
7907 } else {
7908 values.push(Some(column.value(idx).to_string()));
7909 }
7910 }
7911 }
7912 values
7913 }
7914
    /// Rows inserted by separate `execute` calls must all be visible to a
    /// later SELECT — i.e. batching/buffering across calls loses nothing.
    #[test]
    fn test_insert_batching_across_calls() {
        let engine = SqlEngine::new(Arc::new(MemPager::default()));

        engine.execute("CREATE TABLE test (id INTEGER)").unwrap();

        // Two independent INSERT statements, one row each.
        engine.execute("INSERT INTO test VALUES (1)").unwrap();
        engine.execute("INSERT INTO test VALUES (2)").unwrap();

        let result = engine.execute("SELECT * FROM test ORDER BY id").unwrap();
        // Pull the SELECT's batches out of the statement-result list.
        let select_result = result
            .into_iter()
            .find_map(|res| match res {
                RuntimeStatementResult::Select { execution, .. } => {
                    Some(execution.collect().unwrap())
                }
                _ => None,
            })
            .expect("expected SELECT result in response");
        let batches = select_result;
        assert_eq!(
            batches[0].num_rows(),
            2,
            "Should have 2 rows after cross-call batching"
        );
    }
7944
    /// End-to-end smoke test: CREATE TABLE, multi-row INSERT, filtered SELECT.
    #[test]
    fn create_insert_select_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        let result = engine
            .execute("CREATE TABLE people (id INT NOT NULL, name TEXT NOT NULL)")
            .expect("create table");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::CreateTable { .. }
        ));

        // Insert two rows in a single statement and check the reported count.
        let result = engine
            .execute("INSERT INTO people (id, name) VALUES (1, 'alice'), (2, 'bob')")
            .expect("insert rows");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 2,
                ..
            }
        ));

        // A filtered SELECT should return exactly the matching row.
        let mut result = engine
            .execute("SELECT name FROM people WHERE id = 2")
            .expect("select rows");
        let select_result = result.remove(0);
        let batches = match select_result {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };
        assert_eq!(batches.len(), 1);
        let column = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string column");
        assert_eq!(column.len(), 1);
        assert_eq!(column.value(0), "bob");
    }
7988
    /// `INSERT ... SELECT <literal>` must work for both a plain constant and
    /// an explicit NULL cast, producing one row each in insertion order.
    #[test]
    fn insert_select_constant_including_null() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        engine
            .execute("CREATE TABLE integers(i INTEGER)")
            .expect("create table");

        // Constant SELECT used as an INSERT source.
        let result = engine
            .execute("INSERT INTO integers SELECT 42")
            .expect("insert literal");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 1,
                ..
            }
        ));

        // NULL via CAST must also insert a (null) row.
        let result = engine
            .execute("INSERT INTO integers SELECT CAST(NULL AS VARCHAR)")
            .expect("insert null literal");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 1,
                ..
            }
        ));

        let mut result = engine
            .execute("SELECT * FROM integers")
            .expect("select rows");
        let select_result = result.remove(0);
        let batches = match select_result {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };

        // Flatten the Int64 column into Option<i64> values, nulls included.
        let mut values: Vec<Option<i64>> = Vec::new();
        for batch in &batches {
            let column = batch
                .column(0)
                .as_any()
                .downcast_ref::<Int64Array>()
                .expect("int column");
            for idx in 0..column.len() {
                if column.is_null(idx) {
                    values.push(None);
                } else {
                    values.push(Some(column.value(idx)));
                }
            }
        }

        assert_eq!(values, vec![Some(42), None]);
    }
8049
8050 #[test]
8051 fn not_null_comparison_filters_all_rows() {
8052 let pager = Arc::new(MemPager::default());
8053 let engine = SqlEngine::new(pager);
8054
8055 engine
8056 .execute("CREATE TABLE single(col INTEGER)")
8057 .expect("create table");
8058 engine
8059 .execute("INSERT INTO single VALUES (1)")
8060 .expect("insert row");
8061
8062 let batches = engine
8063 .sql("SELECT * FROM single WHERE NOT ( NULL ) >= NULL")
8064 .expect("run constant null comparison");
8065
8066 let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
8067 assert_eq!(total_rows, 0, "expected filter to remove all rows");
8068 }
8069
8070 #[test]
8071 fn not_null_in_list_filters_all_rows() {
8072 let pager = Arc::new(MemPager::default());
8073 let engine = SqlEngine::new(pager);
8074
8075 engine
8076 .execute("CREATE TABLE tab0(col0 INTEGER, col1 INTEGER, col2 INTEGER)")
8077 .expect("create table");
8078 engine
8079 .execute("INSERT INTO tab0 VALUES (1, 2, 3)")
8080 .expect("insert row");
8081
8082 let batches = engine
8083 .sql("SELECT * FROM tab0 WHERE NOT ( NULL ) IN ( - col2 * + col2 )")
8084 .expect("run IN list null comparison");
8085
8086 let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
8087 assert_eq!(total_rows, 0, "expected IN list filter to remove all rows");
8088 }
8089
8090 #[test]
8091 fn cross_join_not_null_comparison_filters_all_rows() {
8092 let pager = Arc::new(MemPager::default());
8093 let engine = SqlEngine::new(pager);
8094
8095 engine
8096 .execute("CREATE TABLE left_side(col INTEGER)")
8097 .expect("create left table");
8098 engine
8099 .execute("CREATE TABLE right_side(col INTEGER)")
8100 .expect("create right table");
8101 engine
8102 .execute("INSERT INTO left_side VALUES (1)")
8103 .expect("insert left row");
8104 engine
8105 .execute("INSERT INTO right_side VALUES (2)")
8106 .expect("insert right row");
8107
8108 let batches = engine
8109 .sql("SELECT * FROM left_side CROSS JOIN right_side WHERE NOT ( NULL ) >= NULL")
8110 .expect("run cross join null comparison");
8111
8112 let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
8113 assert_eq!(
8114 total_rows, 0,
8115 "expected cross join filter to remove all rows"
8116 );
8117 }
8118
8119 #[test]
8120 fn not_between_null_bounds_matches_sqlite_behavior() {
8121 let pager = Arc::new(MemPager::default());
8122 let engine = SqlEngine::new(pager);
8123
8124 engine
8125 .execute("CREATE TABLE tab2(col1 INTEGER, col2 INTEGER)")
8126 .expect("create tab2");
8127 engine
8128 .execute("INSERT INTO tab2 VALUES (1, 2), (-5, 7), (NULL, 11)")
8129 .expect("seed rows");
8130
8131 let batches = engine
8132 .sql(
8133 "SELECT DISTINCT - col2 AS col1 FROM tab2 WHERE NOT ( col1 ) BETWEEN ( NULL ) AND ( + col1 - col2 )",
8134 )
8135 .expect("run NOT BETWEEN query with NULL bounds");
8136
8137 let mut values: Vec<i64> = Vec::new();
8138 for batch in &batches {
8139 let column = batch.column(0);
8140 match column.data_type() {
8141 arrow::datatypes::DataType::Int64 => {
8142 let array = column
8143 .as_any()
8144 .downcast_ref::<Int64Array>()
8145 .expect("int64 column");
8146 for idx in 0..array.len() {
8147 if !array.is_null(idx) {
8148 values.push(array.value(idx));
8149 }
8150 }
8151 }
8152 arrow::datatypes::DataType::Int32 => {
8153 let array = column
8154 .as_any()
8155 .downcast_ref::<Int32Array>()
8156 .expect("int32 column");
8157 for idx in 0..array.len() {
8158 if !array.is_null(idx) {
8159 values.push(array.value(idx) as i64);
8160 }
8161 }
8162 }
8163 other => panic!("unexpected data type: {other:?}"),
8164 }
8165 }
8166
8167 values.sort_unstable();
8168 assert_eq!(values, vec![-7, -2]);
8169 }
8170
8171 #[test]
8172 fn not_chain_precedence_matches_sqlite_behavior() {
8173 let pager = Arc::new(MemPager::default());
8174 let engine = SqlEngine::new(pager);
8175
8176 engine
8177 .execute("CREATE TABLE tab1(col0 INTEGER)")
8178 .expect("create tab1");
8179 engine
8180 .execute("INSERT INTO tab1 VALUES (1), (2)")
8181 .expect("seed tab1");
8182
8183 use sqlparser::ast::Statement;
8184 use sqlparser::dialect::SQLiteDialect;
8185 use sqlparser::parser::Parser;
8186
8187 let dialect = SQLiteDialect {};
8188 let mut statements = Parser::parse_sql(
8189 &dialect,
8190 "SELECT DISTINCT 85 AS value FROM tab1 WHERE NOT + 84 < - + 69 GROUP BY col0, col0",
8191 )
8192 .expect("parse sql");
8193 let statement = statements.pop().expect("expected single statement");
8194 let Statement::Query(query_ast) = statement else {
8195 panic!("expected SELECT query");
8196 };
8197 let plan = engine
8198 .build_select_plan(*query_ast)
8199 .expect("build select plan");
8200 let filter_expr = plan.filter.expect("expected filter predicate").predicate;
8201 if let llkv_expr::expr::Expr::Not(inner) = &filter_expr {
8202 if !matches!(inner.as_ref(), llkv_expr::expr::Expr::Compare { .. }) {
8203 panic!("expected NOT to wrap comparison, got: {inner:?}");
8204 }
8205 } else {
8206 panic!("expected filter to be NOT-wrapped comparison: {filter_expr:?}");
8207 }
8208
8209 let batches = engine
8210 .sql(
8211 "SELECT DISTINCT 85 AS value FROM tab1 WHERE NOT + 84 < - + 69 GROUP BY col0, col0",
8212 )
8213 .expect("run NOT precedence query");
8214
8215 let mut values: Vec<i64> = Vec::new();
8216 for batch in &batches {
8217 let column = batch.column(0);
8218 match column.data_type() {
8219 arrow::datatypes::DataType::Int64 => {
8220 let array = column
8221 .as_any()
8222 .downcast_ref::<Int64Array>()
8223 .expect("int64 column");
8224 for idx in 0..array.len() {
8225 if !array.is_null(idx) {
8226 values.push(array.value(idx));
8227 }
8228 }
8229 }
8230 arrow::datatypes::DataType::Int32 => {
8231 let array = column
8232 .as_any()
8233 .downcast_ref::<Int32Array>()
8234 .expect("int32 column");
8235 for idx in 0..array.len() {
8236 if !array.is_null(idx) {
8237 values.push(array.value(idx) as i64);
8238 }
8239 }
8240 }
8241 arrow::datatypes::DataType::Float64 => {
8242 let array = column
8243 .as_any()
8244 .downcast_ref::<Float64Array>()
8245 .expect("float64 column");
8246 for idx in 0..array.len() {
8247 if !array.is_null(idx) {
8248 values.push(array.value(idx) as i64);
8249 }
8250 }
8251 }
8252 other => panic!("unexpected data type: {other:?}"),
8253 }
8254 }
8255
8256 values.sort_unstable();
8257 assert_eq!(values, vec![85]);
8258 }
8259
8260 #[test]
8261 fn not_between_null_bounds_matches_harness_fixture() {
8262 let pager = Arc::new(MemPager::default());
8263 let engine = SqlEngine::new(pager);
8264
8265 engine
8266 .execute("CREATE TABLE tab2(col0 INTEGER, col1 INTEGER, col2 INTEGER)")
8267 .expect("create tab2");
8268 engine
8269 .execute("INSERT INTO tab2 VALUES (7, 31, 27), (79, 17, 38), (78, 59, 26)")
8270 .expect("seed rows");
8271
8272 let batches = engine
8273 .sql(
8274 "SELECT DISTINCT - col2 AS col1 FROM tab2 WHERE NOT ( col1 ) BETWEEN ( NULL ) AND ( + col1 - col2 )",
8275 )
8276 .expect("run harness-matched NOT BETWEEN query");
8277
8278 let mut values: Vec<i64> = Vec::new();
8279 for batch in &batches {
8280 let column = batch
8281 .column(0)
8282 .as_any()
8283 .downcast_ref::<Int64Array>()
8284 .expect("integer column");
8285 for idx in 0..column.len() {
8286 if !column.is_null(idx) {
8287 values.push(column.value(idx));
8288 }
8289 }
8290 }
8291
8292 values.sort_unstable();
8293 assert_eq!(values, vec![-38, -27, -26]);
8294 }
8295
8296 #[test]
8297 fn not_between_null_bounds_parser_negated_flag() {
8298 use sqlparser::ast::{Expr as SqlExprAst, Statement};
8299 use sqlparser::dialect::SQLiteDialect;
8300 use sqlparser::parser::Parser;
8301
8302 let dialect = SQLiteDialect {};
8303 let sql = "SELECT DISTINCT - col2 AS col1 FROM tab2 WHERE NOT ( col1 ) BETWEEN ( NULL ) AND ( + col1 - col2 )";
8304
8305 let mut statements = Parser::parse_sql(&dialect, sql).expect("parse sql");
8306 let statement = statements.pop().expect("expected single statement");
8307 let Statement::Query(query) = statement else {
8308 panic!("expected SELECT query");
8309 };
8310 let select = query.body.as_select().expect("expected SELECT body");
8311 let where_expr = select.selection.as_ref().expect("expected WHERE clause");
8312
8313 match where_expr {
8314 SqlExprAst::UnaryOp {
8315 op: sqlparser::ast::UnaryOperator::Not,
8316 expr,
8317 } => match expr.as_ref() {
8318 SqlExprAst::Between { negated, .. } => {
8319 assert!(
8320 !negated,
8321 "expected BETWEEN parser to treat leading NOT as part of expression"
8322 );
8323 }
8324 other => panic!("unexpected inner expression: {other:?}"),
8325 },
8326 other => panic!("unexpected where expression: {other:?}"),
8327 }
8328 }
8329
8330 #[test]
8331 fn double_negated_between_null_bounds_filters_all_rows() {
8332 let pager = Arc::new(MemPager::default());
8333 let engine = SqlEngine::new(pager);
8334
8335 engine
8336 .execute("CREATE TABLE tab2(col0 INTEGER, col1 INTEGER, col2 INTEGER)")
8337 .expect("create tab2");
8338 engine
8339 .execute("INSERT INTO tab2 VALUES (1, 2, 3), (-2, -13, 19), (NULL, 5, 7)")
8340 .expect("seed rows");
8341
8342 let batches = engine
8343 .sql(
8344 "SELECT - col1 * + col2 FROM tab2 WHERE NOT ( col1 ) NOT BETWEEN ( NULL ) AND ( col0 )",
8345 )
8346 .expect("run double NOT BETWEEN query with NULL bounds");
8347
8348 let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
8349 assert_eq!(
8350 total_rows, 0,
8351 "expected double NOT BETWEEN to filter all rows"
8352 );
8353 }
8354
8355 #[test]
8356 fn not_scalar_less_than_null_filters_all_rows() {
8357 let pager = Arc::new(MemPager::default());
8358 let engine = SqlEngine::new(pager);
8359
8360 engine
8361 .execute("CREATE TABLE tab(col0 INTEGER, col2 INTEGER)")
8362 .expect("create tab");
8363 engine
8364 .execute("INSERT INTO tab VALUES (1, 2), (5, 10), (-3, 7)")
8365 .expect("seed rows");
8366
8367 let batches = engine
8368 .sql("SELECT col0 FROM tab WHERE NOT ( - col0 / - col2 + - col0 ) < NULL")
8369 .expect("run NOT < NULL query");
8370
8371 let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
8372 assert_eq!(total_rows, 0, "expected NOT < NULL to filter all rows");
8373 }
8374
    #[test]
    fn cross_join_duplicate_table_name_resolves_columns() {
        // Verifies that a qualified column (`tab1.col2`) resolves to the
        // non-aliased base table when the same table appears twice in FROM
        // (once under an alias, once by name).
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        use sqlparser::ast::{SetExpr, Statement};
        use sqlparser::dialect::SQLiteDialect;
        use sqlparser::parser::Parser;

        engine
            .execute("CREATE TABLE tab1(col0 INTEGER, col1 INTEGER, col2 INTEGER)")
            .expect("create tab1");
        engine
            .execute("INSERT INTO tab1 VALUES (7, 8, 9)")
            .expect("insert tab1 row");

        // Sanity-check the parser output first: the query must come back as a
        // single FROM item carrying the CROSS JOIN in its `joins` list.
        let dialect = SQLiteDialect {};
        let ast = Parser::parse_sql(
            &dialect,
            "SELECT tab1.col2 FROM tab1 AS cor0 CROSS JOIN tab1",
        )
        .expect("parse cross join query");
        let Statement::Query(query) = &ast[0] else {
            panic!("expected SELECT query");
        };
        let select = match query.body.as_ref() {
            SetExpr::Select(select) => select.as_ref(),
            other => panic!("unexpected query body: {other:?}"),
        };
        assert_eq!(select.from.len(), 1);
        assert!(!select.from[0].joins.is_empty());

        // Run the same query end-to-end; with one seeded row the join yields
        // one row and `tab1.col2` must resolve to 9.
        let batches = engine
            .sql("SELECT tab1.col2 FROM tab1 AS cor0 CROSS JOIN tab1")
            .expect("run cross join with alias and base table");

        let mut values = Vec::new();
        for batch in &batches {
            let column = batch
                .column(0)
                .as_any()
                .downcast_ref::<Int64Array>()
                .expect("int64 column");
            for idx in 0..column.len() {
                if !column.is_null(idx) {
                    values.push(column.value(idx));
                }
            }
        }
        assert_eq!(values, vec![9]);

        // NOTE(review): everything below exercises an unrelated scenario
        // (UPDATE on a TEXT column plus SELECT ... ORDER BY CAST) that largely
        // duplicates `order_by_honors_configured_default_null_order` — it
        // looks copy-pasted into this test; consider splitting it out.
        engine
            .execute("CREATE TABLE strings(a TEXT)")
            .expect("create table");

        engine
            .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
            .expect("insert seed rows");

        let result = engine
            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
            .expect("update rows");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Update {
                rows_updated: 1,
                ..
            }
        ));

        let mut result = engine
            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
            .expect("select rows");
        let select_result = result.remove(0);
        let batches = match select_result {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };

        let mut values: Vec<Option<String>> = Vec::new();
        for batch in &batches {
            let column = batch
                .column(0)
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("string column");
            for idx in 0..column.len() {
                if column.is_null(idx) {
                    values.push(None);
                } else {
                    values.push(Some(column.value(idx).to_string()));
                }
            }
        }

        // The manual sort (NULLs first, then numeric order) makes the final
        // assertion independent of the engine's default NULL placement, so
        // this checks the UPDATE result rather than the ORDER BY itself.
        values.sort_by(|a, b| match (a, b) {
            (None, None) => std::cmp::Ordering::Equal,
            (None, Some(_)) => std::cmp::Ordering::Less,
            (Some(_), None) => std::cmp::Ordering::Greater,
            (Some(av), Some(bv)) => {
                let a_val = av.parse::<i64>().unwrap_or_default();
                let b_val = bv.parse::<i64>().unwrap_or_default();
                a_val.cmp(&b_val)
            }
        });

        assert_eq!(
            values,
            vec![None, Some("4".to_string()), Some("13".to_string())]
        );
    }
8488
8489 #[test]
8490 fn order_by_honors_configured_default_null_order() {
8491 let pager = Arc::new(MemPager::default());
8492 let engine = SqlEngine::new(pager);
8493
8494 engine
8495 .execute("CREATE TABLE strings(a VARCHAR)")
8496 .expect("create table");
8497 engine
8498 .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
8499 .expect("insert values");
8500 engine
8501 .execute("UPDATE strings SET a = 13 WHERE a = '3'")
8502 .expect("update value");
8503
8504 let mut result = engine
8505 .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
8506 .expect("select rows");
8507 let select_result = result.remove(0);
8508 let batches = match select_result {
8509 RuntimeStatementResult::Select { execution, .. } => {
8510 execution.collect().expect("collect batches")
8511 }
8512 _ => panic!("expected select result"),
8513 };
8514
8515 let values = extract_string_options(&batches);
8516 assert_eq!(
8517 values,
8518 vec![Some("4".to_string()), Some("13".to_string()), None]
8519 );
8520
8521 assert!(!engine.default_nulls_first_for_tests());
8522
8523 engine
8524 .execute("SET default_null_order='nulls_first'")
8525 .expect("set default null order");
8526
8527 assert!(engine.default_nulls_first_for_tests());
8528
8529 let mut result = engine
8530 .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
8531 .expect("select rows");
8532 let select_result = result.remove(0);
8533 let batches = match select_result {
8534 RuntimeStatementResult::Select { execution, .. } => {
8535 execution.collect().expect("collect batches")
8536 }
8537 _ => panic!("expected select result"),
8538 };
8539
8540 let values = extract_string_options(&batches);
8541 assert_eq!(
8542 values,
8543 vec![None, Some("4".to_string()), Some("13".to_string())]
8544 );
8545 }
8546
8547 #[test]
8548 fn arrow_type_from_row_returns_struct_fields() {
8549 let dialect = GenericDialect {};
8550 let statements = parse_sql_with_recursion_limit(
8551 &dialect,
8552 "CREATE TABLE row_types(payload ROW(a INTEGER, b VARCHAR));",
8553 )
8554 .expect("parse ROW type definition");
8555
8556 let data_type = match &statements[0] {
8557 Statement::CreateTable(stmt) => stmt.columns[0].data_type.clone(),
8558 other => panic!("unexpected statement: {other:?}"),
8559 };
8560
8561 let arrow_type = arrow_type_from_sql(&data_type).expect("convert ROW type");
8562 match arrow_type {
8563 arrow::datatypes::DataType::Struct(fields) => {
8564 assert_eq!(fields.len(), 2, "unexpected field count");
8565 assert_eq!(fields[0].name(), "a");
8566 assert_eq!(fields[1].name(), "b");
8567 assert_eq!(fields[0].data_type(), &arrow::datatypes::DataType::Int64);
8568 assert_eq!(fields[1].data_type(), &arrow::datatypes::DataType::Utf8);
8569 }
8570 other => panic!("expected struct type, got {other:?}"),
8571 }
8572 }
8573}