1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
4
5use crate::SqlResult;
6use crate::SqlValue;
7
8use llkv_expr::literal::Literal;
9use llkv_result::Error;
10use llkv_runtime::{
11 AggregateExpr, AssignmentValue, ColumnAssignment, ColumnSpec, CreateTablePlan,
12 CreateTableSource, DeletePlan, InsertPlan, InsertSource, OrderByPlan, OrderSortType,
13 OrderTarget, PlanStatement, PlanValue, RuntimeContext, RuntimeEngine, RuntimeSession,
14 RuntimeStatementResult, SelectPlan, SelectProjection, UpdatePlan, extract_rows_from_range,
15};
16use llkv_storage::pager::Pager;
17use simd_r_drive_entry_handle::EntryHandle;
18use sqlparser::ast::{
19 Assignment, AssignmentTarget, BeginTransactionKind, BinaryOperator, ColumnOption,
20 ColumnOptionDef, DataType as SqlDataType, Delete, ExceptionWhen, Expr as SqlExpr, FromTable,
21 FunctionArg, FunctionArgExpr, FunctionArguments, GroupByExpr, Ident, LimitClause, ObjectName,
22 ObjectNamePart, ObjectType, OrderBy, OrderByExpr, OrderByKind, Query, Select, SelectItem,
23 SelectItemQualifiedWildcardKind, Set, SetExpr, Statement, TableFactor, TableObject,
24 TableWithJoins, TransactionMode, TransactionModifier, UnaryOperator, UpdateTableFromKind,
25 Value, ValueWithSpan,
26};
27use sqlparser::dialect::GenericDialect;
28use sqlparser::parser::Parser;
29
/// SQL front-end: parses SQL text and executes the resulting statements through a
/// [`RuntimeEngine`].
pub struct SqlEngine<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Runtime engine that executes the plans built from parsed SQL.
    engine: RuntimeEngine<P>,
    /// Base NULL placement consulted by ORDER BY translation when a query does not
    /// spell out `NULLS FIRST` / `NULLS LAST`.
    default_nulls_first: AtomicBool,
}
37
/// Message of the `TransactionContextError` returned when a statement outside a
/// transaction targets a table that the runtime context has marked as dropped.
const DROPPED_TABLE_TRANSACTION_ERR: &str = "another transaction has dropped this table";
39
40impl<P> Clone for SqlEngine<P>
41where
42 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
43{
44 fn clone(&self) -> Self {
45 tracing::warn!(
46 "[SQL_ENGINE] SqlEngine::clone() called - will create new Engine with new session!"
47 );
48 Self {
50 engine: self.engine.clone(),
51 default_nulls_first: AtomicBool::new(
52 self.default_nulls_first.load(AtomicOrdering::Relaxed),
53 ),
54 }
55 }
56}
57
58#[allow(dead_code)]
59impl<P> SqlEngine<P>
60where
61 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
62{
63 fn map_table_error(table_name: &str, err: Error) -> Error {
64 match err {
65 Error::NotFound => Self::table_not_found_error(table_name),
66 Error::InvalidArgumentError(msg) if msg.contains("unknown table") => {
67 Self::table_not_found_error(table_name)
68 }
69 other => other,
70 }
71 }
72
73 fn table_not_found_error(table_name: &str) -> Error {
74 Error::CatalogError(format!(
75 "Catalog Error: Table '{table_name}' does not exist"
76 ))
77 }
78
79 fn is_table_missing_error(err: &Error) -> bool {
80 match err {
81 Error::NotFound => true,
82 Error::CatalogError(msg) => {
83 msg.contains("Catalog Error: Table") || msg.contains("unknown table")
84 }
85 Error::InvalidArgumentError(msg) => {
86 msg.contains("Catalog Error: Table") || msg.contains("unknown table")
87 }
88 _ => false,
89 }
90 }
91
92 fn execute_plan_statement(
96 &self,
97 statement: PlanStatement,
98 ) -> SqlResult<RuntimeStatementResult<P>> {
99 let table = llkv_runtime::statement_table_name(&statement).map(str::to_string);
100 self.engine.execute_statement(statement).map_err(|err| {
101 if let Some(table_name) = table {
102 Self::map_table_error(&table_name, err)
103 } else {
104 err
105 }
106 })
107 }
108
109 pub fn new(pager: Arc<P>) -> Self {
110 let engine = RuntimeEngine::new(pager);
111 Self {
112 engine,
113 default_nulls_first: AtomicBool::new(false),
114 }
115 }
116
    /// Returns the runtime context handle obtained from the inner engine.
    pub(crate) fn context_arc(&self) -> Arc<RuntimeContext<P>> {
        self.engine.context()
    }
120
121 pub fn with_context(context: Arc<RuntimeContext<P>>, default_nulls_first: bool) -> Self {
122 Self {
123 engine: RuntimeEngine::from_context(context),
124 default_nulls_first: AtomicBool::new(default_nulls_first),
125 }
126 }
127
    /// Test-only accessor for the current `default_nulls_first` flag.
    #[cfg(test)]
    fn default_nulls_first_for_tests(&self) -> bool {
        self.default_nulls_first.load(AtomicOrdering::Relaxed)
    }
132
    /// Reports whether the engine's session currently has an active transaction.
    fn has_active_transaction(&self) -> bool {
        self.engine.session().has_active_transaction()
    }
136
    /// Borrows the runtime session owned by the inner engine.
    pub fn session(&self) -> &RuntimeSession<P> {
        self.engine.session()
    }
141
142 pub fn execute(&self, sql: &str) -> SqlResult<Vec<RuntimeStatementResult<P>>> {
143 tracing::trace!("DEBUG SQL execute: {}", sql);
144 let dialect = GenericDialect {};
145 let statements = Parser::parse_sql(&dialect, sql)
146 .map_err(|err| Error::InvalidArgumentError(format!("failed to parse SQL: {err}")))?;
147 tracing::trace!("DEBUG SQL execute: parsed {} statements", statements.len());
148
149 let mut results = Vec::with_capacity(statements.len());
150 for (i, statement) in statements.iter().enumerate() {
151 tracing::trace!("DEBUG SQL execute: processing statement {}", i);
152 results.push(self.execute_statement(statement.clone())?);
153 tracing::trace!("DEBUG SQL execute: statement {} completed", i);
154 }
155 tracing::trace!("DEBUG SQL execute completed successfully");
156 Ok(results)
157 }
158
159 fn execute_statement(&self, statement: Statement) -> SqlResult<RuntimeStatementResult<P>> {
160 tracing::trace!(
161 "DEBUG SQL execute_statement: {:?}",
162 match &statement {
163 Statement::Insert(insert) =>
164 format!("Insert(table={:?})", Self::table_name_from_insert(insert)),
165 Statement::Query(_) => "Query".to_string(),
166 Statement::StartTransaction { .. } => "StartTransaction".to_string(),
167 Statement::Commit { .. } => "Commit".to_string(),
168 Statement::Rollback { .. } => "Rollback".to_string(),
169 Statement::CreateTable(_) => "CreateTable".to_string(),
170 Statement::Update { .. } => "Update".to_string(),
171 Statement::Delete(_) => "Delete".to_string(),
172 other => format!("Other({:?})", other),
173 }
174 );
175 match statement {
176 Statement::StartTransaction {
177 modes,
178 begin,
179 transaction,
180 modifier,
181 statements,
182 exception,
183 has_end_keyword,
184 } => self.handle_start_transaction(
185 modes,
186 begin,
187 transaction,
188 modifier,
189 statements,
190 exception,
191 has_end_keyword,
192 ),
193 Statement::Commit {
194 chain,
195 end,
196 modifier,
197 } => self.handle_commit(chain, end, modifier),
198 Statement::Rollback { chain, savepoint } => self.handle_rollback(chain, savepoint),
199 other => self.execute_statement_non_transactional(other),
200 }
201 }
202
203 fn execute_statement_non_transactional(
204 &self,
205 statement: Statement,
206 ) -> SqlResult<RuntimeStatementResult<P>> {
207 tracing::trace!("DEBUG SQL execute_statement_non_transactional called");
208 match statement {
209 Statement::CreateTable(stmt) => {
210 tracing::trace!("DEBUG SQL execute_statement_non_transactional: CreateTable");
211 self.handle_create_table(stmt)
212 }
213 Statement::Insert(stmt) => {
214 let table_name =
215 Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
216 tracing::trace!(
217 "DEBUG SQL execute_statement_non_transactional: Insert(table={})",
218 table_name
219 );
220 self.handle_insert(stmt)
221 }
222 Statement::Query(query) => {
223 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Query");
224 self.handle_query(*query)
225 }
226 Statement::Update {
227 table,
228 assignments,
229 from,
230 selection,
231 returning,
232 ..
233 } => {
234 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Update");
235 self.handle_update(table, assignments, from, selection, returning)
236 }
237 Statement::Delete(delete) => {
238 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Delete");
239 self.handle_delete(delete)
240 }
241 Statement::Drop {
242 object_type,
243 if_exists,
244 names,
245 cascade,
246 restrict,
247 purge,
248 temporary,
249 ..
250 } => {
251 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Drop");
252 self.handle_drop(
253 object_type,
254 if_exists,
255 names,
256 cascade,
257 restrict,
258 purge,
259 temporary,
260 )
261 }
262 Statement::Set(set_stmt) => {
263 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Set");
264 self.handle_set(set_stmt)
265 }
266 Statement::Pragma { name, value, is_eq } => {
267 tracing::trace!("DEBUG SQL execute_statement_non_transactional: Pragma");
268 self.handle_pragma(name, value, is_eq)
269 }
270 other => {
271 tracing::trace!(
272 "DEBUG SQL execute_statement_non_transactional: Other({:?})",
273 other
274 );
275 Err(Error::InvalidArgumentError(format!(
276 "unsupported SQL statement: {other:?}"
277 )))
278 }
279 }
280 }
281
282 fn table_name_from_insert(insert: &sqlparser::ast::Insert) -> SqlResult<String> {
283 match &insert.table {
284 TableObject::TableName(name) => Self::object_name_to_string(name),
285 _ => Err(Error::InvalidArgumentError(
286 "INSERT requires a plain table name".into(),
287 )),
288 }
289 }
290
291 fn table_name_from_update(table: &TableWithJoins) -> SqlResult<Option<String>> {
292 if !table.joins.is_empty() {
293 return Err(Error::InvalidArgumentError(
294 "UPDATE with JOIN targets is not supported yet".into(),
295 ));
296 }
297 Self::table_with_joins_name(table)
298 }
299
300 fn table_name_from_delete(delete: &Delete) -> SqlResult<Option<String>> {
301 if !delete.tables.is_empty() {
302 return Err(Error::InvalidArgumentError(
303 "multi-table DELETE is not supported yet".into(),
304 ));
305 }
306 let from_tables = match &delete.from {
307 FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
308 };
309 if from_tables.is_empty() {
310 return Ok(None);
311 }
312 if from_tables.len() != 1 {
313 return Err(Error::InvalidArgumentError(
314 "DELETE over multiple tables is not supported yet".into(),
315 ));
316 }
317 Self::table_with_joins_name(&from_tables[0])
318 }
319
320 fn object_name_to_string(name: &ObjectName) -> SqlResult<String> {
321 let (display, _) = canonical_object_name(name)?;
322 Ok(display)
323 }
324
325 #[allow(dead_code)]
326 fn table_object_to_name(table: &TableObject) -> SqlResult<Option<String>> {
327 match table {
328 TableObject::TableName(name) => Ok(Some(Self::object_name_to_string(name)?)),
329 TableObject::TableFunction(_) => Ok(None),
330 }
331 }
332
333 fn table_with_joins_name(table: &TableWithJoins) -> SqlResult<Option<String>> {
334 match &table.relation {
335 TableFactor::Table { name, .. } => Ok(Some(Self::object_name_to_string(name)?)),
336 _ => Ok(None),
337 }
338 }
339
340 fn tables_in_query(query: &Query) -> SqlResult<Vec<String>> {
341 let mut tables = Vec::new();
342 if let sqlparser::ast::SetExpr::Select(select) = query.body.as_ref() {
343 for table in &select.from {
344 if let TableFactor::Table { name, .. } = &table.relation {
345 tables.push(Self::object_name_to_string(name)?);
346 }
347 }
348 }
349 Ok(tables)
350 }
351
352 fn is_table_marked_dropped(&self, table_name: &str) -> SqlResult<bool> {
353 let canonical = table_name.to_ascii_lowercase();
354 Ok(self.engine.context().is_table_marked_dropped(&canonical))
355 }
356
357 fn handle_create_table(
358 &self,
359 mut stmt: sqlparser::ast::CreateTable,
360 ) -> SqlResult<RuntimeStatementResult<P>> {
361 validate_create_table_common(&stmt)?;
362
363 let (display_name, canonical_name) = canonical_object_name(&stmt.name)?;
364 tracing::trace!(
365 "\n=== HANDLE_CREATE_TABLE: table='{}' columns={} ===",
366 display_name,
367 stmt.columns.len()
368 );
369 if display_name.is_empty() {
370 return Err(Error::InvalidArgumentError(
371 "table name must not be empty".into(),
372 ));
373 }
374
375 if let Some(query) = stmt.query.take() {
376 validate_create_table_as(&stmt)?;
377 if let Some(result) = self.try_handle_range_ctas(
378 &display_name,
379 &canonical_name,
380 &query,
381 stmt.if_not_exists,
382 stmt.or_replace,
383 )? {
384 return Ok(result);
385 }
386 return self.handle_create_table_as(
387 display_name,
388 canonical_name,
389 *query,
390 stmt.if_not_exists,
391 stmt.or_replace,
392 );
393 }
394
395 if stmt.columns.is_empty() {
396 return Err(Error::InvalidArgumentError(
397 "CREATE TABLE requires at least one column".into(),
398 ));
399 }
400
401 validate_create_table_definition(&stmt)?;
402
403 let mut columns: Vec<ColumnSpec> = Vec::with_capacity(stmt.columns.len());
404 let mut names: HashMap<String, ()> = HashMap::new();
405 for column_def in stmt.columns {
406 let is_nullable = column_def
407 .options
408 .iter()
409 .all(|opt| !matches!(opt.option, ColumnOption::NotNull));
410
411 let is_primary_key = column_def.options.iter().any(|opt| {
412 matches!(
413 opt.option,
414 ColumnOption::Unique {
415 is_primary: true,
416 characteristics: _
417 }
418 )
419 });
420
421 tracing::trace!(
422 "DEBUG CREATE TABLE column '{}' is_primary_key={}",
423 column_def.name.value,
424 is_primary_key
425 );
426
427 let mut column = ColumnSpec::new(
428 column_def.name.value.clone(),
429 arrow_type_from_sql(&column_def.data_type)?,
430 is_nullable,
431 );
432 tracing::trace!(
433 "DEBUG ColumnSpec after new(): primary_key={}",
434 column.primary_key
435 );
436
437 column = column.with_primary_key(is_primary_key);
438 tracing::trace!(
439 "DEBUG ColumnSpec after with_primary_key({}): primary_key={}",
440 is_primary_key,
441 column.primary_key
442 );
443
444 let normalized = column.name.to_ascii_lowercase();
445 if names.insert(normalized, ()).is_some() {
446 return Err(Error::InvalidArgumentError(format!(
447 "duplicate column name '{}' in table '{}'",
448 column.name, display_name
449 )));
450 }
451 columns.push(column);
452 }
453
454 let plan = CreateTablePlan {
455 name: display_name,
456 if_not_exists: stmt.if_not_exists,
457 or_replace: stmt.or_replace,
458 columns,
459 source: None,
460 };
461 self.execute_plan_statement(PlanStatement::CreateTable(plan))
462 }
463
464 fn try_handle_range_ctas(
465 &self,
466 display_name: &str,
467 _canonical_name: &str,
468 query: &Query,
469 if_not_exists: bool,
470 or_replace: bool,
471 ) -> SqlResult<Option<RuntimeStatementResult<P>>> {
472 let select = match query.body.as_ref() {
473 SetExpr::Select(select) => select,
474 _ => return Ok(None),
475 };
476 if select.from.len() != 1 {
477 return Ok(None);
478 }
479 let table_with_joins = &select.from[0];
480 if !table_with_joins.joins.is_empty() {
481 return Ok(None);
482 }
483 let (range_size, range_alias) = match &table_with_joins.relation {
484 TableFactor::Table {
485 name,
486 args: Some(args),
487 alias,
488 ..
489 } => {
490 let func_name = name.to_string().to_ascii_lowercase();
491 if func_name != "range" {
492 return Ok(None);
493 }
494 if args.args.len() != 1 {
495 return Err(Error::InvalidArgumentError(
496 "range table function expects a single argument".into(),
497 ));
498 }
499 let size_expr = &args.args[0];
500 let range_size = match size_expr {
501 FunctionArg::Unnamed(FunctionArgExpr::Expr(SqlExpr::Value(value))) => {
502 match &value.value {
503 Value::Number(raw, _) => raw.parse::<i64>().map_err(|e| {
504 Error::InvalidArgumentError(format!(
505 "invalid range size literal {}: {}",
506 raw, e
507 ))
508 })?,
509 other => {
510 return Err(Error::InvalidArgumentError(format!(
511 "unsupported range size value: {:?}",
512 other
513 )));
514 }
515 }
516 }
517 _ => {
518 return Err(Error::InvalidArgumentError(
519 "unsupported range argument".into(),
520 ));
521 }
522 };
523 (range_size, alias.as_ref().map(|a| a.name.value.clone()))
524 }
525 _ => return Ok(None),
526 };
527
528 if range_size < 0 {
529 return Err(Error::InvalidArgumentError(
530 "range size must be non-negative".into(),
531 ));
532 }
533
534 if select.projection.is_empty() {
535 return Err(Error::InvalidArgumentError(
536 "CREATE TABLE AS SELECT requires at least one projected column".into(),
537 ));
538 }
539
540 let mut column_specs = Vec::with_capacity(select.projection.len());
541 let mut column_names = Vec::with_capacity(select.projection.len());
542 let mut row_template = Vec::with_capacity(select.projection.len());
543 for item in &select.projection {
544 match item {
545 SelectItem::ExprWithAlias { expr, alias } => {
546 let (value, data_type) = match expr {
547 SqlExpr::Value(value_with_span) => match &value_with_span.value {
548 Value::Number(raw, _) => {
549 let parsed = raw.parse::<i64>().map_err(|e| {
550 Error::InvalidArgumentError(format!(
551 "invalid numeric literal {}: {}",
552 raw, e
553 ))
554 })?;
555 (
556 PlanValue::Integer(parsed),
557 arrow::datatypes::DataType::Int64,
558 )
559 }
560 Value::SingleQuotedString(s) => (
561 PlanValue::String(s.clone()),
562 arrow::datatypes::DataType::Utf8,
563 ),
564 other => {
565 return Err(Error::InvalidArgumentError(format!(
566 "unsupported SELECT expression in range CTAS: {:?}",
567 other
568 )));
569 }
570 },
571 SqlExpr::Identifier(ident) => {
572 let ident_lower = ident.value.to_ascii_lowercase();
573 if range_alias
574 .as_ref()
575 .map(|a| a.eq_ignore_ascii_case(&ident_lower))
576 .unwrap_or(false)
577 || ident_lower == "range"
578 {
579 return Err(Error::InvalidArgumentError(
580 "range() table function columns are not supported yet".into(),
581 ));
582 }
583 return Err(Error::InvalidArgumentError(format!(
584 "unsupported identifier '{}' in range CTAS projection",
585 ident.value
586 )));
587 }
588 other => {
589 return Err(Error::InvalidArgumentError(format!(
590 "unsupported SELECT expression in range CTAS: {:?}",
591 other
592 )));
593 }
594 };
595 let column_name = alias.value.clone();
596 column_specs.push(ColumnSpec::new(column_name.clone(), data_type, true));
597 column_names.push(column_name);
598 row_template.push(value);
599 }
600 other => {
601 return Err(Error::InvalidArgumentError(format!(
602 "unsupported projection {:?} in range CTAS",
603 other
604 )));
605 }
606 }
607 }
608
609 let plan = CreateTablePlan {
610 name: display_name.to_string(),
611 if_not_exists,
612 or_replace,
613 columns: column_specs,
614 source: None,
615 };
616 let create_result = self.execute_plan_statement(PlanStatement::CreateTable(plan))?;
617
618 let row_count = range_size
619 .try_into()
620 .map_err(|_| Error::InvalidArgumentError("range size exceeds usize".into()))?;
621 if row_count > 0 {
622 let rows = vec![row_template; row_count];
623 let insert_plan = InsertPlan {
624 table: display_name.to_string(),
625 columns: column_names,
626 source: InsertSource::Rows(rows),
627 };
628 self.execute_plan_statement(PlanStatement::Insert(insert_plan))?;
629 }
630
631 Ok(Some(create_result))
632 }
633
634 fn handle_create_table_as(
635 &self,
636 display_name: String,
637 _canonical_name: String,
638 query: Query,
639 if_not_exists: bool,
640 or_replace: bool,
641 ) -> SqlResult<RuntimeStatementResult<P>> {
642 let select_plan = self.build_select_plan(query)?;
643
644 if select_plan.projections.is_empty() && select_plan.aggregates.is_empty() {
645 return Err(Error::InvalidArgumentError(
646 "CREATE TABLE AS SELECT requires at least one projected column".into(),
647 ));
648 }
649
650 let plan = CreateTablePlan {
651 name: display_name,
652 if_not_exists,
653 or_replace,
654 columns: Vec::new(),
655 source: Some(CreateTableSource::Select {
656 plan: Box::new(select_plan),
657 }),
658 };
659 self.execute_plan_statement(PlanStatement::CreateTable(plan))
660 }
661
662 fn handle_insert(&self, stmt: sqlparser::ast::Insert) -> SqlResult<RuntimeStatementResult<P>> {
663 let table_name_debug =
664 Self::table_name_from_insert(&stmt).unwrap_or_else(|_| "unknown".to_string());
665 tracing::trace!(
666 "DEBUG SQL handle_insert called for table={}",
667 table_name_debug
668 );
669 if !self.engine.session().has_active_transaction()
670 && self.is_table_marked_dropped(&table_name_debug)?
671 {
672 return Err(Error::TransactionContextError(
673 DROPPED_TABLE_TRANSACTION_ERR.into(),
674 ));
675 }
676 if stmt.replace_into || stmt.ignore || stmt.or.is_some() {
677 return Err(Error::InvalidArgumentError(
678 "non-standard INSERT forms are not supported".into(),
679 ));
680 }
681 if stmt.overwrite {
682 return Err(Error::InvalidArgumentError(
683 "INSERT OVERWRITE is not supported".into(),
684 ));
685 }
686 if !stmt.assignments.is_empty() {
687 return Err(Error::InvalidArgumentError(
688 "INSERT ... SET is not supported".into(),
689 ));
690 }
691 if stmt.partitioned.is_some() || !stmt.after_columns.is_empty() {
692 return Err(Error::InvalidArgumentError(
693 "partitioned INSERT is not supported".into(),
694 ));
695 }
696 if stmt.returning.is_some() {
697 return Err(Error::InvalidArgumentError(
698 "INSERT ... RETURNING is not supported".into(),
699 ));
700 }
701 if stmt.format_clause.is_some() || stmt.settings.is_some() {
702 return Err(Error::InvalidArgumentError(
703 "INSERT with FORMAT or SETTINGS is not supported".into(),
704 ));
705 }
706
707 let (display_name, _canonical_name) = match &stmt.table {
708 TableObject::TableName(name) => canonical_object_name(name)?,
709 _ => {
710 return Err(Error::InvalidArgumentError(
711 "INSERT requires a plain table name".into(),
712 ));
713 }
714 };
715
716 let columns: Vec<String> = stmt
717 .columns
718 .iter()
719 .map(|ident| ident.value.clone())
720 .collect();
721 let source_expr = stmt
722 .source
723 .as_ref()
724 .ok_or_else(|| Error::InvalidArgumentError("INSERT requires a VALUES clause".into()))?;
725 validate_simple_query(source_expr)?;
726
727 let insert_source = match source_expr.body.as_ref() {
728 SetExpr::Values(values) => {
729 if values.rows.is_empty() {
730 return Err(Error::InvalidArgumentError(
731 "INSERT VALUES list must contain at least one row".into(),
732 ));
733 }
734 let mut rows: Vec<Vec<SqlValue>> = Vec::with_capacity(values.rows.len());
735 for row in &values.rows {
736 let mut converted = Vec::with_capacity(row.len());
737 for expr in row {
738 converted.push(SqlValue::try_from_expr(expr)?);
739 }
740 rows.push(converted);
741 }
742 InsertSource::Rows(
743 rows.into_iter()
744 .map(|row| row.into_iter().map(PlanValue::from).collect())
745 .collect(),
746 )
747 }
748 SetExpr::Select(select) => {
749 if let Some(rows) = extract_constant_select_rows(select.as_ref())? {
750 InsertSource::Rows(rows)
751 } else if let Some(range_rows) = extract_rows_from_range(select.as_ref())? {
752 InsertSource::Rows(range_rows.into_rows())
753 } else {
754 let select_plan = self.build_select_plan((**source_expr).clone())?;
755 InsertSource::Select {
756 plan: Box::new(select_plan),
757 }
758 }
759 }
760 _ => {
761 return Err(Error::InvalidArgumentError(
762 "unsupported INSERT source".into(),
763 ));
764 }
765 };
766
767 let plan = InsertPlan {
768 table: display_name.clone(),
769 columns,
770 source: insert_source,
771 };
772 tracing::trace!(
773 "DEBUG SQL handle_insert: about to execute insert for table={}",
774 display_name
775 );
776 self.execute_plan_statement(PlanStatement::Insert(plan))
777 }
778
779 fn handle_update(
780 &self,
781 table: TableWithJoins,
782 assignments: Vec<Assignment>,
783 from: Option<UpdateTableFromKind>,
784 selection: Option<SqlExpr>,
785 returning: Option<Vec<SelectItem>>,
786 ) -> SqlResult<RuntimeStatementResult<P>> {
787 if from.is_some() {
788 return Err(Error::InvalidArgumentError(
789 "UPDATE ... FROM is not supported yet".into(),
790 ));
791 }
792 if returning.is_some() {
793 return Err(Error::InvalidArgumentError(
794 "UPDATE ... RETURNING is not supported".into(),
795 ));
796 }
797 if assignments.is_empty() {
798 return Err(Error::InvalidArgumentError(
799 "UPDATE requires at least one assignment".into(),
800 ));
801 }
802
803 let (display_name, canonical_name) = extract_single_table(std::slice::from_ref(&table))?;
804
805 if !self.engine.session().has_active_transaction()
806 && self
807 .engine
808 .context()
809 .is_table_marked_dropped(&canonical_name)
810 {
811 return Err(Error::TransactionContextError(
812 DROPPED_TABLE_TRANSACTION_ERR.into(),
813 ));
814 }
815
816 let mut column_assignments = Vec::with_capacity(assignments.len());
817 let mut seen: HashMap<String, ()> = HashMap::new();
818 for assignment in assignments {
819 let column_name = resolve_assignment_column_name(&assignment.target)?;
820 let normalized = column_name.to_ascii_lowercase();
821 if seen.insert(normalized, ()).is_some() {
822 return Err(Error::InvalidArgumentError(format!(
823 "duplicate column '{}' in UPDATE assignments",
824 column_name
825 )));
826 }
827 let value = match SqlValue::try_from_expr(&assignment.value) {
828 Ok(literal) => AssignmentValue::Literal(PlanValue::from(literal)),
829 Err(Error::InvalidArgumentError(msg))
830 if msg.contains("unsupported literal expression") =>
831 {
832 let translated = translate_scalar(&assignment.value)?;
833 AssignmentValue::Expression(translated)
834 }
835 Err(err) => return Err(err),
836 };
837 column_assignments.push(ColumnAssignment {
838 column: column_name,
839 value,
840 });
841 }
842
843 let filter = match selection {
844 Some(expr) => Some(translate_condition(&expr)?),
845 None => None,
846 };
847
848 let plan = UpdatePlan {
849 table: display_name.clone(),
850 assignments: column_assignments,
851 filter,
852 };
853 self.execute_plan_statement(PlanStatement::Update(plan))
854 }
855
856 #[allow(clippy::collapsible_if)]
857 fn handle_delete(&self, delete: Delete) -> SqlResult<RuntimeStatementResult<P>> {
858 let Delete {
859 tables,
860 from,
861 using,
862 selection,
863 returning,
864 order_by,
865 limit,
866 } = delete;
867
868 if !tables.is_empty() {
869 return Err(Error::InvalidArgumentError(
870 "multi-table DELETE is not supported yet".into(),
871 ));
872 }
873 if let Some(using_tables) = using {
874 if !using_tables.is_empty() {
875 return Err(Error::InvalidArgumentError(
876 "DELETE ... USING is not supported yet".into(),
877 ));
878 }
879 }
880 if returning.is_some() {
881 return Err(Error::InvalidArgumentError(
882 "DELETE ... RETURNING is not supported".into(),
883 ));
884 }
885 if !order_by.is_empty() {
886 return Err(Error::InvalidArgumentError(
887 "DELETE ... ORDER BY is not supported yet".into(),
888 ));
889 }
890 if limit.is_some() {
891 return Err(Error::InvalidArgumentError(
892 "DELETE ... LIMIT is not supported yet".into(),
893 ));
894 }
895
896 let from_tables = match from {
897 FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => tables,
898 };
899 let (display_name, canonical_name) = extract_single_table(&from_tables)?;
900
901 if !self.engine.session().has_active_transaction()
902 && self
903 .engine
904 .context()
905 .is_table_marked_dropped(&canonical_name)
906 {
907 return Err(Error::TransactionContextError(
908 DROPPED_TABLE_TRANSACTION_ERR.into(),
909 ));
910 }
911
912 let filter = selection
913 .map(|expr| translate_condition(&expr))
914 .transpose()?;
915
916 let plan = DeletePlan {
917 table: display_name.clone(),
918 filter,
919 };
920 self.execute_plan_statement(PlanStatement::Delete(plan))
921 }
922
923 #[allow(clippy::too_many_arguments)] fn handle_drop(
925 &self,
926 object_type: ObjectType,
927 if_exists: bool,
928 names: Vec<ObjectName>,
929 cascade: bool,
930 restrict: bool,
931 purge: bool,
932 temporary: bool,
933 ) -> SqlResult<RuntimeStatementResult<P>> {
934 if cascade || restrict || purge || temporary {
935 return Err(Error::InvalidArgumentError(
936 "DROP TABLE cascade/restrict/purge/temporary options are not supported".into(),
937 ));
938 }
939
940 if object_type != ObjectType::Table {
941 return Err(Error::InvalidArgumentError(
942 "only DROP TABLE is supported".into(),
943 ));
944 }
945
946 let ctx = self.engine.context();
947 for name in names {
948 let table_name = Self::object_name_to_string(&name)?;
949 ctx.drop_table_immediate(&table_name, if_exists)
950 .map_err(|err| Self::map_table_error(&table_name, err))?;
951 }
952
953 Ok(RuntimeStatementResult::NoOp)
954 }
955
956 fn handle_query(&self, query: Query) -> SqlResult<RuntimeStatementResult<P>> {
957 let select_plan = self.build_select_plan(query)?;
958 self.execute_plan_statement(PlanStatement::Select(select_plan))
959 }
960
961 fn build_select_plan(&self, query: Query) -> SqlResult<SelectPlan> {
962 if self.engine.session().has_active_transaction() && self.engine.session().is_aborted() {
963 return Err(Error::TransactionContextError(
964 "TransactionContext Error: transaction is aborted".into(),
965 ));
966 }
967
968 validate_simple_query(&query)?;
969 let mut select_plan = match query.body.as_ref() {
970 SetExpr::Select(select) => self.translate_select(select.as_ref())?,
971 other => {
972 return Err(Error::InvalidArgumentError(format!(
973 "unsupported query expression: {other:?}"
974 )));
975 }
976 };
977 if let Some(order_by) = &query.order_by {
978 if !select_plan.aggregates.is_empty() {
979 return Err(Error::InvalidArgumentError(
980 "ORDER BY is not supported for aggregate queries".into(),
981 ));
982 }
983 let order_plan = self.translate_order_by(order_by)?;
984 select_plan = select_plan.with_order_by(Some(order_plan));
985 }
986 Ok(select_plan)
987 }
988
989 fn translate_select(&self, select: &Select) -> SqlResult<SelectPlan> {
990 if select.distinct.is_some() {
991 return Err(Error::InvalidArgumentError(
992 "SELECT DISTINCT is not supported".into(),
993 ));
994 }
995 if select.top.is_some() {
996 return Err(Error::InvalidArgumentError(
997 "SELECT TOP is not supported".into(),
998 ));
999 }
1000 if select.exclude.is_some() {
1001 return Err(Error::InvalidArgumentError(
1002 "SELECT EXCLUDE is not supported".into(),
1003 ));
1004 }
1005 if select.into.is_some() {
1006 return Err(Error::InvalidArgumentError(
1007 "SELECT INTO is not supported".into(),
1008 ));
1009 }
1010 if !select.lateral_views.is_empty() {
1011 return Err(Error::InvalidArgumentError(
1012 "LATERAL VIEW is not supported".into(),
1013 ));
1014 }
1015 if select.prewhere.is_some() {
1016 return Err(Error::InvalidArgumentError(
1017 "PREWHERE is not supported".into(),
1018 ));
1019 }
1020 if !group_by_is_empty(&select.group_by) || select.value_table_mode.is_some() {
1021 return Err(Error::InvalidArgumentError(
1022 "GROUP BY and SELECT AS VALUE/STRUCT are not supported".into(),
1023 ));
1024 }
1025 if !select.cluster_by.is_empty()
1026 || !select.distribute_by.is_empty()
1027 || !select.sort_by.is_empty()
1028 {
1029 return Err(Error::InvalidArgumentError(
1030 "CLUSTER/DISTRIBUTE/SORT BY clauses are not supported".into(),
1031 ));
1032 }
1033 if select.having.is_some()
1034 || !select.named_window.is_empty()
1035 || select.qualify.is_some()
1036 || select.connect_by.is_some()
1037 {
1038 return Err(Error::InvalidArgumentError(
1039 "advanced SELECT clauses are not supported".into(),
1040 ));
1041 }
1042
1043 let (display_name, _canonical_name) = extract_single_table(&select.from)?;
1044 let mut plan = SelectPlan::new(display_name);
1045
1046 if let Some(aggregates) = self.detect_simple_aggregates(&select.projection)? {
1047 plan = plan.with_aggregates(aggregates);
1048 } else {
1049 let projections = self.build_projection_list(&select.projection)?;
1050 plan = plan.with_projections(projections);
1051 }
1052
1053 let filter_expr = match &select.selection {
1054 Some(expr) => Some(translate_condition(expr)?),
1055 None => None,
1056 };
1057 plan = plan.with_filter(filter_expr);
1058 Ok(plan)
1059 }
1060
1061 fn translate_order_by(&self, order_by: &OrderBy) -> SqlResult<OrderByPlan> {
1062 let exprs = match &order_by.kind {
1063 OrderByKind::Expressions(exprs) => exprs,
1064 _ => {
1065 return Err(Error::InvalidArgumentError(
1066 "unsupported ORDER BY clause".into(),
1067 ));
1068 }
1069 };
1070
1071 if exprs.len() != 1 {
1072 return Err(Error::InvalidArgumentError(
1073 "ORDER BY currently supports a single expression".into(),
1074 ));
1075 }
1076
1077 let order_expr: &OrderByExpr = &exprs[0];
1078 let ascending = order_expr.options.asc.unwrap_or(true);
1079 let base_nulls_first = self.default_nulls_first.load(AtomicOrdering::Relaxed);
1080 let default_nulls_first_for_direction = if ascending {
1081 base_nulls_first
1082 } else {
1083 !base_nulls_first
1084 };
1085 let nulls_first = order_expr
1086 .options
1087 .nulls_first
1088 .unwrap_or(default_nulls_first_for_direction);
1089
1090 let (target, sort_type) = match &order_expr.expr {
1091 SqlExpr::Identifier(_) | SqlExpr::CompoundIdentifier(_) => (
1092 OrderTarget::Column(resolve_column_name(&order_expr.expr)?),
1093 OrderSortType::Native,
1094 ),
1095 SqlExpr::Cast {
1096 expr,
1097 data_type:
1098 SqlDataType::Int(_)
1099 | SqlDataType::Integer(_)
1100 | SqlDataType::BigInt(_)
1101 | SqlDataType::SmallInt(_)
1102 | SqlDataType::TinyInt(_),
1103 ..
1104 } => (
1105 OrderTarget::Column(resolve_column_name(expr)?),
1106 OrderSortType::CastTextToInteger,
1107 ),
1108 SqlExpr::Cast { data_type, .. } => {
1109 return Err(Error::InvalidArgumentError(format!(
1110 "ORDER BY CAST target type {:?} is not supported",
1111 data_type
1112 )));
1113 }
1114 SqlExpr::Value(value_with_span) => match &value_with_span.value {
1115 Value::Number(raw, _) => {
1116 let position: usize = raw.parse().map_err(|_| {
1117 Error::InvalidArgumentError(format!(
1118 "ORDER BY position '{}' is not a valid positive integer",
1119 raw
1120 ))
1121 })?;
1122 if position == 0 {
1123 return Err(Error::InvalidArgumentError(
1124 "ORDER BY position must be at least 1".into(),
1125 ));
1126 }
1127 (OrderTarget::Index(position - 1), OrderSortType::Native)
1128 }
1129 other => {
1130 return Err(Error::InvalidArgumentError(format!(
1131 "unsupported ORDER BY literal expression: {other:?}"
1132 )));
1133 }
1134 },
1135 other => {
1136 return Err(Error::InvalidArgumentError(format!(
1137 "unsupported ORDER BY expression: {other:?}"
1138 )));
1139 }
1140 };
1141
1142 Ok(OrderByPlan {
1143 target,
1144 sort_type,
1145 ascending,
1146 nulls_first,
1147 })
1148 }
1149
1150 fn detect_simple_aggregates(
1151 &self,
1152 projection_items: &[SelectItem],
1153 ) -> SqlResult<Option<Vec<AggregateExpr>>> {
1154 if projection_items.is_empty() {
1155 return Ok(None);
1156 }
1157
1158 let mut specs: Vec<AggregateExpr> = Vec::with_capacity(projection_items.len());
1159 for (idx, item) in projection_items.iter().enumerate() {
1160 let (expr, alias_opt) = match item {
1161 SelectItem::UnnamedExpr(expr) => (expr, None),
1162 SelectItem::ExprWithAlias { expr, alias } => (expr, Some(alias.value.clone())),
1163 _ => return Ok(None),
1164 };
1165
1166 let alias = alias_opt.unwrap_or_else(|| format!("col{}", idx + 1));
1167 let SqlExpr::Function(func) = expr else {
1168 return Ok(None);
1169 };
1170
1171 if func.uses_odbc_syntax {
1172 return Err(Error::InvalidArgumentError(
1173 "ODBC function syntax is not supported in aggregate queries".into(),
1174 ));
1175 }
1176 if !matches!(func.parameters, FunctionArguments::None) {
1177 return Err(Error::InvalidArgumentError(
1178 "parameterized aggregate functions are not supported".into(),
1179 ));
1180 }
1181 if func.filter.is_some()
1182 || func.null_treatment.is_some()
1183 || func.over.is_some()
1184 || !func.within_group.is_empty()
1185 {
1186 return Err(Error::InvalidArgumentError(
1187 "advanced aggregate clauses are not supported".into(),
1188 ));
1189 }
1190
1191 let args_slice: &[FunctionArg] = match &func.args {
1192 FunctionArguments::List(list) => {
1193 if list.duplicate_treatment.is_some() {
1194 return Err(Error::InvalidArgumentError(
1195 "DISTINCT aggregates are not supported".into(),
1196 ));
1197 }
1198 if !list.clauses.is_empty() {
1199 return Err(Error::InvalidArgumentError(
1200 "aggregate argument clauses are not supported".into(),
1201 ));
1202 }
1203 &list.args
1204 }
1205 FunctionArguments::None => &[],
1206 FunctionArguments::Subquery(_) => {
1207 return Err(Error::InvalidArgumentError(
1208 "aggregate subquery arguments are not supported".into(),
1209 ));
1210 }
1211 };
1212
1213 let func_name = if func.name.0.len() == 1 {
1214 match &func.name.0[0] {
1215 ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
1216 _ => {
1217 return Err(Error::InvalidArgumentError(
1218 "unsupported aggregate function name".into(),
1219 ));
1220 }
1221 }
1222 } else {
1223 return Err(Error::InvalidArgumentError(
1224 "qualified aggregate function names are not supported".into(),
1225 ));
1226 };
1227
1228 let aggregate = match func_name.as_str() {
1229 "count" => {
1230 if args_slice.len() != 1 {
1231 return Err(Error::InvalidArgumentError(
1232 "COUNT accepts exactly one argument".into(),
1233 ));
1234 }
1235 match &args_slice[0] {
1236 FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
1237 AggregateExpr::count_star(alias)
1238 }
1239 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
1240 let column = resolve_column_name(arg_expr)?;
1241 AggregateExpr::count_column(column, alias)
1242 }
1243 FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
1244 return Err(Error::InvalidArgumentError(
1245 "named COUNT arguments are not supported".into(),
1246 ));
1247 }
1248 FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
1249 return Err(Error::InvalidArgumentError(
1250 "COUNT does not support qualified wildcards".into(),
1251 ));
1252 }
1253 }
1254 }
1255 "sum" | "min" | "max" => {
1256 if args_slice.len() != 1 {
1257 return Err(Error::InvalidArgumentError(format!(
1258 "{} accepts exactly one argument",
1259 func_name.to_uppercase()
1260 )));
1261 }
1262 let arg_expr = match &args_slice[0] {
1263 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => arg_expr,
1264 FunctionArg::Unnamed(FunctionArgExpr::Wildcard)
1265 | FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
1266 return Err(Error::InvalidArgumentError(format!(
1267 "{} does not support wildcard arguments",
1268 func_name.to_uppercase()
1269 )));
1270 }
1271 FunctionArg::Named { .. } | FunctionArg::ExprNamed { .. } => {
1272 return Err(Error::InvalidArgumentError(format!(
1273 "{} arguments must be column references",
1274 func_name.to_uppercase()
1275 )));
1276 }
1277 };
1278
1279 if func_name == "sum" {
1280 if let Some(column) = parse_count_nulls_case(arg_expr)? {
1281 AggregateExpr::count_nulls(column, alias)
1282 } else {
1283 let column = resolve_column_name(arg_expr)?;
1284 AggregateExpr::sum_int64(column, alias)
1285 }
1286 } else {
1287 let column = resolve_column_name(arg_expr)?;
1288 if func_name == "min" {
1289 AggregateExpr::min_int64(column, alias)
1290 } else {
1291 AggregateExpr::max_int64(column, alias)
1292 }
1293 }
1294 }
1295 _ => return Ok(None),
1296 };
1297
1298 specs.push(aggregate);
1299 }
1300
1301 if specs.is_empty() {
1302 return Ok(None);
1303 }
1304 Ok(Some(specs))
1305 }
1306
1307 fn build_projection_list(
1308 &self,
1309 projection_items: &[SelectItem],
1310 ) -> SqlResult<Vec<SelectProjection>> {
1311 if projection_items.is_empty() {
1312 return Err(Error::InvalidArgumentError(
1313 "SELECT projection must include at least one column".into(),
1314 ));
1315 }
1316 let mut projections = Vec::with_capacity(projection_items.len());
1317 for (idx, item) in projection_items.iter().enumerate() {
1318 match item {
1319 SelectItem::Wildcard(_) => {
1320 projections.push(SelectProjection::AllColumns);
1321 }
1322 SelectItem::QualifiedWildcard(kind, _) => match kind {
1323 SelectItemQualifiedWildcardKind::ObjectName(name) => {
1324 projections.push(SelectProjection::Column {
1325 name: name.to_string(),
1326 alias: None,
1327 });
1328 }
1329 SelectItemQualifiedWildcardKind::Expr(_) => {
1330 return Err(Error::InvalidArgumentError(
1331 "expression-qualified wildcards are not supported".into(),
1332 ));
1333 }
1334 },
1335 SelectItem::UnnamedExpr(expr) => {
1336 let scalar = translate_scalar(expr)?;
1337 let alias = format!("col{}", idx + 1);
1338 projections.push(SelectProjection::Computed {
1339 expr: scalar,
1340 alias,
1341 });
1342 }
1343 SelectItem::ExprWithAlias { expr, alias } => {
1344 let scalar = translate_scalar(expr)?;
1345 projections.push(SelectProjection::Computed {
1346 expr: scalar,
1347 alias: alias.value.clone(),
1348 });
1349 }
1350 }
1351 }
1352 Ok(projections)
1353 }
1354
1355 #[allow(clippy::too_many_arguments)] fn handle_start_transaction(
1357 &self,
1358 modes: Vec<TransactionMode>,
1359 begin: bool,
1360 transaction: Option<BeginTransactionKind>,
1361 modifier: Option<TransactionModifier>,
1362 statements: Vec<Statement>,
1363 exception: Option<Vec<ExceptionWhen>>,
1364 has_end_keyword: bool,
1365 ) -> SqlResult<RuntimeStatementResult<P>> {
1366 if !modes.is_empty() {
1367 return Err(Error::InvalidArgumentError(
1368 "transaction modes are not supported".into(),
1369 ));
1370 }
1371 if modifier.is_some() {
1372 return Err(Error::InvalidArgumentError(
1373 "transaction modifiers are not supported".into(),
1374 ));
1375 }
1376 if !statements.is_empty() || exception.is_some() || has_end_keyword {
1377 return Err(Error::InvalidArgumentError(
1378 "BEGIN blocks with inline statements or exceptions are not supported".into(),
1379 ));
1380 }
1381 if let Some(kind) = transaction {
1382 match kind {
1383 BeginTransactionKind::Transaction | BeginTransactionKind::Work => {}
1384 }
1385 }
1386 if !begin {
1387 tracing::warn!("Currently treat `START TRANSACTION` same as `BEGIN`")
1389 }
1390
1391 self.execute_plan_statement(PlanStatement::BeginTransaction)
1392 }
1393
1394 fn handle_commit(
1395 &self,
1396 chain: bool,
1397 end: bool,
1398 modifier: Option<TransactionModifier>,
1399 ) -> SqlResult<RuntimeStatementResult<P>> {
1400 if chain {
1401 return Err(Error::InvalidArgumentError(
1402 "COMMIT AND [NO] CHAIN is not supported".into(),
1403 ));
1404 }
1405 if end {
1406 return Err(Error::InvalidArgumentError(
1407 "END blocks are not supported".into(),
1408 ));
1409 }
1410 if modifier.is_some() {
1411 return Err(Error::InvalidArgumentError(
1412 "transaction modifiers are not supported".into(),
1413 ));
1414 }
1415
1416 self.execute_plan_statement(PlanStatement::CommitTransaction)
1417 }
1418
1419 fn handle_rollback(
1420 &self,
1421 chain: bool,
1422 savepoint: Option<Ident>,
1423 ) -> SqlResult<RuntimeStatementResult<P>> {
1424 if chain {
1425 return Err(Error::InvalidArgumentError(
1426 "ROLLBACK AND [NO] CHAIN is not supported".into(),
1427 ));
1428 }
1429 if savepoint.is_some() {
1430 return Err(Error::InvalidArgumentError(
1431 "ROLLBACK TO SAVEPOINT is not supported".into(),
1432 ));
1433 }
1434
1435 self.execute_plan_statement(PlanStatement::RollbackTransaction)
1436 }
1437
1438 fn handle_set(&self, set_stmt: Set) -> SqlResult<RuntimeStatementResult<P>> {
1439 match set_stmt {
1440 Set::SingleAssignment {
1441 scope,
1442 hivevar,
1443 variable,
1444 values,
1445 } => {
1446 if scope.is_some() || hivevar {
1447 return Err(Error::InvalidArgumentError(
1448 "SET modifiers are not supported".into(),
1449 ));
1450 }
1451
1452 let variable_name_raw = variable.to_string();
1453 let variable_name = variable_name_raw.to_ascii_lowercase();
1454
1455 match variable_name.as_str() {
1456 "default_null_order" => {
1457 if values.len() != 1 {
1458 return Err(Error::InvalidArgumentError(
1459 "SET default_null_order expects exactly one value".into(),
1460 ));
1461 }
1462
1463 let value_expr = &values[0];
1464 let normalized = match value_expr {
1465 SqlExpr::Value(value_with_span) => value_with_span
1466 .value
1467 .clone()
1468 .into_string()
1469 .map(|s| s.to_ascii_lowercase()),
1470 SqlExpr::Identifier(ident) => Some(ident.value.to_ascii_lowercase()),
1471 _ => None,
1472 };
1473
1474 if !matches!(normalized.as_deref(), Some("nulls_first" | "nulls_last")) {
1475 return Err(Error::InvalidArgumentError(format!(
1476 "unsupported value for SET default_null_order: {value_expr:?}"
1477 )));
1478 }
1479
1480 let use_nulls_first = matches!(normalized.as_deref(), Some("nulls_first"));
1481 self.default_nulls_first
1482 .store(use_nulls_first, AtomicOrdering::Relaxed);
1483
1484 Ok(RuntimeStatementResult::NoOp)
1485 }
1486 "immediate_transaction_mode" => {
1487 if values.len() != 1 {
1488 return Err(Error::InvalidArgumentError(
1489 "SET immediate_transaction_mode expects exactly one value".into(),
1490 ));
1491 }
1492 let normalized = values[0].to_string().to_ascii_lowercase();
1493 let enabled = match normalized.as_str() {
1494 "true" | "on" | "1" => true,
1495 "false" | "off" | "0" => false,
1496 _ => {
1497 return Err(Error::InvalidArgumentError(format!(
1498 "unsupported value for SET immediate_transaction_mode: {}",
1499 values[0]
1500 )));
1501 }
1502 };
1503 if !enabled {
1504 tracing::warn!(
1505 "SET immediate_transaction_mode=false has no effect; continuing with auto mode"
1506 );
1507 }
1508 Ok(RuntimeStatementResult::NoOp)
1509 }
1510 _ => Err(Error::InvalidArgumentError(format!(
1511 "unsupported SET variable: {variable_name_raw}"
1512 ))),
1513 }
1514 }
1515 other => Err(Error::InvalidArgumentError(format!(
1516 "unsupported SQL SET statement: {other:?}",
1517 ))),
1518 }
1519 }
1520
1521 fn handle_pragma(
1522 &self,
1523 name: ObjectName,
1524 value: Option<Value>,
1525 is_eq: bool,
1526 ) -> SqlResult<RuntimeStatementResult<P>> {
1527 let (display, canonical) = canonical_object_name(&name)?;
1528 if value.is_some() || is_eq {
1529 return Err(Error::InvalidArgumentError(format!(
1530 "PRAGMA '{display}' does not accept a value"
1531 )));
1532 }
1533
1534 match canonical.as_str() {
1535 "enable_verification" | "disable_verification" => Ok(RuntimeStatementResult::NoOp),
1536 _ => Err(Error::InvalidArgumentError(format!(
1537 "unsupported PRAGMA '{}'",
1538 display
1539 ))),
1540 }
1541 }
1542}
1543
1544fn canonical_object_name(name: &ObjectName) -> SqlResult<(String, String)> {
1545 if name.0.is_empty() {
1546 return Err(Error::InvalidArgumentError(
1547 "object name must not be empty".into(),
1548 ));
1549 }
1550 let mut parts: Vec<String> = Vec::with_capacity(name.0.len());
1551 for part in &name.0 {
1552 let ident = match part {
1553 ObjectNamePart::Identifier(ident) => ident,
1554 _ => {
1555 return Err(Error::InvalidArgumentError(
1556 "object names using functions are not supported".into(),
1557 ));
1558 }
1559 };
1560 parts.push(ident.value.clone());
1561 }
1562 let display = parts.join(".");
1563 let canonical = display.to_ascii_lowercase();
1564 Ok((display, canonical))
1565}
1566
1567fn validate_create_table_common(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
1568 if stmt.clone.is_some() || stmt.like.is_some() {
1569 return Err(Error::InvalidArgumentError(
1570 "CREATE TABLE LIKE/CLONE is not supported".into(),
1571 ));
1572 }
1573 if stmt.or_replace && stmt.if_not_exists {
1574 return Err(Error::InvalidArgumentError(
1575 "CREATE TABLE cannot combine OR REPLACE with IF NOT EXISTS".into(),
1576 ));
1577 }
1578 if !stmt.constraints.is_empty() {
1579 return Err(Error::InvalidArgumentError(
1580 "table-level constraints are not supported".into(),
1581 ));
1582 }
1583 Ok(())
1584}
1585
1586fn validate_create_table_definition(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
1587 for column in &stmt.columns {
1588 for ColumnOptionDef { option, .. } in &column.options {
1589 match option {
1590 ColumnOption::Null | ColumnOption::NotNull | ColumnOption::Unique { .. } => {}
1591 ColumnOption::Default(_) => {
1592 return Err(Error::InvalidArgumentError(format!(
1593 "DEFAULT values are not supported for column '{}'",
1594 column.name
1595 )));
1596 }
1597 other => {
1598 return Err(Error::InvalidArgumentError(format!(
1599 "unsupported column option {:?} on '{}'",
1600 other, column.name
1601 )));
1602 }
1603 }
1604 }
1605 }
1606 Ok(())
1607}
1608
1609fn validate_create_table_as(stmt: &sqlparser::ast::CreateTable) -> SqlResult<()> {
1610 if !stmt.columns.is_empty() {
1611 return Err(Error::InvalidArgumentError(
1612 "CREATE TABLE AS SELECT does not support column definitions yet".into(),
1613 ));
1614 }
1615 Ok(())
1616}
1617
1618fn validate_simple_query(query: &Query) -> SqlResult<()> {
1619 if query.with.is_some() {
1620 return Err(Error::InvalidArgumentError(
1621 "WITH clauses are not supported".into(),
1622 ));
1623 }
1624 if let Some(limit_clause) = &query.limit_clause {
1625 match limit_clause {
1626 LimitClause::LimitOffset {
1627 offset: Some(_), ..
1628 }
1629 | LimitClause::OffsetCommaLimit { .. } => {
1630 return Err(Error::InvalidArgumentError(
1631 "OFFSET clauses are not supported".into(),
1632 ));
1633 }
1634 LimitClause::LimitOffset { limit_by, .. } if !limit_by.is_empty() => {
1635 return Err(Error::InvalidArgumentError(
1636 "LIMIT BY clauses are not supported".into(),
1637 ));
1638 }
1639 _ => {}
1640 }
1641 }
1642 if query.fetch.is_some() {
1643 return Err(Error::InvalidArgumentError(
1644 "FETCH clauses are not supported".into(),
1645 ));
1646 }
1647 Ok(())
1648}
1649
1650fn resolve_column_name(expr: &SqlExpr) -> SqlResult<String> {
1651 match expr {
1652 SqlExpr::Identifier(ident) => Ok(ident.value.clone()),
1653 SqlExpr::CompoundIdentifier(parts) => {
1654 if let Some(last) = parts.last() {
1655 Ok(last.value.clone())
1656 } else {
1657 Err(Error::InvalidArgumentError(
1658 "empty column identifier".into(),
1659 ))
1660 }
1661 }
1662 _ => Err(Error::InvalidArgumentError(
1663 "aggregate arguments must be plain column identifiers".into(),
1664 )),
1665 }
1666}
1667
1668#[allow(dead_code)] fn expr_contains_aggregate(expr: &llkv_expr::expr::ScalarExpr<String>) -> bool {
1672 match expr {
1673 llkv_expr::expr::ScalarExpr::Aggregate(_) => true,
1674 llkv_expr::expr::ScalarExpr::Binary { left, right, .. } => {
1675 expr_contains_aggregate(left) || expr_contains_aggregate(right)
1676 }
1677 llkv_expr::expr::ScalarExpr::Column(_) | llkv_expr::expr::ScalarExpr::Literal(_) => false,
1678 }
1679}
1680
1681fn try_parse_aggregate_function(
1682 func: &sqlparser::ast::Function,
1683) -> SqlResult<Option<llkv_expr::expr::AggregateCall<String>>> {
1684 use sqlparser::ast::{FunctionArg, FunctionArgExpr, FunctionArguments, ObjectNamePart};
1685
1686 if func.uses_odbc_syntax {
1687 return Ok(None);
1688 }
1689 if !matches!(func.parameters, FunctionArguments::None) {
1690 return Ok(None);
1691 }
1692 if func.filter.is_some()
1693 || func.null_treatment.is_some()
1694 || func.over.is_some()
1695 || !func.within_group.is_empty()
1696 {
1697 return Ok(None);
1698 }
1699
1700 let func_name = if func.name.0.len() == 1 {
1701 match &func.name.0[0] {
1702 ObjectNamePart::Identifier(ident) => ident.value.to_ascii_lowercase(),
1703 _ => return Ok(None),
1704 }
1705 } else {
1706 return Ok(None);
1707 };
1708
1709 let args_slice: &[FunctionArg] = match &func.args {
1710 FunctionArguments::List(list) => {
1711 if list.duplicate_treatment.is_some() || !list.clauses.is_empty() {
1712 return Ok(None);
1713 }
1714 &list.args
1715 }
1716 FunctionArguments::None => &[],
1717 FunctionArguments::Subquery(_) => return Ok(None),
1718 };
1719
1720 let agg_call = match func_name.as_str() {
1721 "count" => {
1722 if args_slice.len() != 1 {
1723 return Err(Error::InvalidArgumentError(
1724 "COUNT accepts exactly one argument".into(),
1725 ));
1726 }
1727 match &args_slice[0] {
1728 FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
1729 llkv_expr::expr::AggregateCall::CountStar
1730 }
1731 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg_expr)) => {
1732 let column = resolve_column_name(arg_expr)?;
1733 llkv_expr::expr::AggregateCall::Count(column)
1734 }
1735 _ => {
1736 return Err(Error::InvalidArgumentError(
1737 "unsupported COUNT argument".into(),
1738 ));
1739 }
1740 }
1741 }
1742 "sum" => {
1743 if args_slice.len() != 1 {
1744 return Err(Error::InvalidArgumentError(
1745 "SUM accepts exactly one argument".into(),
1746 ));
1747 }
1748 let arg_expr = match &args_slice[0] {
1749 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
1750 _ => {
1751 return Err(Error::InvalidArgumentError(
1752 "SUM requires a column argument".into(),
1753 ));
1754 }
1755 };
1756
1757 if let Some(column) = parse_count_nulls_case(arg_expr)? {
1759 llkv_expr::expr::AggregateCall::CountNulls(column)
1760 } else {
1761 let column = resolve_column_name(arg_expr)?;
1762 llkv_expr::expr::AggregateCall::Sum(column)
1763 }
1764 }
1765 "min" => {
1766 if args_slice.len() != 1 {
1767 return Err(Error::InvalidArgumentError(
1768 "MIN accepts exactly one argument".into(),
1769 ));
1770 }
1771 let arg_expr = match &args_slice[0] {
1772 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
1773 _ => {
1774 return Err(Error::InvalidArgumentError(
1775 "MIN requires a column argument".into(),
1776 ));
1777 }
1778 };
1779 let column = resolve_column_name(arg_expr)?;
1780 llkv_expr::expr::AggregateCall::Min(column)
1781 }
1782 "max" => {
1783 if args_slice.len() != 1 {
1784 return Err(Error::InvalidArgumentError(
1785 "MAX accepts exactly one argument".into(),
1786 ));
1787 }
1788 let arg_expr = match &args_slice[0] {
1789 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => expr,
1790 _ => {
1791 return Err(Error::InvalidArgumentError(
1792 "MAX requires a column argument".into(),
1793 ));
1794 }
1795 };
1796 let column = resolve_column_name(arg_expr)?;
1797 llkv_expr::expr::AggregateCall::Max(column)
1798 }
1799 _ => return Ok(None),
1800 };
1801
1802 Ok(Some(agg_call))
1803}
1804
1805fn parse_count_nulls_case(expr: &SqlExpr) -> SqlResult<Option<String>> {
1806 let SqlExpr::Case {
1807 operand,
1808 conditions,
1809 else_result,
1810 ..
1811 } = expr
1812 else {
1813 return Ok(None);
1814 };
1815
1816 if operand.is_some() || conditions.len() != 1 {
1817 return Ok(None);
1818 }
1819
1820 let case_when = &conditions[0];
1821 if !is_integer_literal(&case_when.result, 1) {
1822 return Ok(None);
1823 }
1824
1825 let else_expr = match else_result {
1826 Some(expr) => expr.as_ref(),
1827 None => return Ok(None),
1828 };
1829 if !is_integer_literal(else_expr, 0) {
1830 return Ok(None);
1831 }
1832
1833 let inner = match &case_when.condition {
1834 SqlExpr::IsNull(inner) => inner.as_ref(),
1835 _ => return Ok(None),
1836 };
1837
1838 resolve_column_name(inner).map(Some)
1839}
1840
1841fn is_integer_literal(expr: &SqlExpr, expected: i64) -> bool {
1842 match expr {
1843 SqlExpr::Value(ValueWithSpan {
1844 value: Value::Number(text, _),
1845 ..
1846 }) => text.parse::<i64>() == Ok(expected),
1847 _ => false,
1848 }
1849}
1850
1851fn translate_condition(expr: &SqlExpr) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
1852 match expr {
1853 SqlExpr::BinaryOp { left, op, right } => match op {
1854 BinaryOperator::And => Ok(llkv_expr::expr::Expr::And(vec![
1855 translate_condition(left)?,
1856 translate_condition(right)?,
1857 ])),
1858 BinaryOperator::Or => Ok(llkv_expr::expr::Expr::Or(vec![
1859 translate_condition(left)?,
1860 translate_condition(right)?,
1861 ])),
1862 BinaryOperator::Eq
1863 | BinaryOperator::NotEq
1864 | BinaryOperator::Lt
1865 | BinaryOperator::LtEq
1866 | BinaryOperator::Gt
1867 | BinaryOperator::GtEq => translate_comparison(left, op.clone(), right),
1868 other => Err(Error::InvalidArgumentError(format!(
1869 "unsupported binary operator in WHERE clause: {other:?}"
1870 ))),
1871 },
1872 SqlExpr::UnaryOp {
1873 op: UnaryOperator::Not,
1874 expr,
1875 } => Ok(llkv_expr::expr::Expr::not(translate_condition(expr)?)),
1876 SqlExpr::Nested(inner) => translate_condition(inner),
1877 other => Err(Error::InvalidArgumentError(format!(
1878 "unsupported WHERE clause: {other:?}"
1879 ))),
1880 }
1881}
1882
1883fn translate_comparison(
1884 left: &SqlExpr,
1885 op: BinaryOperator,
1886 right: &SqlExpr,
1887) -> SqlResult<llkv_expr::expr::Expr<'static, String>> {
1888 let left_scalar = translate_scalar(left)?;
1889 let right_scalar = translate_scalar(right)?;
1890 let compare_op = match op {
1891 BinaryOperator::Eq => llkv_expr::expr::CompareOp::Eq,
1892 BinaryOperator::NotEq => llkv_expr::expr::CompareOp::NotEq,
1893 BinaryOperator::Lt => llkv_expr::expr::CompareOp::Lt,
1894 BinaryOperator::LtEq => llkv_expr::expr::CompareOp::LtEq,
1895 BinaryOperator::Gt => llkv_expr::expr::CompareOp::Gt,
1896 BinaryOperator::GtEq => llkv_expr::expr::CompareOp::GtEq,
1897 other => {
1898 return Err(Error::InvalidArgumentError(format!(
1899 "unsupported comparison operator: {other:?}"
1900 )));
1901 }
1902 };
1903
1904 if let (
1905 llkv_expr::expr::ScalarExpr::Column(column),
1906 llkv_expr::expr::ScalarExpr::Literal(literal),
1907 ) = (&left_scalar, &right_scalar)
1908 && let Some(op) = compare_op_to_filter_operator(compare_op, literal)
1909 {
1910 return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
1911 field_id: column.clone(),
1912 op,
1913 }));
1914 }
1915
1916 if let (
1917 llkv_expr::expr::ScalarExpr::Literal(literal),
1918 llkv_expr::expr::ScalarExpr::Column(column),
1919 ) = (&left_scalar, &right_scalar)
1920 && let Some(flipped) = flip_compare_op(compare_op)
1921 && let Some(op) = compare_op_to_filter_operator(flipped, literal)
1922 {
1923 return Ok(llkv_expr::expr::Expr::Pred(llkv_expr::expr::Filter {
1924 field_id: column.clone(),
1925 op,
1926 }));
1927 }
1928
1929 Ok(llkv_expr::expr::Expr::Compare {
1930 left: left_scalar,
1931 op: compare_op,
1932 right: right_scalar,
1933 })
1934}
1935
1936fn compare_op_to_filter_operator(
1937 op: llkv_expr::expr::CompareOp,
1938 literal: &Literal,
1939) -> Option<llkv_expr::expr::Operator<'static>> {
1940 let lit = literal.clone();
1941 match op {
1942 llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::Operator::Equals(lit)),
1943 llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::Operator::LessThan(lit)),
1944 llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::Operator::LessThanOrEquals(lit)),
1945 llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::Operator::GreaterThan(lit)),
1946 llkv_expr::expr::CompareOp::GtEq => {
1947 Some(llkv_expr::expr::Operator::GreaterThanOrEquals(lit))
1948 }
1949 llkv_expr::expr::CompareOp::NotEq => None,
1950 }
1951}
1952
1953fn flip_compare_op(op: llkv_expr::expr::CompareOp) -> Option<llkv_expr::expr::CompareOp> {
1954 match op {
1955 llkv_expr::expr::CompareOp::Eq => Some(llkv_expr::expr::CompareOp::Eq),
1956 llkv_expr::expr::CompareOp::Lt => Some(llkv_expr::expr::CompareOp::Gt),
1957 llkv_expr::expr::CompareOp::LtEq => Some(llkv_expr::expr::CompareOp::GtEq),
1958 llkv_expr::expr::CompareOp::Gt => Some(llkv_expr::expr::CompareOp::Lt),
1959 llkv_expr::expr::CompareOp::GtEq => Some(llkv_expr::expr::CompareOp::LtEq),
1960 llkv_expr::expr::CompareOp::NotEq => None,
1961 }
1962}
1963
1964fn translate_scalar(expr: &SqlExpr) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
1965 match expr {
1966 SqlExpr::Identifier(ident) => Ok(llkv_expr::expr::ScalarExpr::column(ident.value.clone())),
1967 SqlExpr::CompoundIdentifier(idents) => {
1968 if let Some(last) = idents.last() {
1969 translate_scalar(&SqlExpr::Identifier(last.clone()))
1970 } else {
1971 Err(Error::InvalidArgumentError(
1972 "invalid compound identifier".into(),
1973 ))
1974 }
1975 }
1976 SqlExpr::Value(value) => literal_from_value(value),
1977 SqlExpr::BinaryOp { left, op, right } => {
1978 let left_expr = translate_scalar(left)?;
1979 let right_expr = translate_scalar(right)?;
1980 let op = match op {
1981 BinaryOperator::Plus => llkv_expr::expr::BinaryOp::Add,
1982 BinaryOperator::Minus => llkv_expr::expr::BinaryOp::Subtract,
1983 BinaryOperator::Multiply => llkv_expr::expr::BinaryOp::Multiply,
1984 BinaryOperator::Divide => llkv_expr::expr::BinaryOp::Divide,
1985 BinaryOperator::Modulo => llkv_expr::expr::BinaryOp::Modulo,
1986 other => {
1987 return Err(Error::InvalidArgumentError(format!(
1988 "unsupported scalar binary operator: {other:?}"
1989 )));
1990 }
1991 };
1992 Ok(llkv_expr::expr::ScalarExpr::binary(
1993 left_expr, op, right_expr,
1994 ))
1995 }
1996 SqlExpr::UnaryOp {
1997 op: UnaryOperator::Minus,
1998 expr,
1999 } => match translate_scalar(expr)? {
2000 llkv_expr::expr::ScalarExpr::Literal(lit) => match lit {
2001 Literal::Integer(v) => {
2002 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(-v)))
2003 }
2004 Literal::Float(v) => Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(-v))),
2005 Literal::String(_) => Err(Error::InvalidArgumentError(
2006 "cannot negate string literal".into(),
2007 )),
2008 },
2009 _ => Err(Error::InvalidArgumentError(
2010 "cannot negate non-literal expression".into(),
2011 )),
2012 },
2013 SqlExpr::UnaryOp {
2014 op: UnaryOperator::Plus,
2015 expr,
2016 } => translate_scalar(expr),
2017 SqlExpr::Nested(inner) => translate_scalar(inner),
2018 SqlExpr::Function(func) => {
2019 if let Some(agg_call) = try_parse_aggregate_function(func)? {
2021 Ok(llkv_expr::expr::ScalarExpr::aggregate(agg_call))
2022 } else {
2023 Err(Error::InvalidArgumentError(format!(
2024 "unsupported function in scalar expression: {:?}",
2025 func.name
2026 )))
2027 }
2028 }
2029 other => Err(Error::InvalidArgumentError(format!(
2030 "unsupported scalar expression: {other:?}"
2031 ))),
2032 }
2033}
2034
2035fn literal_from_value(value: &ValueWithSpan) -> SqlResult<llkv_expr::expr::ScalarExpr<String>> {
2036 match &value.value {
2037 Value::Number(text, _) => {
2038 if text.contains(['.', 'e', 'E']) {
2039 let parsed = text.parse::<f64>().map_err(|err| {
2040 Error::InvalidArgumentError(format!("invalid float literal: {err}"))
2041 })?;
2042 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Float(parsed)))
2043 } else {
2044 let parsed = text.parse::<i128>().map_err(|err| {
2045 Error::InvalidArgumentError(format!("invalid integer literal: {err}"))
2046 })?;
2047 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::Integer(
2048 parsed,
2049 )))
2050 }
2051 }
2052 Value::Boolean(_) => Err(Error::InvalidArgumentError(
2053 "BOOLEAN literals are not supported yet".into(),
2054 )),
2055 Value::Null => Err(Error::InvalidArgumentError(
2056 "NULL literal is not supported in comparisons; use IS NULL".into(),
2057 )),
2058 other => {
2059 if let Some(text) = other.clone().into_string() {
2060 Ok(llkv_expr::expr::ScalarExpr::literal(Literal::String(text)))
2061 } else {
2062 Err(Error::InvalidArgumentError(format!(
2063 "unsupported literal: {other:?}"
2064 )))
2065 }
2066 }
2067 }
2068}
2069
2070fn resolve_assignment_column_name(target: &AssignmentTarget) -> SqlResult<String> {
2071 match target {
2072 AssignmentTarget::ColumnName(name) => {
2073 if name.0.len() != 1 {
2074 return Err(Error::InvalidArgumentError(
2075 "qualified column names in UPDATE assignments are not supported yet".into(),
2076 ));
2077 }
2078 match &name.0[0] {
2079 ObjectNamePart::Identifier(ident) => Ok(ident.value.clone()),
2080 other => Err(Error::InvalidArgumentError(format!(
2081 "unsupported column reference in UPDATE assignment: {other:?}"
2082 ))),
2083 }
2084 }
2085 AssignmentTarget::Tuple(_) => Err(Error::InvalidArgumentError(
2086 "tuple assignments are not supported yet".into(),
2087 )),
2088 }
2089}
2090
2091fn arrow_type_from_sql(data_type: &SqlDataType) -> SqlResult<arrow::datatypes::DataType> {
2092 use arrow::datatypes::DataType;
2093 match data_type {
2094 SqlDataType::Int(_)
2095 | SqlDataType::Integer(_)
2096 | SqlDataType::BigInt(_)
2097 | SqlDataType::SmallInt(_)
2098 | SqlDataType::TinyInt(_) => Ok(DataType::Int64),
2099 SqlDataType::Float(_)
2100 | SqlDataType::Real
2101 | SqlDataType::Double(_)
2102 | SqlDataType::DoublePrecision => Ok(DataType::Float64),
2103 SqlDataType::Text
2104 | SqlDataType::String(_)
2105 | SqlDataType::Varchar(_)
2106 | SqlDataType::Char(_)
2107 | SqlDataType::Uuid => Ok(DataType::Utf8),
2108 SqlDataType::Date => Ok(DataType::Date32),
2109 SqlDataType::Decimal(_) | SqlDataType::Numeric(_) => Ok(DataType::Float64),
2110 SqlDataType::Boolean => Err(Error::InvalidArgumentError(
2111 "BOOLEAN columns are not supported yet".into(),
2112 )),
2113 other => Err(Error::InvalidArgumentError(format!(
2114 "unsupported SQL data type: {other:?}"
2115 ))),
2116 }
2117}
2118
2119fn extract_constant_select_rows(select: &Select) -> SqlResult<Option<Vec<Vec<PlanValue>>>> {
2120 if !select.from.is_empty() {
2121 return Ok(None);
2122 }
2123
2124 if select.selection.is_some()
2125 || select.having.is_some()
2126 || !select.named_window.is_empty()
2127 || select.qualify.is_some()
2128 || select.distinct.is_some()
2129 || select.top.is_some()
2130 || select.into.is_some()
2131 || select.prewhere.is_some()
2132 || !select.lateral_views.is_empty()
2133 || select.value_table_mode.is_some()
2134 || !group_by_is_empty(&select.group_by)
2135 {
2136 return Err(Error::InvalidArgumentError(
2137 "constant SELECT statements do not support advanced clauses".into(),
2138 ));
2139 }
2140
2141 if select.projection.is_empty() {
2142 return Err(Error::InvalidArgumentError(
2143 "constant SELECT requires at least one projection".into(),
2144 ));
2145 }
2146
2147 let mut row: Vec<PlanValue> = Vec::with_capacity(select.projection.len());
2148 for item in &select.projection {
2149 let expr = match item {
2150 SelectItem::UnnamedExpr(expr) => expr,
2151 SelectItem::ExprWithAlias { expr, .. } => expr,
2152 other => {
2153 return Err(Error::InvalidArgumentError(format!(
2154 "unsupported projection in constant SELECT: {other:?}"
2155 )));
2156 }
2157 };
2158
2159 let value = SqlValue::try_from_expr(expr)?;
2160 row.push(PlanValue::from(value));
2161 }
2162
2163 Ok(Some(vec![row]))
2164}
2165
2166fn extract_single_table(from: &[TableWithJoins]) -> SqlResult<(String, String)> {
2167 if from.len() != 1 {
2168 return Err(Error::InvalidArgumentError(
2169 "queries over multiple tables are not supported yet".into(),
2170 ));
2171 }
2172 let item = &from[0];
2173 if !item.joins.is_empty() {
2174 return Err(Error::InvalidArgumentError(
2175 "JOIN clauses are not supported yet".into(),
2176 ));
2177 }
2178 match &item.relation {
2179 TableFactor::Table { name, .. } => canonical_object_name(name),
2180 _ => Err(Error::InvalidArgumentError(
2181 "queries require a plain table name".into(),
2182 )),
2183 }
2184}
2185
2186fn group_by_is_empty(expr: &GroupByExpr) -> bool {
2187 matches!(
2188 expr,
2189 GroupByExpr::Expressions(exprs, modifiers)
2190 if exprs.is_empty() && modifiers.is_empty()
2191 )
2192}
2193
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int64Array, StringArray};
    use arrow::record_batch::RecordBatch;
    use llkv_storage::pager::MemPager;

    /// Flattens the first column of every batch into optional owned strings,
    /// preserving batch and row order. Null slots become `None`.
    ///
    /// Panics if the first column of a batch is not a `StringArray`.
    fn extract_string_options(batches: &[RecordBatch]) -> Vec<Option<String>> {
        let mut values: Vec<Option<String>> = Vec::new();
        for batch in batches {
            let column = batch
                .column(0)
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("string column");
            values.extend(
                (0..column.len())
                    .map(|idx| (!column.is_null(idx)).then(|| column.value(idx).to_string())),
            );
        }
        values
    }

    /// CREATE TABLE, INSERT and a filtered SELECT work end to end.
    #[test]
    fn create_insert_select_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        let result = engine
            .execute("CREATE TABLE people (id INT NOT NULL, name TEXT NOT NULL)")
            .expect("create table");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::CreateTable { .. }
        ));

        let result = engine
            .execute("INSERT INTO people (id, name) VALUES (1, 'alice'), (2, 'bob')")
            .expect("insert rows");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 2,
                ..
            }
        ));

        let mut result = engine
            .execute("SELECT name FROM people WHERE id = 2")
            .expect("select rows");
        let batches = match result.remove(0) {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };
        assert_eq!(batches.len(), 1);
        let column = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string column");
        assert_eq!(column.len(), 1);
        assert_eq!(column.value(0), "bob");
    }

    /// `INSERT ... SELECT <constant>` accepts both a literal and a typed NULL,
    /// and the NULL reads back as a null slot.
    #[test]
    fn insert_select_constant_including_null() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        engine
            .execute("CREATE TABLE integers(i INTEGER)")
            .expect("create table");

        let result = engine
            .execute("INSERT INTO integers SELECT 42")
            .expect("insert literal");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 1,
                ..
            }
        ));

        let result = engine
            .execute("INSERT INTO integers SELECT CAST(NULL AS VARCHAR)")
            .expect("insert null literal");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Insert {
                rows_inserted: 1,
                ..
            }
        ));

        let mut result = engine
            .execute("SELECT * FROM integers")
            .expect("select rows");
        let batches = match result.remove(0) {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };

        let mut values: Vec<Option<i64>> = Vec::new();
        for batch in &batches {
            let column = batch
                .column(0)
                .as_any()
                .downcast_ref::<Int64Array>()
                .expect("int column");
            values.extend(
                (0..column.len()).map(|idx| (!column.is_null(idx)).then(|| column.value(idx))),
            );
        }

        assert_eq!(values, vec![Some(42), None]);
    }

    /// UPDATE with a WHERE clause reports exactly one updated row and leaves
    /// the remaining rows (including the NULL) as inserted.
    #[test]
    fn update_with_where_clause_filters_rows() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        engine
            .execute("SET default_null_order='nulls_first'")
            .expect("set default null order");

        engine
            .execute("CREATE TABLE strings(a VARCHAR)")
            .expect("create table");

        engine
            .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
            .expect("insert seed rows");

        let result = engine
            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
            .expect("update rows");
        assert!(matches!(
            result[0],
            RuntimeStatementResult::Update {
                rows_updated: 1,
                ..
            }
        ));

        let mut result = engine
            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
            .expect("select rows");
        let batches = match result.remove(0) {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };

        let mut values = extract_string_options(&batches);

        // Normalize the order locally (NULLs first, then by numeric value) so
        // the assertion below checks row contents rather than engine ordering.
        // `Option<i64>` orders `None` before any `Some`, and the sort is stable.
        values.sort_by_key(|value| {
            value
                .as_deref()
                .map(|text| text.parse::<i64>().unwrap_or_default())
        });

        assert_eq!(
            values,
            vec![None, Some("4".to_string()), Some("13".to_string())]
        );
    }

    /// ORDER BY places NULLs last by default and first once
    /// `SET default_null_order='nulls_first'` has been executed.
    #[test]
    fn order_by_honors_configured_default_null_order() {
        let pager = Arc::new(MemPager::default());
        let engine = SqlEngine::new(pager);

        engine
            .execute("CREATE TABLE strings(a VARCHAR)")
            .expect("create table");
        engine
            .execute("INSERT INTO strings VALUES ('3'), ('4'), (NULL)")
            .expect("insert values");
        engine
            .execute("UPDATE strings SET a = 13 WHERE a = '3'")
            .expect("update value");

        let mut result = engine
            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
            .expect("select rows");
        let batches = match result.remove(0) {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };

        // Before any SET statement the NULL row sorts last.
        let values = extract_string_options(&batches);
        assert_eq!(
            values,
            vec![Some("4".to_string()), Some("13".to_string()), None]
        );

        assert!(!engine.default_nulls_first_for_tests());

        engine
            .execute("SET default_null_order='nulls_first'")
            .expect("set default null order");

        assert!(engine.default_nulls_first_for_tests());

        let mut result = engine
            .execute("SELECT * FROM strings ORDER BY cast(a AS INTEGER)")
            .expect("select rows");
        let batches = match result.remove(0) {
            RuntimeStatementResult::Select { execution, .. } => {
                execution.collect().expect("collect batches")
            }
            _ => panic!("expected select result"),
        };

        // After the SET statement the same query returns the NULL row first.
        let values = extract_string_options(&batches);
        assert_eq!(
            values,
            vec![None, Some("4".to_string()), Some("13".to_string())]
        );
    }
}