use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;

use crate::parser::{
    CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
    LexOrdering, Statement as DFStatement,
};
use crate::planner::{
    object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
};
use crate::utils::normalize_ident;

use arrow::datatypes::{DataType, Fields};
use datafusion_common::error::_plan_err;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
    exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
    unqualified_field_not_found, Column, Constraint, Constraints, DFSchema, DFSchemaRef,
    DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, TableReference,
    ToDFSchema,
};
use datafusion_expr::dml::{CopyTo, InsertOp};
use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
use datafusion_expr::logical_plan::builder::project;
use datafusion_expr::logical_plan::DdlStatement;
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{
    cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
    CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
    CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
    DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
    EmptyRelation, Execute, Explain, ExplainFormat, Expr, ExprSchemable, Filter,
    LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, SetVariable,
    SortExpr, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
    TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
    Volatility, WriteOp,
};
use sqlparser::ast::{
    self, BeginTransactionKind, IndexColumn, IndexType, NullsDistinctOption, OrderByExpr,
    OrderByOptions, Set, ShowStatementIn, ShowStatementOptions, SqliteOnConflict,
    TableObject, UpdateTableFromKind, ValueWithSpan,
};
use sqlparser::ast::{
    Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
    CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
    ObjectName, ObjectType, Query, SchemaName, SetExpr, ShowCreateObject,
    ShowStatementFilter, Statement, TableConstraint, TableFactor, TableWithJoins,
    TransactionMode, UnaryOperator, Value,
};
use sqlparser::parser::ParserError::ParserError;

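/// Converts an [`Ident`] to a `String`, applying the crate's identifier normalization.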
fn ident_to_string(ident: &Ident) -> String {
    normalize_ident(ident.to_owned())
}

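/// Renders an [`ObjectName`] as a dot-separated string, normalizing each identifier
/// part; non-identifier parts render as empty strings.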
fn object_name_to_string(object_name: &ObjectName) -> String {
    object_name
        .0
        .iter()
        .map(|object_name_part| {
            object_name_part
                .as_ident()
                .map_or_else(String::new, ident_to_string)
        })
        .collect::<Vec<String>>()
        .join(".")
}

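/// Renders a [`SchemaName`] (simple, unnamed-authorization, or named-authorization
/// form) as a string.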
fn get_schema_name(schema_name: &SchemaName) -> String {
    match schema_name {
        SchemaName::Simple(schema_name) => object_name_to_string(schema_name),
        SchemaName::UnnamedAuthorization(auth) => ident_to_string(auth),
        SchemaName::NamedAuthorization(schema_name, auth) => format!(
            "{}.{}",
            object_name_to_string(schema_name),
            ident_to_string(auth)
        ),
    }
}

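/// Collects constraints declared inline on column definitions (UNIQUE, PRIMARY KEY,
/// FOREIGN KEY, and CHECK) and converts them to equivalent [`TableConstraint`]s;
/// all other column options are ignored here.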
fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
    let mut constraints = vec![];
    for column in columns {
        for ast::ColumnOptionDef { name, option } in &column.options {
            match option {
                ast::ColumnOption::Unique {
                    is_primary: false,
                    characteristics,
                } => constraints.push(TableConstraint::Unique {
                    name: name.clone(),
                    columns: vec![IndexColumn {
                        column: OrderByExpr {
                            expr: SQLExpr::Identifier(column.name.clone()),
                            options: OrderByOptions {
                                asc: None,
                                nulls_first: None,
                            },
                            with_fill: None,
                        },
                        operator_class: None,
                    }],
                    characteristics: *characteristics,
                    index_name: None,
                    index_type_display: ast::KeyOrIndexDisplay::None,
                    index_type: None,
                    index_options: vec![],
                    nulls_distinct: NullsDistinctOption::None,
                }),
                ast::ColumnOption::Unique {
                    is_primary: true,
                    characteristics,
                } => constraints.push(TableConstraint::PrimaryKey {
                    name: name.clone(),
                    columns: vec![IndexColumn {
                        column: OrderByExpr {
                            expr: SQLExpr::Identifier(column.name.clone()),
                            options: OrderByOptions {
                                asc: None,
                                nulls_first: None,
                            },
                            with_fill: None,
                        },
                        operator_class: None,
                    }],
                    characteristics: *characteristics,
                    index_name: None,
                    index_type: None,
                    index_options: vec![],
                }),
                ast::ColumnOption::ForeignKey {
                    foreign_table,
                    referred_columns,
                    on_delete,
                    on_update,
                    characteristics,
                } => constraints.push(TableConstraint::ForeignKey {
                    name: name.clone(),
                    columns: vec![],
                    foreign_table: foreign_table.clone(),
                    referred_columns: referred_columns.to_vec(),
                    on_delete: *on_delete,
                    on_update: *on_update,
                    characteristics: *characteristics,
                    index_name: None,
                }),
                ast::ColumnOption::Check(expr) => {
                    constraints.push(TableConstraint::Check {
                        name: name.clone(),
                        expr: Box::new(expr.clone()),
                        enforced: None,
                    })
                }
                ast::ColumnOption::Default(_)
                | ast::ColumnOption::Null
                | ast::ColumnOption::NotNull
                | ast::ColumnOption::DialectSpecific(_)
                | ast::ColumnOption::CharacterSet(_)
                | ast::ColumnOption::Generated { .. }
                | ast::ColumnOption::Comment(_)
                | ast::ColumnOption::Options(_)
                | ast::ColumnOption::OnUpdate(_)
                | ast::ColumnOption::Materialized(_)
                | ast::ColumnOption::Ephemeral(_)
                | ast::ColumnOption::Identity(_)
                | ast::ColumnOption::OnConflict(_)
                | ast::ColumnOption::Policy(_)
                | ast::ColumnOption::Tags(_)
                | ast::ColumnOption::Alias(_)
                | ast::ColumnOption::Srid(_)
                | ast::ColumnOption::Collation(_) => {}
            }
        }
    }
    constraints
}

impl<S: ContextProvider> SqlToRel<'_, S> {
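    /// Generates a logical plan from a DataFusion-extended SQL statement
    /// ([`DFStatement`]).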
    pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
        match statement {
            DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
            DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
            DFStatement::CopyTo(s) => self.copy_to_plan(s),
            DFStatement::Explain(ExplainStatement {
                verbose,
                analyze,
                format,
                statement,
            }) => self.explain_to_plan(verbose, analyze, format, *statement),
        }
    }

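    /// Generates a logical plan from an SQL statement using a fresh
    /// [`PlannerContext`].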
    pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
        self.sql_statement_to_plan_with_context_impl(
            statement,
            &mut PlannerContext::new(),
        )
    }

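    /// Generates a logical plan from an SQL statement, reusing the provided
    /// [`PlannerContext`].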
    pub fn sql_statement_to_plan_with_context(
        &self,
        statement: Statement,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        self.sql_statement_to_plan_with_context_impl(statement, planner_context)
    }

    fn sql_statement_to_plan_with_context_impl(
        &self,
        statement: Statement,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        match statement {
            Statement::ExplainTable {
                describe_alias: DescribeAlias::Describe | DescribeAlias::Desc,
                table_name,
                ..
            } => self.describe_table_to_plan(table_name),
            Statement::Explain {
                verbose,
                statement,
                analyze,
                format,
                describe_alias: _,
                ..
            } => {
                let format = format.map(|format| format.to_string());
                let statement = DFStatement::Statement(statement);
                self.explain_to_plan(verbose, analyze, format, statement)
            }
            Statement::Query(query) => self.query_to_plan(*query, planner_context),
            Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
            Statement::Set(statement) => self.set_statement_to_plan(statement),
260 Statement::CreateTable(CreateTable {
261 temporary,
262 external,
263 global,
264 transient,
265 volatile,
266 hive_distribution,
267 hive_formats,
268 file_format,
269 location,
270 query,
271 name,
272 columns,
273 constraints,
274 if_not_exists,
275 or_replace,
276 without_rowid,
277 like,
278 clone,
279 comment,
280 on_commit,
281 on_cluster,
282 primary_key,
283 order_by,
284 partition_by,
285 cluster_by,
286 clustered_by,
287 strict,
288 copy_grants,
289 enable_schema_evolution,
290 change_tracking,
291 data_retention_time_in_days,
292 max_data_extension_time_in_days,
293 default_ddl_collation,
294 with_aggregation_policy,
295 with_row_access_policy,
296 with_tags,
297 iceberg,
298 external_volume,
299 base_location,
300 catalog,
301 catalog_sync,
302 storage_serialization_policy,
303 inherits,
304 table_options: CreateTableOptions::None,
305 }) => {
306 if temporary {
307 return not_impl_err!("Temporary tables not supported")?;
308 }
309 if external {
310 return not_impl_err!("External tables not supported")?;
311 }
312 if global.is_some() {
313 return not_impl_err!("Global tables not supported")?;
314 }
315 if transient {
316 return not_impl_err!("Transient tables not supported")?;
317 }
318 if volatile {
319 return not_impl_err!("Volatile tables not supported")?;
320 }
321 if hive_distribution != ast::HiveDistributionStyle::NONE {
322 return not_impl_err!(
323 "Hive distribution not supported: {hive_distribution:?}"
324 )?;
325 }
326 if !matches!(
327 hive_formats,
328 Some(ast::HiveFormat {
329 row_format: None,
330 serde_properties: None,
331 storage: None,
332 location: None,
333 })
334 ) {
335 return not_impl_err!(
336 "Hive formats not supported: {hive_formats:?}"
337 )?;
338 }
339 if file_format.is_some() {
340 return not_impl_err!("File format not supported")?;
341 }
342 if location.is_some() {
343 return not_impl_err!("Location not supported")?;
344 }
345 if without_rowid {
346 return not_impl_err!("Without rowid not supported")?;
347 }
348 if like.is_some() {
349 return not_impl_err!("Like not supported")?;
350 }
351 if clone.is_some() {
352 return not_impl_err!("Clone not supported")?;
353 }
354 if comment.is_some() {
355 return not_impl_err!("Comment not supported")?;
356 }
357 if on_commit.is_some() {
358 return not_impl_err!("On commit not supported")?;
359 }
360 if on_cluster.is_some() {
361 return not_impl_err!("On cluster not supported")?;
362 }
363 if primary_key.is_some() {
364 return not_impl_err!("Primary key not supported")?;
365 }
366 if order_by.is_some() {
367 return not_impl_err!("Order by not supported")?;
368 }
369 if partition_by.is_some() {
370 return not_impl_err!("Partition by not supported")?;
371 }
372 if cluster_by.is_some() {
373 return not_impl_err!("Cluster by not supported")?;
374 }
375 if clustered_by.is_some() {
376 return not_impl_err!("Clustered by not supported")?;
377 }
378 if strict {
379 return not_impl_err!("Strict not supported")?;
380 }
381 if copy_grants {
382 return not_impl_err!("Copy grants not supported")?;
383 }
384 if enable_schema_evolution.is_some() {
385 return not_impl_err!("Enable schema evolution not supported")?;
386 }
387 if change_tracking.is_some() {
388 return not_impl_err!("Change tracking not supported")?;
389 }
390 if data_retention_time_in_days.is_some() {
391 return not_impl_err!("Data retention time in days not supported")?;
392 }
393 if max_data_extension_time_in_days.is_some() {
394 return not_impl_err!(
395 "Max data extension time in days not supported"
396 )?;
397 }
398 if default_ddl_collation.is_some() {
399 return not_impl_err!("Default DDL collation not supported")?;
400 }
401 if with_aggregation_policy.is_some() {
402 return not_impl_err!("With aggregation policy not supported")?;
403 }
404 if with_row_access_policy.is_some() {
405 return not_impl_err!("With row access policy not supported")?;
406 }
407 if with_tags.is_some() {
408 return not_impl_err!("With tags not supported")?;
409 }
410 if iceberg {
411 return not_impl_err!("Iceberg not supported")?;
412 }
413 if external_volume.is_some() {
414 return not_impl_err!("External volume not supported")?;
415 }
416 if base_location.is_some() {
417 return not_impl_err!("Base location not supported")?;
418 }
419 if catalog.is_some() {
420 return not_impl_err!("Catalog not supported")?;
421 }
422 if catalog_sync.is_some() {
423 return not_impl_err!("Catalog sync not supported")?;
424 }
425 if storage_serialization_policy.is_some() {
426 return not_impl_err!("Storage serialization policy not supported")?;
427 }
428 if inherits.is_some() {
429 return not_impl_err!("Table inheritance not supported")?;
430 }
431
432 let mut all_constraints = constraints;
434 let inline_constraints = calc_inline_constraints_from_columns(&columns);
435 all_constraints.extend(inline_constraints);
436 let column_defaults =
438 self.build_column_defaults(&columns, planner_context)?;
439
440 let has_columns = !columns.is_empty();
441 let schema = self.build_schema(columns)?.to_dfschema_ref()?;
442 if has_columns {
443 planner_context.set_table_schema(Some(Arc::clone(&schema)));
444 }
445
446 match query {
447 Some(query) => {
448 let plan = self.query_to_plan(*query, planner_context)?;
449 let input_schema = plan.schema();
450
451 let plan = if has_columns {
452 if schema.fields().len() != input_schema.fields().len() {
453 return plan_err!(
454 "Mismatch: {} columns specified, but result has {} columns",
455 schema.fields().len(),
456 input_schema.fields().len()
457 );
458 }
459 let input_fields = input_schema.fields();
460 let project_exprs = schema
461 .fields()
462 .iter()
463 .zip(input_fields)
464 .map(|(field, input_field)| {
465 cast(
466 col(input_field.name()),
467 field.data_type().clone(),
468 )
469 .alias(field.name())
470 })
471 .collect::<Vec<_>>();
472
473 LogicalPlanBuilder::from(plan.clone())
474 .project(project_exprs)?
475 .build()?
476 } else {
477 plan
478 };
479
480 let constraints = self.new_constraint_from_table_constraints(
481 &all_constraints,
482 plan.schema(),
483 )?;
484
485 Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
486 CreateMemoryTable {
487 name: self.object_name_to_table_reference(name)?,
488 constraints,
489 input: Arc::new(plan),
490 if_not_exists,
491 or_replace,
492 column_defaults,
493 temporary,
494 },
495 )))
496 }
497
498 None => {
499 let plan = EmptyRelation {
500 produce_one_row: false,
501 schema,
502 };
503 let plan = LogicalPlan::EmptyRelation(plan);
504 let constraints = self.new_constraint_from_table_constraints(
505 &all_constraints,
506 plan.schema(),
507 )?;
508 Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
509 CreateMemoryTable {
510 name: self.object_name_to_table_reference(name)?,
511 constraints,
512 input: Arc::new(plan),
513 if_not_exists,
514 or_replace,
515 column_defaults,
516 temporary,
517 },
518 )))
519 }
520 }
521 }
522
523 Statement::CreateView {
524 or_replace,
525 materialized,
526 name,
527 columns,
528 query,
529 options: CreateTableOptions::None,
530 cluster_by,
531 comment,
532 with_no_schema_binding,
533 if_not_exists,
534 temporary,
535 to,
536 params,
537 or_alter,
538 } => {
539 if materialized {
540 return not_impl_err!("Materialized views not supported")?;
541 }
542 if !cluster_by.is_empty() {
543 return not_impl_err!("Cluster by not supported")?;
544 }
545 if comment.is_some() {
546 return not_impl_err!("Comment not supported")?;
547 }
548 if with_no_schema_binding {
549 return not_impl_err!("With no schema binding not supported")?;
550 }
551 if if_not_exists {
552 return not_impl_err!("If not exists not supported")?;
553 }
554 if to.is_some() {
555 return not_impl_err!("To not supported")?;
556 }
557
558 let stmt = Statement::CreateView {
561 or_replace,
562 materialized,
563 name,
564 columns,
565 query,
566 options: CreateTableOptions::None,
567 cluster_by,
568 comment,
569 with_no_schema_binding,
570 if_not_exists,
571 temporary,
572 to,
573 params,
574 or_alter,
575 };
576 let sql = stmt.to_string();
577 let Statement::CreateView {
578 name,
579 columns,
580 query,
581 or_replace,
582 temporary,
583 ..
584 } = stmt
585 else {
586 return internal_err!("Unreachable code in create view");
587 };
588
589 let columns = columns
590 .into_iter()
591 .map(|view_column_def| {
592 if let Some(options) = view_column_def.options {
593 plan_err!(
594 "Options not supported for view columns: {options:?}"
595 )
596 } else {
597 Ok(view_column_def.name)
598 }
599 })
600 .collect::<Result<Vec<_>>>()?;
601
602 let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
603 plan = self.apply_expr_alias(plan, columns)?;
604
605 Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
606 name: self.object_name_to_table_reference(name)?,
607 input: Arc::new(plan),
608 or_replace,
609 definition: Some(sql),
610 temporary,
611 })))
612 }
613 Statement::ShowCreate { obj_type, obj_name } => match obj_type {
614 ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
615 _ => {
616 not_impl_err!("Only `SHOW CREATE TABLE ...` statement is supported")
617 }
618 },
619 Statement::CreateSchema {
620 schema_name,
621 if_not_exists,
622 ..
623 } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
624 CreateCatalogSchema {
625 schema_name: get_schema_name(&schema_name),
626 if_not_exists,
627 schema: Arc::new(DFSchema::empty()),
628 },
629 ))),
630 Statement::CreateDatabase {
631 db_name,
632 if_not_exists,
633 ..
634 } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
635 CreateCatalog {
636 catalog_name: object_name_to_string(&db_name),
637 if_not_exists,
638 schema: Arc::new(DFSchema::empty()),
639 },
640 ))),
641 Statement::Drop {
642 object_type,
643 if_exists,
644 mut names,
645 cascade,
646 restrict: _,
647 purge: _,
648 temporary: _,
649 table: _,
650 } => {
651 let name = match names.len() {
654 0 => Err(ParserError("Missing table name.".to_string()).into()),
655 1 => self.object_name_to_table_reference(names.pop().unwrap()),
656 _ => {
657 Err(ParserError("Multiple objects not supported".to_string())
658 .into())
659 }
660 }?;
661
662 match object_type {
663 ObjectType::Table => {
664 Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
665 name,
666 if_exists,
667 schema: DFSchemaRef::new(DFSchema::empty()),
668 })))
669 }
670 ObjectType::View => {
671 Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
672 name,
673 if_exists,
674 schema: DFSchemaRef::new(DFSchema::empty()),
675 })))
676 }
                ObjectType::Schema => {
                    let name = match name {
                        TableReference::Bare { table } => {
                            Ok(SchemaReference::Bare { schema: table })
                        }
                        TableReference::Partial { schema, table } => {
                            Ok(SchemaReference::Full {
                                schema: table,
                                catalog: schema,
                            })
                        }
                        TableReference::Full {
                            catalog: _,
                            schema: _,
                            table: _,
                        } => Err(ParserError(
                            "Invalid schema specifier (has 3 parts)".to_string(),
                        )),
                    }?;
                    Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(
                        DropCatalogSchema {
                            name,
                            if_exists,
                            cascade,
                            schema: DFSchemaRef::new(DFSchema::empty()),
                        },
                    )))
                }
692 _ => not_impl_err!(
693 "Only `DROP TABLE/VIEW/SCHEMA ...` statement is supported currently"
694 ),
695 }
696 }
697 Statement::Prepare {
698 name,
699 data_types,
700 statement,
701 } => {
702 let mut data_types: Vec<DataType> = data_types
704 .into_iter()
705 .map(|t| self.convert_data_type(&t))
706 .collect::<Result<_>>()?;
707
708 let mut planner_context = PlannerContext::new()
710 .with_prepare_param_data_types(data_types.clone());
711
712 let plan = self.sql_statement_to_plan_with_context_impl(
714 *statement,
715 &mut planner_context,
716 )?;
717
718 if data_types.is_empty() {
719 let map_types = plan.get_parameter_types()?;
720 let param_types: Vec<_> = (1..=map_types.len())
721 .filter_map(|i| {
722 let key = format!("${i}");
723 map_types.get(&key).and_then(|opt| opt.clone())
724 })
725 .collect();
726 data_types.extend(param_types.iter().cloned());
727 planner_context.with_prepare_param_data_types(param_types);
728 }
729
730 Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
731 name: ident_to_string(&name),
732 data_types,
733 input: Arc::new(plan),
734 })))
735 }
736 Statement::Execute {
737 name,
738 parameters,
739 using,
740 has_parentheses: _,
743 immediate,
744 into,
745 output,
746 default,
747 } => {
748 if !using.is_empty() {
750 return not_impl_err!(
751 "Execute statement with USING is not supported"
752 );
753 }
754 if immediate {
755 return not_impl_err!(
756 "Execute statement with IMMEDIATE is not supported"
757 );
758 }
759 if !into.is_empty() {
760 return not_impl_err!("Execute statement with INTO is not supported");
761 }
762 if output {
763 return not_impl_err!(
764 "Execute statement with OUTPUT is not supported"
765 );
766 }
767 if default {
768 return not_impl_err!(
769 "Execute statement with DEFAULT is not supported"
770 );
771 }
772 let empty_schema = DFSchema::empty();
773 let parameters = parameters
774 .into_iter()
775 .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
776 .collect::<Result<Vec<Expr>>>()?;
777
778 Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
779 name: object_name_to_string(&name.unwrap()),
780 parameters,
781 })))
782 }
783 Statement::Deallocate {
784 name,
785 prepare: _,
787 } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
788 Deallocate {
789 name: ident_to_string(&name),
790 },
791 ))),
792
793 Statement::ShowTables {
794 extended,
795 full,
796 terse,
797 history,
798 external,
799 show_options,
800 } => {
801 if extended {
804 return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
805 }
806 if full {
807 return not_impl_err!("SHOW FULL TABLES not supported")?;
808 }
809 if terse {
810 return not_impl_err!("SHOW TERSE TABLES not supported")?;
811 }
812 if history {
813 return not_impl_err!("SHOW TABLES HISTORY not supported")?;
814 }
815 if external {
816 return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
817 }
818 let ShowStatementOptions {
819 show_in,
820 starts_with,
821 limit,
822 limit_from,
823 filter_position,
824 } = show_options;
825 if show_in.is_some() {
826 return not_impl_err!("SHOW TABLES IN not supported")?;
827 }
828 if starts_with.is_some() {
829 return not_impl_err!("SHOW TABLES LIKE not supported")?;
830 }
831 if limit.is_some() {
832 return not_impl_err!("SHOW TABLES LIMIT not supported")?;
833 }
834 if limit_from.is_some() {
835 return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
836 }
837 if filter_position.is_some() {
838 return not_impl_err!("SHOW TABLES FILTER not supported")?;
839 }
840 self.show_tables_to_plan()
841 }
842
843 Statement::ShowColumns {
844 extended,
845 full,
846 show_options,
847 } => {
848 let ShowStatementOptions {
849 show_in,
850 starts_with,
851 limit,
852 limit_from,
853 filter_position,
854 } = show_options;
855 if starts_with.is_some() {
856 return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
857 }
858 if limit.is_some() {
859 return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
860 }
861 if limit_from.is_some() {
862 return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
863 }
864 if filter_position.is_some() {
865 return not_impl_err!(
866 "SHOW COLUMNS with WHERE or LIKE is not supported"
867 )?;
868 }
869 let Some(ShowStatementIn {
870 clause: _,
873 parent_type,
874 parent_name,
875 }) = show_in
876 else {
877 return plan_err!("SHOW COLUMNS requires a table name");
878 };
879
880 if let Some(parent_type) = parent_type {
881 return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
882 }
883 let Some(table_name) = parent_name else {
884 return plan_err!("SHOW COLUMNS requires a table name");
885 };
886
887 self.show_columns_to_plan(extended, full, table_name)
888 }
889
890 Statement::ShowFunctions { filter, .. } => {
891 self.show_functions_to_plan(filter)
892 }
893
894 Statement::Insert(Insert {
895 or,
896 into,
897 columns,
898 overwrite,
899 source,
900 partitioned,
901 after_columns,
902 table,
903 on,
904 returning,
905 ignore,
906 table_alias,
907 mut replace_into,
908 priority,
909 insert_alias,
910 assignments,
911 has_table_keyword,
912 settings,
913 format_clause,
914 }) => {
915 let table_name = match table {
916 TableObject::TableName(table_name) => table_name,
917 TableObject::TableFunction(_) => {
918 return not_impl_err!("INSERT INTO Table functions not supported")
919 }
920 };
921 if let Some(or) = or {
922 match or {
923 SqliteOnConflict::Replace => replace_into = true,
924 _ => plan_err!("Inserts with {or} clause is not supported")?,
925 }
926 }
927 if partitioned.is_some() {
928 plan_err!("Partitioned inserts not yet supported")?;
929 }
930 if !after_columns.is_empty() {
931 plan_err!("After-columns clause not supported")?;
932 }
933 if on.is_some() {
934 plan_err!("Insert-on clause not supported")?;
935 }
936 if returning.is_some() {
937 plan_err!("Insert-returning clause not supported")?;
938 }
939 if ignore {
940 plan_err!("Insert-ignore clause not supported")?;
941 }
942 let Some(source) = source else {
943 plan_err!("Inserts without a source not supported")?
944 };
945 if let Some(table_alias) = table_alias {
946 plan_err!(
947 "Inserts with a table alias not supported: {table_alias:?}"
948 )?
949 };
950 if let Some(priority) = priority {
951 plan_err!(
952 "Inserts with a `PRIORITY` clause not supported: {priority:?}"
953 )?
954 };
955 if insert_alias.is_some() {
956 plan_err!("Inserts with an alias not supported")?;
957 }
958 if !assignments.is_empty() {
959 plan_err!("Inserts with assignments not supported")?;
960 }
961 if settings.is_some() {
962 plan_err!("Inserts with settings not supported")?;
963 }
964 if format_clause.is_some() {
965 plan_err!("Inserts with format clause not supported")?;
966 }
967 let _ = into;
969 let _ = has_table_keyword;
970 self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
971 }
972 Statement::Update {
973 table,
974 assignments,
975 from,
976 selection,
977 returning,
978 or,
979 } => {
980 let from_clauses =
981 from.map(|update_table_from_kind| match update_table_from_kind {
982 UpdateTableFromKind::BeforeSet(from_clauses) => from_clauses,
983 UpdateTableFromKind::AfterSet(from_clauses) => from_clauses,
984 });
985 if from_clauses.as_ref().is_some_and(|f| f.len() > 1) {
987 plan_err!("Multiple tables in UPDATE SET FROM not yet supported")?;
988 }
989 let update_from = from_clauses.and_then(|mut f| f.pop());
990 if returning.is_some() {
991 plan_err!("Update-returning clause not yet supported")?;
992 }
993 if or.is_some() {
994 plan_err!("ON conflict not supported")?;
995 }
996 self.update_to_plan(table, assignments, update_from, selection)
997 }
998
999 Statement::Delete(Delete {
1000 tables,
1001 using,
1002 selection,
1003 returning,
1004 from,
1005 order_by,
1006 limit,
1007 }) => {
1008 if !tables.is_empty() {
1009 plan_err!("DELETE <TABLE> not supported")?;
1010 }
1011
1012 if using.is_some() {
1013 plan_err!("Using clause not supported")?;
1014 }
1015
1016 if returning.is_some() {
1017 plan_err!("Delete-returning clause not yet supported")?;
1018 }
1019
1020 if !order_by.is_empty() {
1021 plan_err!("Delete-order-by clause not yet supported")?;
1022 }
1023
1024 if limit.is_some() {
1025 plan_err!("Delete-limit clause not yet supported")?;
1026 }
1027
1028 let table_name = self.get_delete_target(from)?;
1029 self.delete_to_plan(table_name, selection)
1030 }
1031
1032 Statement::StartTransaction {
1033 modes,
1034 begin: false,
1035 modifier,
1036 transaction,
1037 statements,
1038 has_end_keyword,
1039 exception,
1040 } => {
1041 if let Some(modifier) = modifier {
1042 return not_impl_err!(
1043 "Transaction modifier not supported: {modifier}"
1044 );
1045 }
1046 if !statements.is_empty() {
1047 return not_impl_err!(
1048 "Transaction with multiple statements not supported"
1049 );
1050 }
1051 if exception.is_some() {
1052 return not_impl_err!(
1053 "Transaction with exception statements not supported"
1054 );
1055 }
1056 if has_end_keyword {
1057 return not_impl_err!("Transaction with END keyword not supported");
1058 }
1059 self.validate_transaction_kind(transaction)?;
1060 let isolation_level: ast::TransactionIsolationLevel = modes
1061 .iter()
1062 .filter_map(|m: &TransactionMode| match m {
1063 TransactionMode::AccessMode(_) => None,
1064 TransactionMode::IsolationLevel(level) => Some(level),
1065 })
1066 .next_back()
1067 .copied()
1068 .unwrap_or(ast::TransactionIsolationLevel::Serializable);
1069 let access_mode: ast::TransactionAccessMode = modes
1070 .iter()
1071 .filter_map(|m: &TransactionMode| match m {
1072 TransactionMode::AccessMode(mode) => Some(mode),
1073 TransactionMode::IsolationLevel(_) => None,
1074 })
1075 .next_back()
1076 .copied()
1077 .unwrap_or(ast::TransactionAccessMode::ReadWrite);
1078 let isolation_level = match isolation_level {
1079 ast::TransactionIsolationLevel::ReadUncommitted => {
1080 TransactionIsolationLevel::ReadUncommitted
1081 }
1082 ast::TransactionIsolationLevel::ReadCommitted => {
1083 TransactionIsolationLevel::ReadCommitted
1084 }
1085 ast::TransactionIsolationLevel::RepeatableRead => {
1086 TransactionIsolationLevel::RepeatableRead
1087 }
1088 ast::TransactionIsolationLevel::Serializable => {
1089 TransactionIsolationLevel::Serializable
1090 }
1091 ast::TransactionIsolationLevel::Snapshot => {
1092 TransactionIsolationLevel::Snapshot
1093 }
1094 };
1095 let access_mode = match access_mode {
1096 ast::TransactionAccessMode::ReadOnly => {
1097 TransactionAccessMode::ReadOnly
1098 }
1099 ast::TransactionAccessMode::ReadWrite => {
1100 TransactionAccessMode::ReadWrite
1101 }
1102 };
1103 let statement = PlanStatement::TransactionStart(TransactionStart {
1104 access_mode,
1105 isolation_level,
1106 });
1107 Ok(LogicalPlan::Statement(statement))
1108 }
1109 Statement::Commit {
1110 chain,
1111 end,
1112 modifier,
1113 } => {
1114 if end {
1115 return not_impl_err!("COMMIT AND END not supported");
1116 };
1117 if let Some(modifier) = modifier {
1118 return not_impl_err!("COMMIT {modifier} not supported");
1119 };
1120 let statement = PlanStatement::TransactionEnd(TransactionEnd {
1121 conclusion: TransactionConclusion::Commit,
1122 chain,
1123 });
1124 Ok(LogicalPlan::Statement(statement))
1125 }
1126 Statement::Rollback { chain, savepoint } => {
1127 if savepoint.is_some() {
1128 plan_err!("Savepoints not supported")?;
1129 }
1130 let statement = PlanStatement::TransactionEnd(TransactionEnd {
1131 conclusion: TransactionConclusion::Rollback,
1132 chain,
1133 });
1134 Ok(LogicalPlan::Statement(statement))
1135 }
1136 Statement::CreateFunction(ast::CreateFunction {
1137 or_replace,
1138 temporary,
1139 name,
1140 args,
1141 return_type,
1142 function_body,
1143 behavior,
1144 language,
1145 ..
1146 }) => {
1147 let return_type = match return_type {
1148 Some(t) => Some(self.convert_data_type(&t)?),
1149 None => None,
1150 };
1151 let mut planner_context = PlannerContext::new();
1152 let empty_schema = &DFSchema::empty();
1153
1154 let args = match args {
1155 Some(function_args) => {
1156 let function_args = function_args
1157 .into_iter()
1158 .map(|arg| {
1159 let data_type = self.convert_data_type(&arg.data_type)?;
1160
1161 let default_expr = match arg.default_expr {
1162 Some(expr) => Some(self.sql_to_expr(
1163 expr,
1164 empty_schema,
1165 &mut planner_context,
1166 )?),
1167 None => None,
1168 };
1169 Ok(OperateFunctionArg {
1170 name: arg.name,
1171 default_expr,
1172 data_type,
1173 })
1174 })
1175 .collect::<Result<Vec<OperateFunctionArg>>>();
1176 Some(function_args?)
1177 }
1178 None => None,
1179 };
1180 let name = match &name.0[..] {
1182 [] => exec_err!("Function should have name")?,
1183 [n] => n.as_ident().unwrap().value.clone(),
1184 [..] => not_impl_err!("Qualified functions are not supported")?,
1185 };
1186 let arg_types = args.as_ref().map(|arg| {
1190 arg.iter().map(|t| t.data_type.clone()).collect::<Vec<_>>()
1191 });
1192 let mut planner_context = PlannerContext::new()
1193 .with_prepare_param_data_types(arg_types.unwrap_or_default());
1194
1195 let function_body = match function_body {
1196 Some(r) => Some(self.sql_to_expr(
1197 match r {
1198 ast::CreateFunctionBody::AsBeforeOptions(expr) => expr,
1199 ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
1200 ast::CreateFunctionBody::Return(expr) => expr,
1201 ast::CreateFunctionBody::AsBeginEnd(_) => {
1202 return not_impl_err!(
1203 "BEGIN/END enclosed function body syntax is not supported"
1204 )?;
1205 }
1206 ast::CreateFunctionBody::AsReturnExpr(_)
1207 | ast::CreateFunctionBody::AsReturnSelect(_) => {
1208 return not_impl_err!(
1209 "AS RETURN function syntax is not supported"
1210 )?
1211 }
1212 },
1213 &DFSchema::empty(),
1214 &mut planner_context,
1215 )?),
1216 None => None,
1217 };
1218
1219 let params = CreateFunctionBody {
1220 language,
1221 behavior: behavior.map(|b| match b {
1222 ast::FunctionBehavior::Immutable => Volatility::Immutable,
1223 ast::FunctionBehavior::Stable => Volatility::Stable,
1224 ast::FunctionBehavior::Volatile => Volatility::Volatile,
1225 }),
1226 function_body,
1227 };
1228
1229 let statement = DdlStatement::CreateFunction(CreateFunction {
1230 or_replace,
1231 temporary,
1232 name,
1233 return_type,
1234 args,
1235 params,
1236 schema: DFSchemaRef::new(DFSchema::empty()),
1237 });
1238
1239 Ok(LogicalPlan::Ddl(statement))
1240 }
1241 Statement::DropFunction {
1242 if_exists,
1243 func_desc,
1244 ..
1245 } => {
1246 if let Some(desc) = func_desc.first() {
1249 let name = match &desc.name.0[..] {
1251 [] => exec_err!("Function should have name")?,
1252 [n] => n.as_ident().unwrap().value.clone(),
1253 [..] => not_impl_err!("Qualified functions are not supported")?,
1254 };
1255 let statement = DdlStatement::DropFunction(DropFunction {
1256 if_exists,
1257 name,
1258 schema: DFSchemaRef::new(DFSchema::empty()),
1259 });
1260 Ok(LogicalPlan::Ddl(statement))
1261 } else {
1262 exec_err!("Function name not provided")
1263 }
1264 }
1265 Statement::CreateIndex(CreateIndex {
1266 name,
1267 table_name,
1268 using,
1269 columns,
1270 unique,
1271 if_not_exists,
1272 ..
1273 }) => {
1274 let name: Option<String> = name.as_ref().map(object_name_to_string);
1275 let table = self.object_name_to_table_reference(table_name)?;
1276 let table_schema = self
1277 .context_provider
1278 .get_table_source(table.clone())?
1279 .schema()
1280 .to_dfschema_ref()?;
1281 let using: Option<String> =
1282 using.as_ref().map(|index_type| match index_type {
1283 IndexType::Custom(ident) => ident_to_string(ident),
1284 _ => index_type.to_string().to_ascii_lowercase(),
1285 });
1286 let order_by_exprs: Vec<OrderByExpr> =
1287 columns.into_iter().map(|col| col.column).collect();
1288 let columns = self.order_by_to_sort_expr(
1289 order_by_exprs,
1290 &table_schema,
1291 planner_context,
1292 false,
1293 None,
1294 )?;
1295 Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
1296 PlanCreateIndex {
1297 name,
1298 table,
1299 using,
1300 columns,
1301 unique,
1302 if_not_exists,
1303 schema: DFSchemaRef::new(DFSchema::empty()),
1304 },
1305 )))
1306 }
1307 stmt => {
1308 not_impl_err!("Unsupported SQL statement: {stmt}")
1309 }
1310 }
1311 }
1312
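    /// Extracts the single target table name from the FROM clause of a DELETE
    /// statement; multi-table deletes and joins are rejected.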
1313 fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
1314 let mut from = match from {
1315 FromTable::WithFromKeyword(v) => v,
1316 FromTable::WithoutKeyword(v) => v,
1317 };
1318
1319 if from.len() != 1 {
1320 return not_impl_err!(
1321 "DELETE FROM only supports single table, got {}: {from:?}",
1322 from.len()
1323 );
1324 }
1325 let table_factor = from.pop().unwrap();
1326 if !table_factor.joins.is_empty() {
1327 return not_impl_err!("DELETE FROM only supports single table, got: joins");
1328 }
1329 let TableFactor::Table { name, .. } = table_factor.relation else {
1330 return not_impl_err!(
1331 "DELETE FROM only supports single table, got: {table_factor:?}"
1332 );
1333 };
1334
1335 Ok(name)
1336 }
1337
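    /// Rewrites `SHOW TABLES` into a query against `information_schema.tables`,
    /// erroring if the information schema is not enabled.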
    fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
        if self.has_table("information_schema", "tables") {
            let query = "SELECT * FROM information_schema.tables;";
            let mut rewrite = DFParser::parse_sql(query)?;
            assert_eq!(rewrite.len(), 1);
            self.statement_to_plan(rewrite.pop_front().unwrap())
        } else {
            plan_err!(
                "SHOW TABLES is not supported unless information_schema is enabled"
            )
        }
    }
1349
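    /// Plans `DESCRIBE <table>` by resolving the table source and emitting a
    /// [`DescribeTable`] node.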
1350 fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
1351 let table_ref = self.object_name_to_table_reference(table_name)?;
1352
1353 let table_source = self.context_provider.get_table_source(table_ref)?;
1354
1355 let schema = table_source.schema();
1356
1357 let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1358
1359 Ok(LogicalPlan::DescribeTable(DescribeTable {
1360 schema,
1361 output_schema: Arc::new(output_schema),
1362 }))
1363 }
1364
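    /// Plans a `COPY ... TO ...` statement: resolves the source (table or query),
    /// determines the output file type from `STORED AS` or the target's file
    /// extension, and collects partition columns and options.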
1365 fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
1366 let copy_source = statement.source;
1368 let (input, input_schema, table_ref) = match copy_source {
1369 CopyToSource::Relation(object_name) => {
1370 let table_name = object_name_to_string(&object_name);
1371 let table_ref = self.object_name_to_table_reference(object_name)?;
1372 let table_source =
1373 self.context_provider.get_table_source(table_ref.clone())?;
1374 let plan =
1375 LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?;
1376 let input_schema = Arc::clone(plan.schema());
1377 (plan, input_schema, Some(table_ref))
1378 }
1379 CopyToSource::Query(query) => {
1380 let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
1381 let input_schema = Arc::clone(plan.schema());
1382 (plan, input_schema, None)
1383 }
1384 };
1385
1386 let options_map = self.parse_options_map(statement.options, true)?;
1387
1388 let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
1389 self.context_provider.get_file_type(stored_as).ok()
1390 } else {
1391 None
1392 };
1393
1394 let file_type = match maybe_file_type {
1395 Some(ft) => ft,
1396 None => {
1397 let e = || {
1398 DataFusionError::Configuration(
1399 "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
1400 .to_string(),
1401 )
1402 };
1403 let extension: &str = &Path::new(&statement.target)
1405 .extension()
1406 .ok_or_else(e)?
1407 .to_str()
1408 .ok_or_else(e)?
1409 .to_lowercase();
1410
1411 self.context_provider.get_file_type(extension)?
1412 }
1413 };
1414
1415 let partition_by = statement
1416 .partitioned_by
1417 .iter()
1418 .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
1419 .collect::<Result<Vec<_>>>()?
1420 .into_iter()
1421 .map(|f| f.name().to_owned())
1422 .collect();
1423
1424 Ok(LogicalPlan::Copy(CopyTo::new(
1425 Arc::new(input),
1426 statement.target,
1427 partition_by,
1428 file_type,
1429 options_map,
1430 )))
1431 }
1432
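    /// Converts `WITH ORDER` clauses into sort expressions. When the table schema
    /// has no fields (it will be inferred later), expressions are planned without
    /// column validation; otherwise every referenced column must exist in the schema.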
1433 fn build_order_by(
1434 &self,
1435 order_exprs: Vec<LexOrdering>,
1436 schema: &DFSchemaRef,
1437 planner_context: &mut PlannerContext,
1438 ) -> Result<Vec<Vec<SortExpr>>> {
1439 if !order_exprs.is_empty() && schema.fields().is_empty() {
1440 let results = order_exprs
1441 .iter()
1442 .map(|lex_order| {
1443 let result = lex_order
1444 .iter()
1445 .map(|order_by_expr| {
1446 let ordered_expr = &order_by_expr.expr;
1447 let ordered_expr = ordered_expr.to_owned();
1448 let ordered_expr = self.sql_expr_to_logical_expr(
1449 ordered_expr,
1450 schema,
1451 planner_context,
1452 )?;
1453 let asc = order_by_expr.options.asc.unwrap_or(true);
1454 let nulls_first =
1455 order_by_expr.options.nulls_first.unwrap_or_else(|| {
1456 self.options.default_null_ordering.nulls_first(asc)
1457 });
1458
1459 Ok(SortExpr::new(ordered_expr, asc, nulls_first))
1460 })
1461 .collect::<Result<Vec<SortExpr>>>()?;
1462 Ok(result)
1463 })
1464 .collect::<Result<Vec<Vec<SortExpr>>>>()?;
1465
1466 return Ok(results);
1467 }
1468
1469 let mut all_results = vec![];
1470 for expr in order_exprs {
1471 let expr_vec =
1473 self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?;
1474 for sort in expr_vec.iter() {
1476 for column in sort.expr.column_refs().iter() {
1477 if !schema.has_column(column) {
1478 return plan_err!("Column {column} is not in schema");
1480 }
1481 }
1482 }
1483 all_results.push(expr_vec)
1485 }
1486 Ok(all_results)
1487 }
1488
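    /// Generates a `CREATE EXTERNAL TABLE` logical plan, combining inline column
    /// constraints, column defaults, ordering expressions, and parsed options.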
1489 fn external_table_to_plan(
1491 &self,
1492 statement: CreateExternalTable,
1493 ) -> Result<LogicalPlan> {
1494 let definition = Some(statement.to_string());
1495 let CreateExternalTable {
1496 name,
1497 columns,
1498 file_type,
1499 location,
1500 table_partition_cols,
1501 if_not_exists,
1502 temporary,
1503 order_exprs,
1504 unbounded,
1505 options,
1506 constraints,
1507 } = statement;
1508
1509 let mut all_constraints = constraints;
1511 let inline_constraints = calc_inline_constraints_from_columns(&columns);
1512 all_constraints.extend(inline_constraints);
1513
1514 let options_map = self.parse_options_map(options, false)?;
1515
1516 let compression = options_map
1517 .get("format.compression")
1518 .map(|c| CompressionTypeVariant::from_str(c))
1519 .transpose()?;
1520 if (file_type == "PARQUET" || file_type == "AVRO" || file_type == "ARROW")
1521 && compression
1522 .map(|c| c != CompressionTypeVariant::UNCOMPRESSED)
1523 .unwrap_or(false)
1524 {
1525 plan_err!(
1526 "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
1527 )?;
1528 }
1529
1530 let mut planner_context = PlannerContext::new();
1531
1532 let column_defaults = self
1533 .build_column_defaults(&columns, &mut planner_context)?
1534 .into_iter()
1535 .collect();
1536
1537 let schema = self.build_schema(columns)?;
1538 let df_schema = schema.to_dfschema_ref()?;
1539 df_schema.check_names()?;
1540
1541 let ordered_exprs =
1542 self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;
1543
1544 let name = self.object_name_to_table_reference(name)?;
1545 let constraints =
1546 self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
1547 Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1548 PlanCreateExternalTable {
1549 schema: df_schema,
1550 name,
1551 location,
1552 file_type,
1553 table_partition_cols,
1554 if_not_exists,
1555 temporary,
1556 definition,
1557 order_exprs: ordered_exprs,
1558 unbounded,
1559 options: options_map,
1560 constraints,
1561 column_defaults,
1562 },
1563 )))
1564 }
1565
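    /// Resolves the columns named by a constraint to their field indices in
    /// `df_schema`; each column must be a plain identifier present in the schema.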
1566 fn get_constraint_column_indices(
1569 &self,
1570 df_schema: &DFSchemaRef,
1571 columns: &[IndexColumn],
1572 constraint_name: &str,
1573 ) -> Result<Vec<usize>> {
1574 let field_names = df_schema.field_names();
1575 columns
1576 .iter()
1577 .map(|index_column| {
1578 let expr = &index_column.column.expr;
1579 let ident = if let SQLExpr::Identifier(ident) = expr {
1580 ident
1581 } else {
1582 return Err(plan_datafusion_err!(
1583 "Column name for {constraint_name} must be an identifier: {expr}"
1584 ));
1585 };
1586 let column = self.ident_normalizer.normalize(ident.clone());
1587 field_names
1588 .iter()
1589 .position(|item| *item == column)
1590 .ok_or_else(|| {
1591 plan_datafusion_err!(
1592 "Column for {constraint_name} not found in schema: {column}"
1593 )
1594 })
1595 })
1596 .collect::<Result<Vec<_>>>()
1597 }
1598
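    /// Converts SQL [`TableConstraint`]s (UNIQUE, PRIMARY KEY) into DataFusion
    /// [`Constraints`]; foreign key, check, and index constraints are rejected as
    /// unsupported.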
1599 pub fn new_constraint_from_table_constraints(
1601 &self,
1602 constraints: &[TableConstraint],
1603 df_schema: &DFSchemaRef,
1604 ) -> Result<Constraints> {
1605 let constraints = constraints
1606 .iter()
1607 .map(|c: &TableConstraint| match c {
1608 TableConstraint::Unique { name, columns, .. } => {
1609 let constraint_name = match name {
1610 Some(name) => &format!("unique constraint with name '{name}'"),
1611 None => "unique constraint",
1612 };
1613 let indices = self.get_constraint_column_indices(
1615 df_schema,
1616 columns,
1617 constraint_name,
1618 )?;
1619 Ok(Constraint::Unique(indices))
1620 }
1621 TableConstraint::PrimaryKey { columns, .. } => {
1622 let indices = self.get_constraint_column_indices(
1624 df_schema,
1625 columns,
1626 "primary key",
1627 )?;
1628 Ok(Constraint::PrimaryKey(indices))
1629 }
1630 TableConstraint::ForeignKey { .. } => {
1631 _plan_err!("Foreign key constraints are not currently supported")
1632 }
1633 TableConstraint::Check { .. } => {
1634 _plan_err!("Check constraints are not currently supported")
1635 }
1636 TableConstraint::Index { .. } => {
1637 _plan_err!("Indexes are not currently supported")
1638 }
1639 TableConstraint::FulltextOrSpatial { .. } => {
1640 _plan_err!("Indexes are not currently supported")
1641 }
1642 })
1643 .collect::<Result<Vec<_>>>()?;
1644 Ok(Constraints::new_unverified(constraints))
1645 }
1646
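    /// Converts key/value statement options into a lowercased map, prefixing
    /// un-namespaced keys with `format.`; duplicate keys are rejected unless
    /// `allow_duplicates` is true.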
    fn parse_options_map(
        &self,
        options: Vec<(String, Value)>,
        allow_duplicates: bool,
    ) -> Result<HashMap<String, String>> {
        let mut options_map = HashMap::new();
        for (key, value) in options {
            if !allow_duplicates && options_map.contains_key(&key) {
                return plan_err!("Option {key} is specified multiple times");
            }

            let Some(value_string) = crate::utils::value_to_string(&value) else {
                return plan_err!("Unsupported Value {}", value);
            };

            if !key.contains('.') {
                let renamed_key = format!("format.{key}");
                options_map.insert(renamed_key.to_lowercase(), value_string);
            } else {
                options_map.insert(key.to_lowercase(), value_string);
            }
        }

        Ok(options_map)
    }
1675
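    /// Generates a logical plan for `EXPLAIN [ANALYZE] [VERBOSE]`, rejecting nested
    /// EXPLAINs and unsupported FORMAT combinations.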
1676 fn explain_to_plan(
1681 &self,
1682 verbose: bool,
1683 analyze: bool,
1684 format: Option<String>,
1685 statement: DFStatement,
1686 ) -> Result<LogicalPlan> {
1687 let plan = self.statement_to_plan(statement)?;
1688 if matches!(plan, LogicalPlan::Explain(_)) {
1689 return plan_err!("Nested EXPLAINs are not supported");
1690 }
1691
1692 let plan = Arc::new(plan);
1693 let schema = LogicalPlan::explain_schema();
1694 let schema = schema.to_dfschema_ref()?;
1695
1696 if verbose && format.is_some() {
1697 return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
1698 }
1699
1700 if analyze {
1701 if format.is_some() {
1702 return plan_err!("EXPLAIN ANALYZE with FORMAT is not supported");
1703 }
1704 Ok(LogicalPlan::Analyze(Analyze {
1705 verbose,
1706 input: plan,
1707 schema,
1708 }))
1709 } else {
1710 let stringified_plans =
1711 vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
1712
1713 let options = self.context_provider.options();
1715 let format = format.as_ref().unwrap_or(&options.explain.format);
1716
1717 let format: ExplainFormat = format.parse()?;
1718
1719 Ok(LogicalPlan::Explain(Explain {
1720 verbose,
1721 explain_format: format,
1722 plan,
1723 stringified_plans,
1724 schema,
1725 logical_optimization_succeeded: false,
1726 }))
1727 }
1728 }
1729
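    /// Rewrites `SHOW <variable>` into a query against
    /// `information_schema.df_settings`.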
1730 fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
1731 if !self.has_table("information_schema", "df_settings") {
1732 return plan_err!(
1733 "SHOW [VARIABLE] is not supported unless information_schema is enabled"
1734 );
1735 }
1736
1737 let verbose = variable
1738 .last()
1739 .map(|s| ident_to_string(s) == "verbose")
1740 .unwrap_or(false);
1741 let mut variable_vec = variable.to_vec();
1742 let mut columns: String = "name, value".to_owned();
1743
1744 if verbose {
1745 columns = format!("{columns}, description");
1746 variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
1747 }
1748
1749 let variable = object_name_to_string(&ObjectName::from(variable_vec));
1750 let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
1751 let query = if variable == "all" {
1752 format!("{base_query} ORDER BY name")
1754 } else if variable == "timezone" || variable == "time.zone" {
1755 format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
1757 } else {
1758 let is_valid_variable = self
1762 .context_provider
1763 .options()
1764 .entries()
1765 .iter()
1766 .any(|opt| opt.key == variable);
1767
1768 if !is_valid_variable {
1769 return plan_err!(
1770 "'{variable}' is not a variable which can be viewed with 'SHOW'"
1771 );
1772 }
1773
1774 format!("{base_query} WHERE name = '{variable}'")
1775 };
1776
1777 let mut rewrite = DFParser::parse_sql(&query)?;
1778 assert_eq!(rewrite.len(), 1);
1779
1780 self.statement_to_plan(rewrite.pop_front().unwrap())
1781 }
1782
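    /// Plans `SET <variable> = <value>` as a [`SetVariable`] statement, mapping
    /// `timezone`/`time.zone` to `datafusion.execution.time_zone`.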
1783 fn set_statement_to_plan(&self, statement: Set) -> Result<LogicalPlan> {
1784 match statement {
1785 Set::SingleAssignment {
1786 scope,
1787 hivevar,
1788 variable,
1789 values,
1790 } => {
1791 if scope.is_some() {
1792 return not_impl_err!("SET with scope modifiers is not supported");
1793 }
1794
1795 if hivevar {
1796 return not_impl_err!("SET HIVEVAR is not supported");
1797 }
1798
1799 let variable = object_name_to_string(&variable);
1800 let mut variable_lower = variable.to_lowercase();
1801
1802 if variable_lower == "timezone" || variable_lower == "time.zone" {
1803 variable_lower = "datafusion.execution.time_zone".to_string();
1804 }
1805
1806 if values.len() != 1 {
1807 return plan_err!("SET only supports single value assignment");
1808 }
1809
1810 let value_string = match &values[0] {
1811 SQLExpr::Identifier(i) => ident_to_string(i),
1812 SQLExpr::Value(v) => match crate::utils::value_to_string(&v.value) {
1813 None => {
1814 return plan_err!("Unsupported value {:?}", v.value);
1815 }
1816 Some(s) => s,
1817 },
1818 SQLExpr::UnaryOp { op, expr } => match op {
1819 UnaryOperator::Plus => format!("+{expr}"),
1820 UnaryOperator::Minus => format!("-{expr}"),
1821 _ => return plan_err!("Unsupported unary op {:?}", op),
1822 },
1823 _ => return plan_err!("Unsupported expr {:?}", values[0]),
1824 };
1825
1826 Ok(LogicalPlan::Statement(PlanStatement::SetVariable(
1827 SetVariable {
1828 variable: variable_lower,
1829 value: value_string,
1830 },
1831 )))
1832 }
1833 other => not_impl_err!("SET variant not implemented yet: {other:?}"),
1834 }
1835 }
1836
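    /// Plans a `DELETE` statement as a DML node over a table scan, applying the
    /// optional WHERE predicate as a filter.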
1837 fn delete_to_plan(
1838 &self,
1839 table_name: ObjectName,
1840 predicate_expr: Option<SQLExpr>,
1841 ) -> Result<LogicalPlan> {
1842 let table_ref = self.object_name_to_table_reference(table_name.clone())?;
1844 let table_source = self.context_provider.get_table_source(table_ref.clone())?;
1845 let schema = DFSchema::try_from_qualified_schema(
1846 table_ref.clone(),
1847 &table_source.schema(),
1848 )?;
1849 let scan =
1850 LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
1851 .build()?;
1852 let mut planner_context = PlannerContext::new();
1853
1854 let source = match predicate_expr {
1855 None => scan,
1856 Some(predicate_expr) => {
1857 let filter_expr =
1858 self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
1859 let schema = Arc::new(schema);
1860 let mut using_columns = HashSet::new();
1861 expr_to_columns(&filter_expr, &mut using_columns)?;
1862 let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
1863 filter_expr,
1864 &[&[&schema]],
1865 &[using_columns],
1866 )?;
1867 LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
1868 }
1869 };
1870
1871 let plan = LogicalPlan::Dml(DmlStatement::new(
1872 table_ref,
1873 table_source,
1874 WriteOp::Delete,
1875 Arc::new(source),
1876 ));
1877 Ok(plan)
1878 }
1879
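    /// Plans an `UPDATE` statement: scans the target table (optionally joined with
    /// a FROM clause), applies the WHERE predicate, and projects each column to its
    /// assigned expression (cast to the column type) or its current value.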
1880 fn update_to_plan(
1881 &self,
1882 table: TableWithJoins,
1883 assignments: Vec<Assignment>,
1884 from: Option<TableWithJoins>,
1885 predicate_expr: Option<SQLExpr>,
1886 ) -> Result<LogicalPlan> {
1887 let (table_name, table_alias) = match &table.relation {
1888 TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
1889 _ => plan_err!("Cannot update non-table relation!")?,
1890 };
1891
1892 let table_name = self.object_name_to_table_reference(table_name)?;
1894 let table_source = self.context_provider.get_table_source(table_name.clone())?;
1895 let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
1896 table_name.clone(),
1897 &table_source.schema(),
1898 )?);
1899
1900 let mut planner_context = PlannerContext::new();
1902 let mut assign_map = assignments
1903 .iter()
1904 .map(|assign| {
1905 let cols = match &assign.target {
1906 AssignmentTarget::ColumnName(cols) => cols,
1907 _ => plan_err!("Tuples are not supported")?,
1908 };
1909 let col_name: &Ident = cols
1910 .0
1911 .iter()
1912 .last()
1913 .ok_or_else(|| plan_datafusion_err!("Empty column id"))?
1914 .as_ident()
1915 .unwrap();
1916 table_schema.field_with_unqualified_name(&col_name.value)?;
1918 Ok((col_name.value.clone(), assign.value.clone()))
1919 })
1920 .collect::<Result<HashMap<String, SQLExpr>>>()?;
1921
1922 let mut input_tables = vec![table];
1924 input_tables.extend(from);
1925 let scan = self.plan_from_tables(input_tables, &mut planner_context)?;
1926
1927 let source = match predicate_expr {
1929 None => scan,
1930 Some(predicate_expr) => {
1931 let filter_expr = self.sql_to_expr(
1932 predicate_expr,
1933 scan.schema(),
1934 &mut planner_context,
1935 )?;
1936 let mut using_columns = HashSet::new();
1937 expr_to_columns(&filter_expr, &mut using_columns)?;
1938 let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
1939 filter_expr,
1940 &[&[scan.schema()]],
1941 &[using_columns],
1942 )?;
1943 LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
1944 }
1945 };
1946
1947 let exprs = table_schema
1949 .iter()
1950 .map(|(qualifier, field)| {
1951 let expr = match assign_map.remove(field.name()) {
1952 Some(new_value) => {
1953 let mut expr = self.sql_to_expr(
1954 new_value,
1955 source.schema(),
1956 &mut planner_context,
1957 )?;
1958 if let Expr::Placeholder(placeholder) = &mut expr {
1960 placeholder.data_type = placeholder
1961 .data_type
1962 .take()
1963 .or_else(|| Some(field.data_type().clone()));
1964 }
1965 expr.cast_to(field.data_type(), source.schema())?
1967 }
1968 None => {
1969 if let Some(alias) = &table_alias {
1971 Expr::Column(Column::new(
1972 Some(self.ident_normalizer.normalize(alias.name.clone())),
1973 field.name(),
1974 ))
1975 } else {
1976 Expr::Column(Column::from((qualifier, field)))
1977 }
1978 }
1979 };
1980 Ok(expr.alias(field.name()))
1981 })
1982 .collect::<Result<Vec<_>>>()?;
1983
1984 let source = project(source, exprs)?;
1985
1986 let plan = LogicalPlan::Dml(DmlStatement::new(
1987 table_name,
1988 table_source,
1989 WriteOp::Update,
1990 Arc::new(source),
1991 ));
1992 Ok(plan)
1993 }
1994
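    /// Plans an `INSERT` statement: maps the insert column list onto the table
    /// schema, infers placeholder types from VALUES, casts or defaults each target
    /// column, and emits an append, overwrite, or replace DML node.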
1995 fn insert_to_plan(
1996 &self,
1997 table_name: ObjectName,
1998 columns: Vec<Ident>,
1999 source: Box<Query>,
2000 overwrite: bool,
2001 replace_into: bool,
2002 ) -> Result<LogicalPlan> {
2003 let table_name = self.object_name_to_table_reference(table_name)?;
2005 let table_source = self.context_provider.get_table_source(table_name.clone())?;
2006 let table_schema = DFSchema::try_from(table_source.schema())?;
2007
2008 let (fields, value_indices) = if columns.is_empty() {
2016 (
2018 table_schema.fields().clone(),
2019 (0..table_schema.fields().len())
2020 .map(Some)
2021 .collect::<Vec<_>>(),
2022 )
2023 } else {
2024 let mut value_indices = vec![None; table_schema.fields().len()];
2025 let fields = columns
2026 .into_iter()
2027 .map(|c| self.ident_normalizer.normalize(c))
2028 .enumerate()
2029 .map(|(i, c)| {
2030 let column_index = table_schema
2031 .index_of_column_by_name(None, &c)
2032 .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
2033
2034 if value_indices[column_index].is_some() {
2035 return schema_err!(SchemaError::DuplicateUnqualifiedField {
2036 name: c,
2037 });
2038 } else {
2039 value_indices[column_index] = Some(i);
2040 }
2041 Ok(table_schema.field(column_index).clone())
2042 })
2043 .collect::<Result<Vec<_>>>()?;
2044 (Fields::from(fields), value_indices)
2045 };
2046
2047 let mut prepare_param_data_types = BTreeMap::new();
2049 if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
2050 for row in rows.iter() {
2051 for (idx, val) in row.iter().enumerate() {
2052 if let SQLExpr::Value(ValueWithSpan {
2053 value: Value::Placeholder(name),
2054 span: _,
2055 }) = val
2056 {
2057 let name =
2058 name.replace('$', "").parse::<usize>().map_err(|_| {
2059 plan_datafusion_err!("Can't parse placeholder: {name}")
2060 })? - 1;
2061 let field = fields.get(idx).ok_or_else(|| {
2062 plan_datafusion_err!(
2063 "Placeholder ${} refers to a non existent column",
2064 idx + 1
2065 )
2066 })?;
2067 let dt = field.data_type().clone();
2068 let _ = prepare_param_data_types.insert(name, dt);
2069 }
2070 }
2071 }
2072 }
2073 let prepare_param_data_types = prepare_param_data_types.into_values().collect();
2074
2075 let mut planner_context =
2077 PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
2078 planner_context.set_table_schema(Some(DFSchemaRef::new(
2079 DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
2080 )));
2081 let source = self.query_to_plan(*source, &mut planner_context)?;
2082 if fields.len() != source.schema().fields().len() {
2083 plan_err!("Column count doesn't match insert query!")?;
2084 }
2085
2086 let exprs = value_indices
2087 .into_iter()
2088 .enumerate()
2089 .map(|(i, value_index)| {
2090 let target_field = table_schema.field(i);
2091 let expr = match value_index {
2092 Some(v) => {
2093 Expr::Column(Column::from(source.schema().qualified_field(v)))
2094 .cast_to(target_field.data_type(), source.schema())?
2095 }
2096 None => table_source
2098 .get_column_default(target_field.name())
2099 .cloned()
2100 .unwrap_or_else(|| {
2101 Expr::Literal(ScalarValue::Null, None)
2103 })
2104 .cast_to(target_field.data_type(), &DFSchema::empty())?,
2105 };
2106 Ok(expr.alias(target_field.name()))
2107 })
2108 .collect::<Result<Vec<Expr>>>()?;
2109 let source = project(source, exprs)?;
2110
2111 let insert_op = match (overwrite, replace_into) {
2112 (false, false) => InsertOp::Append,
2113 (true, false) => InsertOp::Overwrite,
2114 (false, true) => InsertOp::Replace,
2115 (true, true) => plan_err!("Conflicting insert operations: `overwrite` and `replace_into` cannot both be true")?,
2116 };
2117
2118 let plan = LogicalPlan::Dml(DmlStatement::new(
2119 table_name,
2120 Arc::clone(&table_source),
2121 WriteOp::Insert(insert_op),
2122 Arc::new(source),
2123 ));
2124 Ok(plan)
2125 }
2126
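    /// Rewrites `SHOW COLUMNS` into a query against `information_schema.columns`
    /// filtered to the given table.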
2127 fn show_columns_to_plan(
2128 &self,
2129 extended: bool,
2130 full: bool,
2131 sql_table_name: ObjectName,
2132 ) -> Result<LogicalPlan> {
2133 let where_clause = object_name_to_qualifier(
2135 &sql_table_name,
2136 self.options.enable_ident_normalization,
2137 )?;
2138
2139 if !self.has_table("information_schema", "columns") {
2140 return plan_err!(
2141 "SHOW COLUMNS is not supported unless information_schema is enabled"
2142 );
2143 }
2144
2145 let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2147 let _ = self.context_provider.get_table_source(table_ref)?;
2148
2149 let select_list = if full || extended {
2151 "*"
2152 } else {
2153 "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
2154 };
2155
2156 let query = format!(
2157 "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
2158 );
2159
        let mut rewrite = DFParser::parse_sql(&query)?;
        assert_eq!(rewrite.len(), 1);
        self.statement_to_plan(rewrite.pop_front().unwrap())
    }
2164
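    /// Rewrites `SHOW FUNCTIONS` into a query joining `information_schema.parameters`
    /// and `information_schema.routines`, with an optional LIKE filter.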
2165 fn show_functions_to_plan(
2176 &self,
2177 filter: Option<ShowStatementFilter>,
2178 ) -> Result<LogicalPlan> {
2179 let where_clause = if let Some(filter) = filter {
2180 match filter {
2181 ShowStatementFilter::Like(like) => {
2182 format!("WHERE p.function_name like '{like}'")
2183 }
2184 _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
2185 }
2186 } else {
2187 "".to_string()
2188 };
2189
2190 let query = format!(
2191 r#"
2192SELECT DISTINCT
2193 p.*,
2194 r.function_type function_type,
2195 r.description description,
2196 r.syntax_example syntax_example
2197FROM
2198 (
2199 SELECT
2200 i.specific_name function_name,
2201 o.data_type return_type,
2202 array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
2203 array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
2204 FROM (
2205 SELECT
2206 specific_catalog,
2207 specific_schema,
2208 specific_name,
2209 ordinal_position,
2210 parameter_name,
2211 data_type,
2212 rid
2213 FROM
2214 information_schema.parameters
2215 WHERE
2216 parameter_mode = 'IN'
2217 ) i
2218 JOIN
2219 (
2220 SELECT
2221 specific_catalog,
2222 specific_schema,
2223 specific_name,
2224 ordinal_position,
2225 parameter_name,
2226 data_type,
2227 rid
2228 FROM
2229 information_schema.parameters
2230 WHERE
2231 parameter_mode = 'OUT'
2232 ) o
2233 ON i.specific_catalog = o.specific_catalog
2234 AND i.specific_schema = o.specific_schema
2235 AND i.specific_name = o.specific_name
2236 AND i.rid = o.rid
2237 GROUP BY 1, 2, i.rid
2238 ) as p
2239JOIN information_schema.routines r
2240ON p.function_name = r.routine_name
2241{where_clause}
2242 "#
2243 );
        let mut rewrite = DFParser::parse_sql(&query)?;
        assert_eq!(rewrite.len(), 1);
        self.statement_to_plan(rewrite.pop_front().unwrap())
    }
2248
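    /// Rewrites `SHOW CREATE TABLE` into a query against `information_schema.views`
    /// for the given table.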
2249 fn show_create_table_to_plan(
2250 &self,
2251 sql_table_name: ObjectName,
2252 ) -> Result<LogicalPlan> {
2253 if !self.has_table("information_schema", "tables") {
2254 return plan_err!(
2255 "SHOW CREATE TABLE is not supported unless information_schema is enabled"
2256 );
2257 }
2258 let where_clause = object_name_to_qualifier(
2260 &sql_table_name,
2261 self.options.enable_ident_normalization,
2262 )?;
2263
2264 let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2266 let _ = self.context_provider.get_table_source(table_ref)?;
2267
2268 let query = format!(
2269 "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
2270 );
2271
        let mut rewrite = DFParser::parse_sql(&query)?;
        assert_eq!(rewrite.len(), 1);
        self.statement_to_plan(rewrite.pop_front().unwrap())
    }
2276
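    /// Returns true if `schema.table` can be resolved by the context provider.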
    fn has_table(&self, schema: &str, table: &str) -> bool {
        let tables_reference = TableReference::Partial {
            schema: schema.into(),
            table: table.into(),
        };
        self.context_provider
            .get_table_source(tables_reference)
            .is_ok()
    }

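    /// Accepts `BEGIN` / `BEGIN TRANSACTION`; `BEGIN WORK` is not supported.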
    fn validate_transaction_kind(
        &self,
        kind: Option<BeginTransactionKind>,
    ) -> Result<()> {
        match kind {
            None => Ok(()),
            Some(BeginTransactionKind::Transaction) => Ok(()),
            Some(BeginTransactionKind::Work) => {
                not_impl_err!("Transaction kind not supported: {kind:?}")
            }
        }
    }
}