1use std::collections::{BTreeMap, HashMap, HashSet};
19use std::path::Path;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use crate::parser::{
24 CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
25 LexOrdering, Statement as DFStatement,
26};
27use crate::planner::{
28 object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
29};
30use crate::utils::normalize_ident;
31
32use arrow::datatypes::{DataType, Fields};
33use datafusion_common::error::_plan_err;
34use datafusion_common::parsers::CompressionTypeVariant;
35use datafusion_common::{
36 exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
37 unqualified_field_not_found, Column, Constraint, Constraints, DFSchema, DFSchemaRef,
38 DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, TableReference,
39 ToDFSchema,
40};
41use datafusion_expr::dml::{CopyTo, InsertOp};
42use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
43use datafusion_expr::logical_plan::builder::project;
44use datafusion_expr::logical_plan::DdlStatement;
45use datafusion_expr::utils::expr_to_columns;
46use datafusion_expr::{
47 cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
48 CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
49 CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
50 DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
51 EmptyRelation, Execute, Explain, ExplainFormat, Expr, ExprSchemable, Filter,
52 LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, SetVariable,
53 SortExpr, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
54 TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
55 Volatility, WriteOp,
56};
57use sqlparser::ast::{
58 self, BeginTransactionKind, NullsDistinctOption, ShowStatementIn,
59 ShowStatementOptions, SqliteOnConflict, TableObject, UpdateTableFromKind,
60 ValueWithSpan,
61};
62use sqlparser::ast::{
63 Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
64 CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
65 ObjectName, ObjectType, OneOrManyWithParens, Query, SchemaName, SetExpr,
66 ShowCreateObject, ShowStatementFilter, Statement, TableConstraint, TableFactor,
67 TableWithJoins, TransactionMode, UnaryOperator, Value,
68};
69use sqlparser::parser::ParserError::ParserError;
70
71fn ident_to_string(ident: &Ident) -> String {
72 normalize_ident(ident.to_owned())
73}
74
75fn object_name_to_string(object_name: &ObjectName) -> String {
76 object_name
77 .0
78 .iter()
79 .map(|object_name_part| {
80 object_name_part
81 .as_ident()
82 .map_or_else(String::new, ident_to_string)
85 })
86 .collect::<Vec<String>>()
87 .join(".")
88}
89
90fn get_schema_name(schema_name: &SchemaName) -> String {
91 match schema_name {
92 SchemaName::Simple(schema_name) => object_name_to_string(schema_name),
93 SchemaName::UnnamedAuthorization(auth) => ident_to_string(auth),
94 SchemaName::NamedAuthorization(schema_name, auth) => format!(
95 "{}.{}",
96 object_name_to_string(schema_name),
97 ident_to_string(auth)
98 ),
99 }
100}
101
102fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
105 let mut constraints = vec![];
106 for column in columns {
107 for ast::ColumnOptionDef { name, option } in &column.options {
108 match option {
109 ast::ColumnOption::Unique {
110 is_primary: false,
111 characteristics,
112 } => constraints.push(TableConstraint::Unique {
113 name: name.clone(),
114 columns: vec![column.name.clone()],
115 characteristics: *characteristics,
116 index_name: None,
117 index_type_display: ast::KeyOrIndexDisplay::None,
118 index_type: None,
119 index_options: vec![],
120 nulls_distinct: NullsDistinctOption::None,
121 }),
122 ast::ColumnOption::Unique {
123 is_primary: true,
124 characteristics,
125 } => constraints.push(TableConstraint::PrimaryKey {
126 name: name.clone(),
127 columns: vec![column.name.clone()],
128 characteristics: *characteristics,
129 index_name: None,
130 index_type: None,
131 index_options: vec![],
132 }),
133 ast::ColumnOption::ForeignKey {
134 foreign_table,
135 referred_columns,
136 on_delete,
137 on_update,
138 characteristics,
139 } => constraints.push(TableConstraint::ForeignKey {
140 name: name.clone(),
141 columns: vec![],
142 foreign_table: foreign_table.clone(),
143 referred_columns: referred_columns.to_vec(),
144 on_delete: *on_delete,
145 on_update: *on_update,
146 characteristics: *characteristics,
147 }),
148 ast::ColumnOption::Check(expr) => {
149 constraints.push(TableConstraint::Check {
150 name: name.clone(),
151 expr: Box::new(expr.clone()),
152 })
153 }
154 ast::ColumnOption::Default(_)
156 | ast::ColumnOption::Null
157 | ast::ColumnOption::NotNull
158 | ast::ColumnOption::DialectSpecific(_)
159 | ast::ColumnOption::CharacterSet(_)
160 | ast::ColumnOption::Generated { .. }
161 | ast::ColumnOption::Comment(_)
162 | ast::ColumnOption::Options(_)
163 | ast::ColumnOption::OnUpdate(_)
164 | ast::ColumnOption::Materialized(_)
165 | ast::ColumnOption::Ephemeral(_)
166 | ast::ColumnOption::Identity(_)
167 | ast::ColumnOption::OnConflict(_)
168 | ast::ColumnOption::Policy(_)
169 | ast::ColumnOption::Tags(_)
170 | ast::ColumnOption::Alias(_)
171 | ast::ColumnOption::Collation(_) => {}
172 }
173 }
174 }
175 constraints
176}
177
178impl<S: ContextProvider> SqlToRel<'_, S> {
179 pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
181 match statement {
182 DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
183 DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
184 DFStatement::CopyTo(s) => self.copy_to_plan(s),
185 DFStatement::Explain(ExplainStatement {
186 verbose,
187 analyze,
188 format,
189 statement,
190 }) => self.explain_to_plan(verbose, analyze, format, *statement),
191 }
192 }
193
194 pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
196 self.sql_statement_to_plan_with_context_impl(
197 statement,
198 &mut PlannerContext::new(),
199 )
200 }
201
    /// Same as [`Self::sql_statement_to_plan`], but reuses the caller's
    /// [`PlannerContext`] so planner state the caller has already registered
    /// (such as prepared-statement parameter types) stays visible.
    pub fn sql_statement_to_plan_with_context(
        &self,
        statement: Statement,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        self.sql_statement_to_plan_with_context_impl(statement, planner_context)
    }
210
    /// Internal worker shared by [`Self::sql_statement_to_plan`] and
    /// [`Self::sql_statement_to_plan_with_context`]: lowers one parsed SQL
    /// `Statement` into a [`LogicalPlan`].
    ///
    /// Each match arm first rejects every AST option this planner does not
    /// support (via `not_impl_err!` / `plan_err!`), then builds the plan from
    /// the remaining fields. The rejection order within an arm is observable
    /// through the error message a user sees, so it is deliberate.
    ///
    /// NOTE(review): the recurring `return not_impl_err!(...)?;` pattern applies
    /// `?` to an already-`Err` value — behaviorally identical to a plain
    /// `return not_impl_err!(...);`, kept as-is throughout.
    fn sql_statement_to_plan_with_context_impl(
        &self,
        statement: Statement,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        match statement {
            // `DESCRIBE <table>`
            Statement::ExplainTable {
                describe_alias: DescribeAlias::Describe, table_name,
                ..
            } => self.describe_table_to_plan(table_name),
            // `EXPLAIN [ANALYZE] [VERBOSE] [FORMAT ...] <statement>`
            Statement::Explain {
                verbose,
                statement,
                analyze,
                format,
                describe_alias: _,
                ..
            } => {
                let format = format.map(|format| format.to_string());
                let statement = DFStatement::Statement(statement);
                self.explain_to_plan(verbose, analyze, format, statement)
            }
            Statement::Query(query) => self.query_to_plan(*query, planner_context),
            Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
            Statement::SetVariable {
                local,
                hivevar,
                variables,
                value,
            } => self.set_variable_to_plan(local, hivevar, &variables, value),

            // `CREATE TABLE` — only plain (in-memory) tables with no table
            // properties or WITH options are supported; every other parsed
            // option is rejected explicitly below.
            Statement::CreateTable(CreateTable {
                temporary,
                external,
                global,
                transient,
                volatile,
                hive_distribution,
                hive_formats,
                file_format,
                location,
                query,
                name,
                columns,
                constraints,
                table_properties,
                with_options,
                if_not_exists,
                or_replace,
                without_rowid,
                like,
                clone,
                engine,
                comment,
                auto_increment_offset,
                default_charset,
                collation,
                on_commit,
                on_cluster,
                primary_key,
                order_by,
                partition_by,
                cluster_by,
                clustered_by,
                options,
                strict,
                copy_grants,
                enable_schema_evolution,
                change_tracking,
                data_retention_time_in_days,
                max_data_extension_time_in_days,
                default_ddl_collation,
                with_aggregation_policy,
                with_row_access_policy,
                with_tags,
                iceberg,
                external_volume,
                base_location,
                catalog,
                catalog_sync,
                storage_serialization_policy,
            }) if table_properties.is_empty() && with_options.is_empty() => {
                if temporary {
                    return not_impl_err!("Temporary tables not supported")?;
                }
                if external {
                    return not_impl_err!("External tables not supported")?;
                }
                if global.is_some() {
                    return not_impl_err!("Global tables not supported")?;
                }
                if transient {
                    return not_impl_err!("Transient tables not supported")?;
                }
                if volatile {
                    return not_impl_err!("Volatile tables not supported")?;
                }
                if hive_distribution != ast::HiveDistributionStyle::NONE {
                    return not_impl_err!(
                        "Hive distribution not supported: {hive_distribution:?}"
                    )?;
                }
                // An all-`None` HiveFormat is what the parser produces when no
                // Hive clauses were written; anything else is unsupported.
                if !matches!(
                    hive_formats,
                    Some(ast::HiveFormat {
                        row_format: None,
                        serde_properties: None,
                        storage: None,
                        location: None,
                    })
                ) {
                    return not_impl_err!(
                        "Hive formats not supported: {hive_formats:?}"
                    )?;
                }
                if file_format.is_some() {
                    return not_impl_err!("File format not supported")?;
                }
                if location.is_some() {
                    return not_impl_err!("Location not supported")?;
                }
                if without_rowid {
                    return not_impl_err!("Without rowid not supported")?;
                }
                if like.is_some() {
                    return not_impl_err!("Like not supported")?;
                }
                if clone.is_some() {
                    return not_impl_err!("Clone not supported")?;
                }
                if engine.is_some() {
                    return not_impl_err!("Engine not supported")?;
                }
                if comment.is_some() {
                    return not_impl_err!("Comment not supported")?;
                }
                if auto_increment_offset.is_some() {
                    return not_impl_err!("Auto increment offset not supported")?;
                }
                if default_charset.is_some() {
                    return not_impl_err!("Default charset not supported")?;
                }
                if collation.is_some() {
                    return not_impl_err!("Collation not supported")?;
                }
                if on_commit.is_some() {
                    return not_impl_err!("On commit not supported")?;
                }
                if on_cluster.is_some() {
                    return not_impl_err!("On cluster not supported")?;
                }
                if primary_key.is_some() {
                    return not_impl_err!("Primary key not supported")?;
                }
                if order_by.is_some() {
                    return not_impl_err!("Order by not supported")?;
                }
                if partition_by.is_some() {
                    return not_impl_err!("Partition by not supported")?;
                }
                if cluster_by.is_some() {
                    return not_impl_err!("Cluster by not supported")?;
                }
                if clustered_by.is_some() {
                    return not_impl_err!("Clustered by not supported")?;
                }
                if options.is_some() {
                    return not_impl_err!("Options not supported")?;
                }
                if strict {
                    return not_impl_err!("Strict not supported")?;
                }
                if copy_grants {
                    return not_impl_err!("Copy grants not supported")?;
                }
                if enable_schema_evolution.is_some() {
                    return not_impl_err!("Enable schema evolution not supported")?;
                }
                if change_tracking.is_some() {
                    return not_impl_err!("Change tracking not supported")?;
                }
                if data_retention_time_in_days.is_some() {
                    return not_impl_err!("Data retention time in days not supported")?;
                }
                if max_data_extension_time_in_days.is_some() {
                    return not_impl_err!(
                        "Max data extension time in days not supported"
                    )?;
                }
                if default_ddl_collation.is_some() {
                    return not_impl_err!("Default DDL collation not supported")?;
                }
                if with_aggregation_policy.is_some() {
                    return not_impl_err!("With aggregation policy not supported")?;
                }
                if with_row_access_policy.is_some() {
                    return not_impl_err!("With row access policy not supported")?;
                }
                if with_tags.is_some() {
                    return not_impl_err!("With tags not supported")?;
                }
                if iceberg {
                    return not_impl_err!("Iceberg not supported")?;
                }
                if external_volume.is_some() {
                    return not_impl_err!("External volume not supported")?;
                }
                if base_location.is_some() {
                    return not_impl_err!("Base location not supported")?;
                }
                if catalog.is_some() {
                    return not_impl_err!("Catalog not supported")?;
                }
                if catalog_sync.is_some() {
                    return not_impl_err!("Catalog sync not supported")?;
                }
                if storage_serialization_policy.is_some() {
                    return not_impl_err!("Storage serialization policy not supported")?;
                }

                // Merge explicit table-level constraints with constraints
                // declared inline on individual columns.
                let mut all_constraints = constraints;
                let inline_constraints = calc_inline_constraints_from_columns(&columns);
                all_constraints.extend(inline_constraints);
                // Column DEFAULT expressions, planned against the context.
                let column_defaults =
                    self.build_column_defaults(&columns, planner_context)?;

                let has_columns = !columns.is_empty();
                let schema = self.build_schema(columns)?.to_dfschema_ref()?;
                if has_columns {
                    planner_context.set_table_schema(Some(Arc::clone(&schema)));
                }

                match query {
                    // CREATE TABLE ... AS SELECT ...
                    Some(query) => {
                        let plan = self.query_to_plan(*query, planner_context)?;
                        let input_schema = plan.schema();

                        // When an explicit column list was given, coerce the
                        // query output to the declared types/names via a
                        // projection of casts.
                        let plan = if has_columns {
                            if schema.fields().len() != input_schema.fields().len() {
                                return plan_err!(
                                    "Mismatch: {} columns specified, but result has {} columns",
                                    schema.fields().len(),
                                    input_schema.fields().len()
                                );
                            }
                            let input_fields = input_schema.fields();
                            let project_exprs = schema
                                .fields()
                                .iter()
                                .zip(input_fields)
                                .map(|(field, input_field)| {
                                    cast(
                                        col(input_field.name()),
                                        field.data_type().clone(),
                                    )
                                    .alias(field.name())
                                })
                                .collect::<Vec<_>>();

                            LogicalPlanBuilder::from(plan.clone())
                                .project(project_exprs)?
                                .build()?
                        } else {
                            plan
                        };

                        let constraints = self.new_constraint_from_table_constraints(
                            &all_constraints,
                            plan.schema(),
                        )?;

                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                            CreateMemoryTable {
                                name: self.object_name_to_table_reference(name)?,
                                constraints,
                                input: Arc::new(plan),
                                if_not_exists,
                                or_replace,
                                column_defaults,
                                temporary,
                            },
                        )))
                    }

                    // CREATE TABLE with a column list but no query: the input
                    // is an empty relation with the declared schema.
                    None => {
                        let plan = EmptyRelation {
                            produce_one_row: false,
                            schema,
                        };
                        let plan = LogicalPlan::EmptyRelation(plan);
                        let constraints = self.new_constraint_from_table_constraints(
                            &all_constraints,
                            plan.schema(),
                        )?;
                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                            CreateMemoryTable {
                                name: self.object_name_to_table_reference(name)?,
                                constraints,
                                input: Arc::new(plan),
                                if_not_exists,
                                or_replace,
                                column_defaults,
                                temporary,
                            },
                        )))
                    }
                }
            }

            // `CREATE VIEW` — only plain views with no options.
            Statement::CreateView {
                or_replace,
                materialized,
                name,
                columns,
                query,
                options: CreateTableOptions::None,
                cluster_by,
                comment,
                with_no_schema_binding,
                if_not_exists,
                temporary,
                to,
                params,
            } => {
                if materialized {
                    return not_impl_err!("Materialized views not supported")?;
                }
                if !cluster_by.is_empty() {
                    return not_impl_err!("Cluster by not supported")?;
                }
                if comment.is_some() {
                    return not_impl_err!("Comment not supported")?;
                }
                if with_no_schema_binding {
                    return not_impl_err!("With no schema binding not supported")?;
                }
                if if_not_exists {
                    return not_impl_err!("If not exists not supported")?;
                }
                if to.is_some() {
                    return not_impl_err!("To not supported")?;
                }

                // Reassemble the statement so its SQL text can be stored as
                // the view definition, then take it apart again for planning.
                let stmt = Statement::CreateView {
                    or_replace,
                    materialized,
                    name,
                    columns,
                    query,
                    options: CreateTableOptions::None,
                    cluster_by,
                    comment,
                    with_no_schema_binding,
                    if_not_exists,
                    temporary,
                    to,
                    params,
                };
                let sql = stmt.to_string();
                let Statement::CreateView {
                    name,
                    columns,
                    query,
                    or_replace,
                    temporary,
                    ..
                } = stmt
                else {
                    return internal_err!("Unreachable code in create view");
                };

                // Per-column options on a view column list are unsupported.
                let columns = columns
                    .into_iter()
                    .map(|view_column_def| {
                        if let Some(options) = view_column_def.options {
                            plan_err!(
                                "Options not supported for view columns: {options:?}"
                            )
                        } else {
                            Ok(view_column_def.name)
                        }
                    })
                    .collect::<Result<Vec<_>>>()?;

                let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
                plan = self.apply_expr_alias(plan, columns)?;

                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    name: self.object_name_to_table_reference(name)?,
                    input: Arc::new(plan),
                    or_replace,
                    definition: Some(sql),
                    temporary,
                })))
            }
            // Only `SHOW CREATE TABLE` is supported.
            Statement::ShowCreate { obj_type, obj_name } => match obj_type {
                ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
                _ => {
                    not_impl_err!("Only `SHOW CREATE TABLE ...` statement is supported")
                }
            },
            Statement::CreateSchema {
                schema_name,
                if_not_exists,
            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
                CreateCatalogSchema {
                    schema_name: get_schema_name(&schema_name),
                    if_not_exists,
                    schema: Arc::new(DFSchema::empty()),
                },
            ))),
            Statement::CreateDatabase {
                db_name,
                if_not_exists,
                ..
            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
                CreateCatalog {
                    catalog_name: object_name_to_string(&db_name),
                    if_not_exists,
                    schema: Arc::new(DFSchema::empty()),
                },
            ))),
            // `DROP TABLE/VIEW/SCHEMA` — exactly one object name required.
            Statement::Drop {
                object_type,
                if_exists,
                mut names,
                cascade,
                restrict: _,
                purge: _,
                temporary: _,
            } => {
                let name = match names.len() {
                    0 => Err(ParserError("Missing table name.".to_string()).into()),
                    1 => self.object_name_to_table_reference(names.pop().unwrap()),
                    _ => {
                        Err(ParserError("Multiple objects not supported".to_string())
                            .into())
                    }
                }?;

                match object_type {
                    ObjectType::Table => {
                        Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
                            name,
                            if_exists,
                            schema: DFSchemaRef::new(DFSchema::empty()),
                        })))
                    }
                    ObjectType::View => {
                        Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
                            name,
                            if_exists,
                            schema: DFSchemaRef::new(DFSchema::empty()),
                        })))
                    }
                    ObjectType::Schema => {
                        // Reinterpret the parsed table reference as a schema
                        // reference: `a.b` means catalog `a`, schema `b`.
                        let name = match name {
                            TableReference::Bare { table } => Ok(SchemaReference::Bare { schema: table }),
                            TableReference::Partial { schema, table } => Ok(SchemaReference::Full { schema: table, catalog: schema }),
                            TableReference::Full { catalog: _, schema: _, table: _ } => {
                                Err(ParserError("Invalid schema specifier (has 3 parts)".to_string()))
                            }
                        }?;
                        Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(DropCatalogSchema {
                            name,
                            if_exists,
                            cascade,
                            schema: DFSchemaRef::new(DFSchema::empty()),
                        })))
                    }
                    _ => not_impl_err!(
                        "Only `DROP TABLE/VIEW/SCHEMA ...` statement is supported currently"
                    ),
                }
            }
            // `PREPARE <name> [(types...)] AS <statement>`
            Statement::Prepare {
                name,
                data_types,
                statement,
            } => {
                // Convert SQL parameter types to Arrow types.
                let mut data_types: Vec<DataType> = data_types
                    .into_iter()
                    .map(|t| self.convert_data_type(&t))
                    .collect::<Result<_>>()?;

                // Plan the inner statement with the declared parameter types
                // in scope so `$1`, `$2`, ... resolve.
                let mut planner_context = PlannerContext::new()
                    .with_prepare_param_data_types(data_types.clone());

                let plan = self.sql_statement_to_plan_with_context_impl(
                    *statement,
                    &mut planner_context,
                )?;

                // No declared types: infer them from the placeholders found in
                // the planned statement ($1..$n, in order).
                if data_types.is_empty() {
                    let map_types = plan.get_parameter_types()?;
                    let param_types: Vec<_> = (1..=map_types.len())
                        .filter_map(|i| {
                            let key = format!("${i}");
                            map_types.get(&key).and_then(|opt| opt.clone())
                        })
                        .collect();
                    data_types.extend(param_types.iter().cloned());
                    // NOTE(review): `with_prepare_param_data_types` takes the
                    // context by value and its result is discarded here, so
                    // this call appears to have no effect on the already-built
                    // `plan` — confirm intent upstream.
                    planner_context.with_prepare_param_data_types(param_types);
                }

                Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
                    name: ident_to_string(&name),
                    data_types,
                    input: Arc::new(plan),
                })))
            }
            // `EXECUTE <name> [(params...)]`
            Statement::Execute {
                name,
                parameters,
                using,
                has_parentheses: _,
                immediate,
                into,
            } => {
                if !using.is_empty() {
                    return not_impl_err!(
                        "Execute statement with USING is not supported"
                    );
                }
                if immediate {
                    return not_impl_err!(
                        "Execute statement with IMMEDIATE is not supported"
                    );
                }
                if !into.is_empty() {
                    return not_impl_err!("Execute statement with INTO is not supported");
                }
                // Parameter expressions are planned against an empty schema —
                // they may not reference any columns.
                let empty_schema = DFSchema::empty();
                let parameters = parameters
                    .into_iter()
                    .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
                    .collect::<Result<Vec<Expr>>>()?;

                Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
                    // NOTE(review): `name` is an Option and this unwrap panics
                    // when it is absent — presumably only `EXECUTE IMMEDIATE`
                    // omits the name, which was rejected above; confirm.
                    name: object_name_to_string(&name.unwrap()),
                    parameters,
                })))
            }
            // `DEALLOCATE [PREPARE] <name>`
            Statement::Deallocate {
                name,
                prepare: _,
            } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
                Deallocate {
                    name: ident_to_string(&name),
                },
            ))),

            // `SHOW TABLES` — no modifiers or filters supported.
            Statement::ShowTables {
                extended,
                full,
                terse,
                history,
                external,
                show_options,
            } => {
                if extended {
                    return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
                }
                if full {
                    return not_impl_err!("SHOW FULL TABLES not supported")?;
                }
                if terse {
                    return not_impl_err!("SHOW TERSE TABLES not supported")?;
                }
                if history {
                    return not_impl_err!("SHOW TABLES HISTORY not supported")?;
                }
                if external {
                    return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
                }
                let ShowStatementOptions {
                    show_in,
                    starts_with,
                    limit,
                    limit_from,
                    filter_position,
                } = show_options;
                if show_in.is_some() {
                    return not_impl_err!("SHOW TABLES IN not supported")?;
                }
                if starts_with.is_some() {
                    return not_impl_err!("SHOW TABLES LIKE not supported")?;
                }
                if limit.is_some() {
                    return not_impl_err!("SHOW TABLES LIMIT not supported")?;
                }
                if limit_from.is_some() {
                    return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
                }
                if filter_position.is_some() {
                    return not_impl_err!("SHOW TABLES FILTER not supported")?;
                }
                self.show_tables_to_plan()
            }

            // `SHOW COLUMNS FROM/IN <table>` — requires a table name, no
            // filters.
            Statement::ShowColumns {
                extended,
                full,
                show_options,
            } => {
                let ShowStatementOptions {
                    show_in,
                    starts_with,
                    limit,
                    limit_from,
                    filter_position,
                } = show_options;
                if starts_with.is_some() {
                    return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
                }
                if limit.is_some() {
                    return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
                }
                if limit_from.is_some() {
                    return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
                }
                if filter_position.is_some() {
                    return not_impl_err!(
                        "SHOW COLUMNS with WHERE or LIKE is not supported"
                    )?;
                }
                let Some(ShowStatementIn {
                    clause: _,
                    parent_type,
                    parent_name,
                }) = show_in
                else {
                    return plan_err!("SHOW COLUMNS requires a table name");
                };

                if let Some(parent_type) = parent_type {
                    return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
                }
                let Some(table_name) = parent_name else {
                    return plan_err!("SHOW COLUMNS requires a table name");
                };

                self.show_columns_to_plan(extended, full, table_name)
            }

            Statement::ShowFunctions { filter, .. } => {
                self.show_functions_to_plan(filter)
            }

            // `INSERT INTO` — most dialect-specific clauses are rejected;
            // SQLite `INSERT OR REPLACE` maps onto `replace_into`.
            Statement::Insert(Insert {
                or,
                into,
                columns,
                overwrite,
                source,
                partitioned,
                after_columns,
                table,
                on,
                returning,
                ignore,
                table_alias,
                mut replace_into,
                priority,
                insert_alias,
                assignments,
                has_table_keyword,
                settings,
                format_clause,
            }) => {
                let table_name = match table {
                    TableObject::TableName(table_name) => table_name,
                    TableObject::TableFunction(_) => {
                        return not_impl_err!("INSERT INTO Table functions not supported")
                    }
                };
                if let Some(or) = or {
                    match or {
                        SqliteOnConflict::Replace => replace_into = true,
                        _ => plan_err!("Inserts with {or} clause is not supported")?,
                    }
                }
                if partitioned.is_some() {
                    plan_err!("Partitioned inserts not yet supported")?;
                }
                if !after_columns.is_empty() {
                    plan_err!("After-columns clause not supported")?;
                }
                if on.is_some() {
                    plan_err!("Insert-on clause not supported")?;
                }
                if returning.is_some() {
                    plan_err!("Insert-returning clause not supported")?;
                }
                if ignore {
                    plan_err!("Insert-ignore clause not supported")?;
                }
                let Some(source) = source else {
                    plan_err!("Inserts without a source not supported")?
                };
                if let Some(table_alias) = table_alias {
                    plan_err!(
                        "Inserts with a table alias not supported: {table_alias:?}"
                    )?
                };
                if let Some(priority) = priority {
                    plan_err!(
                        "Inserts with a `PRIORITY` clause not supported: {priority:?}"
                    )?
                };
                if insert_alias.is_some() {
                    plan_err!("Inserts with an alias not supported")?;
                }
                if !assignments.is_empty() {
                    plan_err!("Inserts with assignments not supported")?;
                }
                if settings.is_some() {
                    plan_err!("Inserts with settings not supported")?;
                }
                if format_clause.is_some() {
                    plan_err!("Inserts with format clause not supported")?;
                }
                // Purely syntactic flags — no planning impact.
                let _ = into;
                let _ = has_table_keyword;
                self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
            }
            // `UPDATE ... [FROM ...] [WHERE ...]` — at most one FROM table.
            Statement::Update {
                table,
                assignments,
                from,
                selection,
                returning,
                or,
            } => {
                let froms =
                    from.map(|update_table_from_kind| match update_table_from_kind {
                        UpdateTableFromKind::BeforeSet(froms) => froms,
                        UpdateTableFromKind::AfterSet(froms) => froms,
                    });
                if froms.as_ref().is_some_and(|f| f.len() > 1) {
                    plan_err!("Multiple tables in UPDATE SET FROM not yet supported")?;
                }
                let update_from = froms.and_then(|mut f| f.pop());
                if returning.is_some() {
                    plan_err!("Update-returning clause not yet supported")?;
                }
                if or.is_some() {
                    plan_err!("ON conflict not supported")?;
                }
                self.update_to_plan(table, assignments, update_from, selection)
            }

            // `DELETE FROM <table> [WHERE ...]` — no USING/RETURNING/ORDER
            // BY/LIMIT.
            Statement::Delete(Delete {
                tables,
                using,
                selection,
                returning,
                from,
                order_by,
                limit,
            }) => {
                if !tables.is_empty() {
                    plan_err!("DELETE <TABLE> not supported")?;
                }

                if using.is_some() {
                    plan_err!("Using clause not supported")?;
                }

                if returning.is_some() {
                    plan_err!("Delete-returning clause not yet supported")?;
                }

                if !order_by.is_empty() {
                    plan_err!("Delete-order-by clause not yet supported")?;
                }

                if limit.is_some() {
                    plan_err!("Delete-limit clause not yet supported")?;
                }

                let table_name = self.get_delete_target(from)?;
                self.delete_to_plan(table_name, selection)
            }

            // `START TRANSACTION [modes...]` — derives isolation level and
            // access mode from the (last occurrence of each kind of) mode.
            Statement::StartTransaction {
                modes,
                begin: false,
                modifier,
                transaction,
                statements,
                exception_statements,
                has_end_keyword,
            } => {
                if let Some(modifier) = modifier {
                    return not_impl_err!(
                        "Transaction modifier not supported: {modifier}"
                    );
                }
                if !statements.is_empty() {
                    return not_impl_err!(
                        "Transaction with multiple statements not supported"
                    );
                }
                if exception_statements.is_some() {
                    return not_impl_err!(
                        "Transaction with exception statements not supported"
                    );
                }
                if has_end_keyword {
                    return not_impl_err!("Transaction with END keyword not supported");
                }
                self.validate_transaction_kind(transaction)?;
                // Last isolation-level mode wins; default SERIALIZABLE.
                let isolation_level: ast::TransactionIsolationLevel = modes
                    .iter()
                    .filter_map(|m: &TransactionMode| match m {
                        TransactionMode::AccessMode(_) => None,
                        TransactionMode::IsolationLevel(level) => Some(level),
                    })
                    .next_back()
                    .copied()
                    .unwrap_or(ast::TransactionIsolationLevel::Serializable);
                // Last access-mode wins; default READ WRITE.
                let access_mode: ast::TransactionAccessMode = modes
                    .iter()
                    .filter_map(|m: &TransactionMode| match m {
                        TransactionMode::AccessMode(mode) => Some(mode),
                        TransactionMode::IsolationLevel(_) => None,
                    })
                    .next_back()
                    .copied()
                    .unwrap_or(ast::TransactionAccessMode::ReadWrite);
                // Map sqlparser enums onto DataFusion's plan-level enums.
                let isolation_level = match isolation_level {
                    ast::TransactionIsolationLevel::ReadUncommitted => {
                        TransactionIsolationLevel::ReadUncommitted
                    }
                    ast::TransactionIsolationLevel::ReadCommitted => {
                        TransactionIsolationLevel::ReadCommitted
                    }
                    ast::TransactionIsolationLevel::RepeatableRead => {
                        TransactionIsolationLevel::RepeatableRead
                    }
                    ast::TransactionIsolationLevel::Serializable => {
                        TransactionIsolationLevel::Serializable
                    }
                    ast::TransactionIsolationLevel::Snapshot => {
                        TransactionIsolationLevel::Snapshot
                    }
                };
                let access_mode = match access_mode {
                    ast::TransactionAccessMode::ReadOnly => {
                        TransactionAccessMode::ReadOnly
                    }
                    ast::TransactionAccessMode::ReadWrite => {
                        TransactionAccessMode::ReadWrite
                    }
                };
                let statement = PlanStatement::TransactionStart(TransactionStart {
                    access_mode,
                    isolation_level,
                });
                Ok(LogicalPlan::Statement(statement))
            }
            // `COMMIT [AND [NO] CHAIN]`
            Statement::Commit {
                chain,
                end,
                modifier,
            } => {
                if end {
                    return not_impl_err!("COMMIT AND END not supported");
                };
                if let Some(modifier) = modifier {
                    return not_impl_err!("COMMIT {modifier} not supported");
                };
                let statement = PlanStatement::TransactionEnd(TransactionEnd {
                    conclusion: TransactionConclusion::Commit,
                    chain,
                });
                Ok(LogicalPlan::Statement(statement))
            }
            // `ROLLBACK [AND [NO] CHAIN]` — savepoints unsupported.
            Statement::Rollback { chain, savepoint } => {
                if savepoint.is_some() {
                    plan_err!("Savepoints not supported")?;
                }
                let statement = PlanStatement::TransactionEnd(TransactionEnd {
                    conclusion: TransactionConclusion::Rollback,
                    chain,
                });
                Ok(LogicalPlan::Statement(statement))
            }
            // `CREATE FUNCTION` (SQL-defined function).
            Statement::CreateFunction(ast::CreateFunction {
                or_replace,
                temporary,
                name,
                args,
                return_type,
                function_body,
                behavior,
                language,
                ..
            }) => {
                let return_type = match return_type {
                    Some(t) => Some(self.convert_data_type(&t)?),
                    None => None,
                };
                let mut planner_context = PlannerContext::new();
                let empty_schema = &DFSchema::empty();

                // Convert each declared argument (type + optional default
                // expression, planned against an empty schema).
                let args = match args {
                    Some(function_args) => {
                        let function_args = function_args
                            .into_iter()
                            .map(|arg| {
                                let data_type = self.convert_data_type(&arg.data_type)?;

                                let default_expr = match arg.default_expr {
                                    Some(expr) => Some(self.sql_to_expr(
                                        expr,
                                        empty_schema,
                                        &mut planner_context,
                                    )?),
                                    None => None,
                                };
                                Ok(OperateFunctionArg {
                                    name: arg.name,
                                    default_expr,
                                    data_type,
                                })
                            })
                            .collect::<Result<Vec<OperateFunctionArg>>>();
                        Some(function_args?)
                    }
                    None => None,
                };
                // Only unqualified single-part function names are supported.
                let name = match &name.0[..] {
                    [] => exec_err!("Function should have name")?,
                    [n] => n.as_ident().unwrap().value.clone(),
                    [..] => not_impl_err!("Qualified functions are not supported")?,
                };
                // The argument types double as "prepare parameter" types so
                // `$1`, `$2`, ... in the body resolve to the argument types.
                let arg_types = args.as_ref().map(|arg| {
                    arg.iter().map(|t| t.data_type.clone()).collect::<Vec<_>>()
                });
                let mut planner_context = PlannerContext::new()
                    .with_prepare_param_data_types(arg_types.unwrap_or_default());

                let function_body = match function_body {
                    Some(r) => Some(self.sql_to_expr(
                        match r {
                            ast::CreateFunctionBody::AsBeforeOptions(expr) => expr,
                            ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
                            ast::CreateFunctionBody::Return(expr) => expr,
                        },
                        &DFSchema::empty(),
                        &mut planner_context,
                    )?),
                    None => None,
                };

                let params = CreateFunctionBody {
                    language,
                    behavior: behavior.map(|b| match b {
                        ast::FunctionBehavior::Immutable => Volatility::Immutable,
                        ast::FunctionBehavior::Stable => Volatility::Stable,
                        ast::FunctionBehavior::Volatile => Volatility::Volatile,
                    }),
                    function_body,
                };

                let statement = DdlStatement::CreateFunction(CreateFunction {
                    or_replace,
                    temporary,
                    name,
                    return_type,
                    args,
                    params,
                    schema: DFSchemaRef::new(DFSchema::empty()),
                });

                Ok(LogicalPlan::Ddl(statement))
            }
            // `DROP FUNCTION <name>` — only the first function descriptor is
            // considered.
            Statement::DropFunction {
                if_exists,
                func_desc,
                ..
            } => {
                if let Some(desc) = func_desc.first() {
                    let name = match &desc.name.0[..] {
                        [] => exec_err!("Function should have name")?,
                        [n] => n.as_ident().unwrap().value.clone(),
                        [..] => not_impl_err!("Qualified functions are not supported")?,
                    };
                    let statement = DdlStatement::DropFunction(DropFunction {
                        if_exists,
                        name,
                        schema: DFSchemaRef::new(DFSchema::empty()),
                    });
                    Ok(LogicalPlan::Ddl(statement))
                } else {
                    exec_err!("Function name not provided")
                }
            }
            // `CREATE INDEX` — resolves the target table's schema so the
            // index columns can be planned as sort expressions.
            Statement::CreateIndex(CreateIndex {
                name,
                table_name,
                using,
                columns,
                unique,
                if_not_exists,
                ..
            }) => {
                let name: Option<String> = name.as_ref().map(object_name_to_string);
                let table = self.object_name_to_table_reference(table_name)?;
                let table_schema = self
                    .context_provider
                    .get_table_source(table.clone())?
                    .schema()
                    .to_dfschema_ref()?;
                let using: Option<String> = using.as_ref().map(ident_to_string);
                let columns = self.order_by_to_sort_expr(
                    columns,
                    &table_schema,
                    planner_context,
                    false,
                    None,
                )?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
                    PlanCreateIndex {
                        name,
                        table,
                        using,
                        columns,
                        unique,
                        if_not_exists,
                        schema: DFSchemaRef::new(DFSchema::empty()),
                    },
                )))
            }
            // Anything not matched above is unsupported.
            stmt => {
                not_impl_err!("Unsupported SQL statement: {stmt}")
            }
        }
    }
1279
1280 fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
1281 let mut from = match from {
1282 FromTable::WithFromKeyword(v) => v,
1283 FromTable::WithoutKeyword(v) => v,
1284 };
1285
1286 if from.len() != 1 {
1287 return not_impl_err!(
1288 "DELETE FROM only supports single table, got {}: {from:?}",
1289 from.len()
1290 );
1291 }
1292 let table_factor = from.pop().unwrap();
1293 if !table_factor.joins.is_empty() {
1294 return not_impl_err!("DELETE FROM only supports single table, got: joins");
1295 }
1296 let TableFactor::Table { name, .. } = table_factor.relation else {
1297 return not_impl_err!(
1298 "DELETE FROM only supports single table, got: {table_factor:?}"
1299 );
1300 };
1301
1302 Ok(name)
1303 }
1304
1305 fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
1307 if self.has_table("information_schema", "tables") {
1308 let query = "SELECT * FROM information_schema.tables;";
1309 let mut rewrite = DFParser::parse_sql(query)?;
1310 assert_eq!(rewrite.len(), 1);
1311 self.statement_to_plan(rewrite.pop_front().unwrap()) } else {
1313 plan_err!("SHOW TABLES is not supported unless information_schema is enabled")
1314 }
1315 }
1316
1317 fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
1318 let table_ref = self.object_name_to_table_reference(table_name)?;
1319
1320 let table_source = self.context_provider.get_table_source(table_ref)?;
1321
1322 let schema = table_source.schema();
1323
1324 let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1325
1326 Ok(LogicalPlan::DescribeTable(DescribeTable {
1327 schema,
1328 output_schema: Arc::new(output_schema),
1329 }))
1330 }
1331
    /// Plan a `COPY <source> TO <target>` statement.
    ///
    /// The source may be a table name or an arbitrary query; the output
    /// format comes from `STORED AS` when given, otherwise it is inferred
    /// from the target path's file extension.
    fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
        // Build the input plan: a full table scan for a relation source, or
        // the planned query for a query source.
        let copy_source = statement.source;
        let (input, input_schema, table_ref) = match copy_source {
            CopyToSource::Relation(object_name) => {
                let table_name = object_name_to_string(&object_name);
                let table_ref = self.object_name_to_table_reference(object_name)?;
                let table_source =
                    self.context_provider.get_table_source(table_ref.clone())?;
                let plan =
                    LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?;
                let input_schema = Arc::clone(plan.schema());
                (plan, input_schema, Some(table_ref))
            }
            CopyToSource::Query(query) => {
                let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
                let input_schema = Arc::clone(plan.schema());
                (plan, input_schema, None)
            }
        };

        // COPY allows the same option key to appear more than once.
        let options_map = self.parse_options_map(statement.options, true)?;

        // `STORED AS <format>` wins when it names a registered file type.
        let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
            self.context_provider.get_file_type(stored_as).ok()
        } else {
            None
        };

        let file_type = match maybe_file_type {
            Some(ft) => ft,
            None => {
                // Fall back to the (lower-cased) file extension of the
                // target path; error if there is none or it is not UTF-8.
                let e = || {
                    DataFusionError::Configuration(
                        "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
                            .to_string(),
                    )
                };
                let extension: &str = &Path::new(&statement.target)
                    .extension()
                    .ok_or_else(e)?
                    .to_str()
                    .ok_or_else(e)?
                    .to_lowercase();

                self.context_provider.get_file_type(extension)?
            }
        };

        // Validate each partition column against the input schema and
        // record it by (unqualified) field name.
        let partition_by = statement
            .partitioned_by
            .iter()
            .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .map(|f| f.name().to_owned())
            .collect();

        Ok(LogicalPlan::Copy(CopyTo {
            input: Arc::new(input),
            output_url: statement.target,
            file_type,
            partition_by,
            options: options_map,
        }))
    }
1399
1400 fn build_order_by(
1401 &self,
1402 order_exprs: Vec<LexOrdering>,
1403 schema: &DFSchemaRef,
1404 planner_context: &mut PlannerContext,
1405 ) -> Result<Vec<Vec<SortExpr>>> {
1406 if !order_exprs.is_empty() && schema.fields().is_empty() {
1407 let results = order_exprs
1408 .iter()
1409 .map(|lex_order| {
1410 let result = lex_order
1411 .iter()
1412 .map(|order_by_expr| {
1413 let ordered_expr = &order_by_expr.expr;
1414 let ordered_expr = ordered_expr.to_owned();
1415 let ordered_expr = self
1416 .sql_expr_to_logical_expr(
1417 ordered_expr,
1418 schema,
1419 planner_context,
1420 )
1421 .unwrap();
1422 let asc = order_by_expr.options.asc.unwrap_or(true);
1423 let nulls_first =
1424 order_by_expr.options.nulls_first.unwrap_or(!asc);
1425
1426 SortExpr::new(ordered_expr, asc, nulls_first)
1427 })
1428 .collect::<Vec<SortExpr>>();
1429 result
1430 })
1431 .collect::<Vec<Vec<SortExpr>>>();
1432
1433 return Ok(results);
1434 }
1435
1436 let mut all_results = vec![];
1437 for expr in order_exprs {
1438 let expr_vec =
1440 self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?;
1441 for sort in expr_vec.iter() {
1443 for column in sort.expr.column_refs().iter() {
1444 if !schema.has_column(column) {
1445 return plan_err!("Column {column} is not in schema");
1447 }
1448 }
1449 }
1450 all_results.push(expr_vec)
1452 }
1453 Ok(all_results)
1454 }
1455
    /// Plan a `CREATE EXTERNAL TABLE` statement.
    fn external_table_to_plan(
        &self,
        statement: CreateExternalTable,
    ) -> Result<LogicalPlan> {
        // Keep the original SQL text as the stored table definition.
        let definition = Some(statement.to_string());
        let CreateExternalTable {
            name,
            columns,
            file_type,
            location,
            table_partition_cols,
            if_not_exists,
            temporary,
            order_exprs,
            unbounded,
            options,
            constraints,
        } = statement;

        // Merge table-level constraints with constraints declared inline on
        // individual column definitions.
        let mut all_constraints = constraints;
        let inline_constraints = calc_inline_constraints_from_columns(&columns);
        all_constraints.extend(inline_constraints);

        // External-table options must be unique (no duplicates allowed).
        let options_map = self.parse_options_map(options, false)?;

        let compression = options_map
            .get("format.compression")
            .map(|c| CompressionTypeVariant::from_str(c))
            .transpose()?;
        // These self-describing formats manage compression internally, so an
        // explicit non-UNCOMPRESSED file compression setting is rejected.
        if (file_type == "PARQUET" || file_type == "AVRO" || file_type == "ARROW")
            && compression
                .map(|c| c != CompressionTypeVariant::UNCOMPRESSED)
                .unwrap_or(false)
        {
            plan_err!(
                "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
            )?;
        }

        let mut planner_context = PlannerContext::new();

        let column_defaults = self
            .build_column_defaults(&columns, &mut planner_context)?
            .into_iter()
            .collect();

        let schema = self.build_schema(columns)?;
        let df_schema = schema.to_dfschema_ref()?;
        df_schema.check_names()?;

        // Plan `WITH ORDER` clauses against the declared schema (planned
        // without validation when no columns were declared).
        let ordered_exprs =
            self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;

        let name = self.object_name_to_table_reference(name)?;
        let constraints =
            self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
        Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
            PlanCreateExternalTable {
                schema: df_schema,
                name,
                location,
                file_type,
                table_partition_cols,
                if_not_exists,
                temporary,
                definition,
                order_exprs: ordered_exprs,
                unbounded,
                options: options_map,
                constraints,
                column_defaults,
            },
        )))
    }
1532
1533 fn get_constraint_column_indices(
1536 &self,
1537 df_schema: &DFSchemaRef,
1538 columns: &[Ident],
1539 constraint_name: &str,
1540 ) -> Result<Vec<usize>> {
1541 let field_names = df_schema.field_names();
1542 columns
1543 .iter()
1544 .map(|ident| {
1545 let column = self.ident_normalizer.normalize(ident.clone());
1546 field_names
1547 .iter()
1548 .position(|item| *item == column)
1549 .ok_or_else(|| {
1550 plan_datafusion_err!(
1551 "Column for {constraint_name} not found in schema: {column}"
1552 )
1553 })
1554 })
1555 .collect::<Result<Vec<_>>>()
1556 }
1557
1558 pub fn new_constraint_from_table_constraints(
1560 &self,
1561 constraints: &[TableConstraint],
1562 df_schema: &DFSchemaRef,
1563 ) -> Result<Constraints> {
1564 let constraints = constraints
1565 .iter()
1566 .map(|c: &TableConstraint| match c {
1567 TableConstraint::Unique { name, columns, .. } => {
1568 let constraint_name = match name {
1569 Some(name) => &format!("unique constraint with name '{name}'"),
1570 None => "unique constraint",
1571 };
1572 let indices = self.get_constraint_column_indices(
1574 df_schema,
1575 columns,
1576 constraint_name,
1577 )?;
1578 Ok(Constraint::Unique(indices))
1579 }
1580 TableConstraint::PrimaryKey { columns, .. } => {
1581 let indices = self.get_constraint_column_indices(
1583 df_schema,
1584 columns,
1585 "primary key",
1586 )?;
1587 Ok(Constraint::PrimaryKey(indices))
1588 }
1589 TableConstraint::ForeignKey { .. } => {
1590 _plan_err!("Foreign key constraints are not currently supported")
1591 }
1592 TableConstraint::Check { .. } => {
1593 _plan_err!("Check constraints are not currently supported")
1594 }
1595 TableConstraint::Index { .. } => {
1596 _plan_err!("Indexes are not currently supported")
1597 }
1598 TableConstraint::FulltextOrSpatial { .. } => {
1599 _plan_err!("Indexes are not currently supported")
1600 }
1601 })
1602 .collect::<Result<Vec<_>>>()?;
1603 Ok(Constraints::new_unverified(constraints))
1604 }
1605
1606 fn parse_options_map(
1607 &self,
1608 options: Vec<(String, Value)>,
1609 allow_duplicates: bool,
1610 ) -> Result<HashMap<String, String>> {
1611 let mut options_map = HashMap::new();
1612 for (key, value) in options {
1613 if !allow_duplicates && options_map.contains_key(&key) {
1614 return plan_err!("Option {key} is specified multiple times");
1615 }
1616
1617 let Some(value_string) = crate::utils::value_to_string(&value) else {
1618 return plan_err!("Unsupported Value {}", value);
1619 };
1620
1621 if !(&key.contains('.')) {
1622 let renamed_key = format!("format.{key}");
1626 options_map.insert(renamed_key.to_lowercase(), value_string);
1627 } else {
1628 options_map.insert(key.to_lowercase(), value_string);
1629 }
1630 }
1631
1632 Ok(options_map)
1633 }
1634
1635 fn explain_to_plan(
1640 &self,
1641 verbose: bool,
1642 analyze: bool,
1643 format: Option<String>,
1644 statement: DFStatement,
1645 ) -> Result<LogicalPlan> {
1646 let plan = self.statement_to_plan(statement)?;
1647 if matches!(plan, LogicalPlan::Explain(_)) {
1648 return plan_err!("Nested EXPLAINs are not supported");
1649 }
1650
1651 let plan = Arc::new(plan);
1652 let schema = LogicalPlan::explain_schema();
1653 let schema = schema.to_dfschema_ref()?;
1654
1655 if verbose && format.is_some() {
1656 return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
1657 }
1658
1659 if analyze {
1660 if format.is_some() {
1661 return plan_err!("EXPLAIN ANALYZE with FORMAT is not supported");
1662 }
1663 Ok(LogicalPlan::Analyze(Analyze {
1664 verbose,
1665 input: plan,
1666 schema,
1667 }))
1668 } else {
1669 let stringified_plans =
1670 vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
1671
1672 let options = self.context_provider.options();
1674 let format = format.as_ref().unwrap_or(&options.explain.format);
1675
1676 let format: ExplainFormat = format.parse()?;
1677
1678 Ok(LogicalPlan::Explain(Explain {
1679 verbose,
1680 explain_format: format,
1681 plan,
1682 stringified_plans,
1683 schema,
1684 logical_optimization_succeeded: false,
1685 }))
1686 }
1687 }
1688
    /// Plan `SHOW <variable>` / `SHOW ALL [VERBOSE]` by rewriting it into a
    /// query over `information_schema.df_settings`.
    fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
        if !self.has_table("information_schema", "df_settings") {
            return plan_err!(
                "SHOW [VARIABLE] is not supported unless information_schema is enabled"
            );
        }

        // A trailing `verbose` ident requests the description column too.
        let verbose = variable
            .last()
            .map(|s| ident_to_string(s) == "verbose")
            .unwrap_or(false);
        let mut variable_vec = variable.to_vec();
        let mut columns: String = "name, value".to_owned();

        if verbose {
            columns = format!("{columns}, description");
            // Strip the trailing `verbose` ident from the variable name.
            variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
        }

        let variable = object_name_to_string(&ObjectName::from(variable_vec));
        let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
        let query = if variable == "all" {
            // SHOW ALL: every setting, sorted by name.
            format!("{base_query} ORDER BY name")
        } else if variable == "timezone" || variable == "time.zone" {
            // SHOW TIMEZONE / TIME ZONE alias the DataFusion time-zone entry.
            format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
        } else {
            // Only registered configuration entries may be shown.
            let is_valid_variable = self
                .context_provider
                .options()
                .entries()
                .iter()
                .any(|opt| opt.key == variable);

            if !is_valid_variable {
                return plan_err!(
                    "'{variable}' is not a variable which can be viewed with 'SHOW'"
                );
            }

            format!("{base_query} WHERE name = '{variable}'")
        };

        let mut rewrite = DFParser::parse_sql(&query)?;
        assert_eq!(rewrite.len(), 1);

        self.statement_to_plan(rewrite.pop_front().unwrap())
    }
1741
1742 fn set_variable_to_plan(
1743 &self,
1744 local: bool,
1745 hivevar: bool,
1746 variables: &OneOrManyWithParens<ObjectName>,
1747 value: Vec<SQLExpr>,
1748 ) -> Result<LogicalPlan> {
1749 if local {
1750 return not_impl_err!("LOCAL is not supported");
1751 }
1752
1753 if hivevar {
1754 return not_impl_err!("HIVEVAR is not supported");
1755 }
1756
1757 let variable = match variables {
1758 OneOrManyWithParens::One(v) => object_name_to_string(v),
1759 OneOrManyWithParens::Many(vs) => {
1760 return not_impl_err!(
1761 "SET only supports single variable assignment: {vs:?}"
1762 );
1763 }
1764 };
1765 let mut variable_lower = variable.to_lowercase();
1766
1767 if variable_lower == "timezone" || variable_lower == "time.zone" {
1768 variable_lower = "datafusion.execution.time_zone".to_string();
1770 }
1771
1772 let value_string = match &value[0] {
1774 SQLExpr::Identifier(i) => ident_to_string(i),
1775 SQLExpr::Value(v) => match crate::utils::value_to_string(&v.value) {
1776 None => {
1777 return plan_err!("Unsupported Value {}", value[0]);
1778 }
1779 Some(v) => v,
1780 },
1781 SQLExpr::UnaryOp { op, expr } => match op {
1783 UnaryOperator::Plus => format!("+{expr}"),
1784 UnaryOperator::Minus => format!("-{expr}"),
1785 _ => {
1786 return plan_err!("Unsupported Value {}", value[0]);
1787 }
1788 },
1789 _ => {
1790 return plan_err!("Unsupported Value {}", value[0]);
1791 }
1792 };
1793
1794 let statement = PlanStatement::SetVariable(SetVariable {
1795 variable: variable_lower,
1796 value: value_string,
1797 });
1798
1799 Ok(LogicalPlan::Statement(statement))
1800 }
1801
1802 fn delete_to_plan(
1803 &self,
1804 table_name: ObjectName,
1805 predicate_expr: Option<SQLExpr>,
1806 ) -> Result<LogicalPlan> {
1807 let table_ref = self.object_name_to_table_reference(table_name.clone())?;
1809 let table_source = self.context_provider.get_table_source(table_ref.clone())?;
1810 let schema = DFSchema::try_from_qualified_schema(
1811 table_ref.clone(),
1812 &table_source.schema(),
1813 )?;
1814 let scan =
1815 LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
1816 .build()?;
1817 let mut planner_context = PlannerContext::new();
1818
1819 let source = match predicate_expr {
1820 None => scan,
1821 Some(predicate_expr) => {
1822 let filter_expr =
1823 self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
1824 let schema = Arc::new(schema);
1825 let mut using_columns = HashSet::new();
1826 expr_to_columns(&filter_expr, &mut using_columns)?;
1827 let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
1828 filter_expr,
1829 &[&[&schema]],
1830 &[using_columns],
1831 )?;
1832 LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
1833 }
1834 };
1835
1836 let plan = LogicalPlan::Dml(DmlStatement::new(
1837 table_ref,
1838 table_source,
1839 WriteOp::Delete,
1840 Arc::new(source),
1841 ));
1842 Ok(plan)
1843 }
1844
    /// Plan an `UPDATE <table> SET ... [FROM ...] [WHERE ...]` statement.
    fn update_to_plan(
        &self,
        table: TableWithJoins,
        assignments: Vec<Assignment>,
        from: Option<TableWithJoins>,
        predicate_expr: Option<SQLExpr>,
    ) -> Result<LogicalPlan> {
        // Only a plain table can be updated.
        let (table_name, table_alias) = match &table.relation {
            TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
            _ => plan_err!("Cannot update non-table relation!")?,
        };

        let table_name = self.object_name_to_table_reference(table_name)?;
        let table_source = self.context_provider.get_table_source(table_name.clone())?;
        let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
            table_name.clone(),
            &table_source.schema(),
        )?);

        let mut planner_context = PlannerContext::new();
        // Map assigned column name -> its new-value SQL expression, also
        // validating that each assigned column exists in the table.
        let mut assign_map = assignments
            .iter()
            .map(|assign| {
                let cols = match &assign.target {
                    AssignmentTarget::ColumnName(cols) => cols,
                    _ => plan_err!("Tuples are not supported")?,
                };
                // Use the last path segment as the column name.
                // NOTE(review): `as_ident().unwrap()` assumes the segment is
                // a plain identifier — confirm the parser cannot produce a
                // non-ident segment here.
                let col_name: &Ident = cols
                    .0
                    .iter()
                    .last()
                    .ok_or_else(|| plan_datafusion_err!("Empty column id"))?
                    .as_ident()
                    .unwrap();
                // Validate that the assigned column exists in the table.
                table_schema.field_with_unqualified_name(&col_name.value)?;
                Ok((col_name.value.clone(), assign.value.clone()))
            })
            .collect::<Result<HashMap<String, SQLExpr>>>()?;

        // Plan the scan over the target table plus any FROM tables.
        let mut input_tables = vec![table];
        input_tables.extend(from);
        let scan = self.plan_from_tables(input_tables, &mut planner_context)?;

        // Apply the WHERE clause (if any) on top of the scan, normalizing
        // column references with ambiguity checking.
        let source = match predicate_expr {
            None => scan,
            Some(predicate_expr) => {
                let filter_expr = self.sql_to_expr(
                    predicate_expr,
                    scan.schema(),
                    &mut planner_context,
                )?;
                let mut using_columns = HashSet::new();
                expr_to_columns(&filter_expr, &mut using_columns)?;
                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
                    filter_expr,
                    &[&[scan.schema()]],
                    &[using_columns],
                )?;
                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
            }
        };

        // Build one projection expression per table column: the assigned
        // value (cast to the column type) when the column is updated, or a
        // pass-through column reference otherwise.
        let exprs = table_schema
            .iter()
            .map(|(qualifier, field)| {
                let expr = match assign_map.remove(field.name()) {
                    Some(new_value) => {
                        let mut expr = self.sql_to_expr(
                            new_value,
                            source.schema(),
                            &mut planner_context,
                        )?;
                        // Give untyped placeholders the target column's type.
                        if let Expr::Placeholder(placeholder) = &mut expr {
                            placeholder.data_type = placeholder
                                .data_type
                                .take()
                                .or_else(|| Some(field.data_type().clone()));
                        }
                        expr.cast_to(field.data_type(), source.schema())?
                    }
                    None => {
                        // Not assigned: pass the existing value through,
                        // qualified by the table alias when one was given.
                        if let Some(alias) = &table_alias {
                            Expr::Column(Column::new(
                                Some(self.ident_normalizer.normalize(alias.name.clone())),
                                field.name(),
                            ))
                        } else {
                            Expr::Column(Column::from((qualifier, field)))
                        }
                    }
                };
                Ok(expr.alias(field.name()))
            })
            .collect::<Result<Vec<_>>>()?;

        let source = project(source, exprs)?;

        let plan = LogicalPlan::Dml(DmlStatement::new(
            table_name,
            table_source,
            WriteOp::Update,
            Arc::new(source),
        ));
        Ok(plan)
    }
1959
    /// Plan an `INSERT INTO` statement.
    ///
    /// `overwrite` maps to [`InsertOp::Overwrite`] and `replace_into` to
    /// [`InsertOp::Replace`]; setting both is an error.
    fn insert_to_plan(
        &self,
        table_name: ObjectName,
        columns: Vec<Ident>,
        source: Box<Query>,
        overwrite: bool,
        replace_into: bool,
    ) -> Result<LogicalPlan> {
        // Resolve the target table and its schema.
        let table_name = self.object_name_to_table_reference(table_name)?;
        let table_source = self.context_provider.get_table_source(table_name.clone())?;
        let arrow_schema = (*table_source.schema()).clone();
        let table_schema = DFSchema::try_from(arrow_schema)?;

        // `fields` are the columns supplied by the source, in statement
        // order; `value_indices[i]` maps table column `i` to its position in
        // the source output (`None` => not supplied, use the default).
        let (fields, value_indices) = if columns.is_empty() {
            // No explicit column list: positions follow the table layout.
            (
                table_schema.fields().clone(),
                (0..table_schema.fields().len())
                    .map(Some)
                    .collect::<Vec<_>>(),
            )
        } else {
            let mut value_indices = vec![None; table_schema.fields().len()];
            let fields = columns
                .into_iter()
                .map(|c| self.ident_normalizer.normalize(c))
                .enumerate()
                .map(|(i, c)| {
                    let column_index = table_schema
                        .index_of_column_by_name(None, &c)
                        .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;

                    // Reject a column listed twice in the INSERT column list.
                    if value_indices[column_index].is_some() {
                        return schema_err!(SchemaError::DuplicateUnqualifiedField {
                            name: c,
                        });
                    } else {
                        value_indices[column_index] = Some(i);
                    }
                    Ok(table_schema.field(column_index).clone())
                })
                .collect::<Result<Vec<_>>>()?;
            (Fields::from(fields), value_indices)
        };

        // Infer parameter types for `$n` placeholders in a VALUES body from
        // the column each placeholder feeds into.
        let mut prepare_param_data_types = BTreeMap::new();
        if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
            for row in rows.iter() {
                for (idx, val) in row.iter().enumerate() {
                    if let SQLExpr::Value(ValueWithSpan {
                        value: Value::Placeholder(name),
                        span: _,
                    }) = val
                    {
                        // `$n` is 1-based; the map key is 0-based.
                        let name =
                            name.replace('$', "").parse::<usize>().map_err(|_| {
                                plan_datafusion_err!("Can't parse placeholder: {name}")
                            })? - 1;
                        let field = fields.get(idx).ok_or_else(|| {
                            plan_datafusion_err!(
                                "Placeholder ${} refers to a non existent column",
                                idx + 1
                            )
                        })?;
                        let dt = field.data_type().clone();
                        let _ = prepare_param_data_types.insert(name, dt);
                    }
                }
            }
        }
        let prepare_param_data_types = prepare_param_data_types.into_values().collect();

        // Plan the source query with the insert columns as the target schema
        // so VALUES rows and placeholders resolve against it.
        let mut planner_context =
            PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
        planner_context.set_table_schema(Some(DFSchemaRef::new(
            DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
        )));
        let source = self.query_to_plan(*source, &mut planner_context)?;
        if fields.len() != source.schema().fields().len() {
            plan_err!("Column count doesn't match insert query!")?;
        }

        // Project the source into table-column order: cast each supplied
        // value to its target type; fill unsupplied columns with their
        // declared default (or NULL).
        let exprs = value_indices
            .into_iter()
            .enumerate()
            .map(|(i, value_index)| {
                let target_field = table_schema.field(i);
                let expr = match value_index {
                    Some(v) => {
                        Expr::Column(Column::from(source.schema().qualified_field(v)))
                            .cast_to(target_field.data_type(), source.schema())?
                    }
                    None => table_source
                        .get_column_default(target_field.name())
                        .cloned()
                        .unwrap_or_else(|| {
                            Expr::Literal(ScalarValue::Null, None)
                        })
                        .cast_to(target_field.data_type(), &DFSchema::empty())?,
                };
                Ok(expr.alias(target_field.name()))
            })
            .collect::<Result<Vec<Expr>>>()?;
        let source = project(source, exprs)?;

        let insert_op = match (overwrite, replace_into) {
            (false, false) => InsertOp::Append,
            (true, false) => InsertOp::Overwrite,
            (false, true) => InsertOp::Replace,
            (true, true) => plan_err!("Conflicting insert operations: `overwrite` and `replace_into` cannot both be true")?,
        };

        let plan = LogicalPlan::Dml(DmlStatement::new(
            table_name,
            Arc::clone(&table_source),
            WriteOp::Insert(insert_op),
            Arc::new(source),
        ));
        Ok(plan)
    }
2092
2093 fn show_columns_to_plan(
2094 &self,
2095 extended: bool,
2096 full: bool,
2097 sql_table_name: ObjectName,
2098 ) -> Result<LogicalPlan> {
2099 let where_clause = object_name_to_qualifier(
2101 &sql_table_name,
2102 self.options.enable_ident_normalization,
2103 )?;
2104
2105 if !self.has_table("information_schema", "columns") {
2106 return plan_err!(
2107 "SHOW COLUMNS is not supported unless information_schema is enabled"
2108 );
2109 }
2110
2111 let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2113 let _ = self.context_provider.get_table_source(table_ref)?;
2114
2115 let select_list = if full || extended {
2117 "*"
2118 } else {
2119 "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
2120 };
2121
2122 let query = format!(
2123 "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
2124 );
2125
2126 let mut rewrite = DFParser::parse_sql(&query)?;
2127 assert_eq!(rewrite.len(), 1);
2128 self.statement_to_plan(rewrite.pop_front().unwrap()) }
2130
    /// Plan `SHOW FUNCTIONS [LIKE pattern]` by rewriting it into a query
    /// that joins `information_schema.parameters` (IN/OUT modes) with
    /// `information_schema.routines`.
    fn show_functions_to_plan(
        &self,
        filter: Option<ShowStatementFilter>,
    ) -> Result<LogicalPlan> {
        // Only LIKE filters are supported.
        let where_clause = if let Some(filter) = filter {
            match filter {
                ShowStatementFilter::Like(like) => {
                    format!("WHERE p.function_name like '{like}'")
                }
                _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
            }
        } else {
            "".to_string()
        };

        // For each function signature (`rid`): aggregate its IN parameters
        // (names and types, in ordinal order), take the OUT parameter's
        // data type as the return type, then attach routine metadata
        // (function type, description, syntax example).
        let query = format!(
            r#"
SELECT DISTINCT
    p.*,
    r.function_type function_type,
    r.description description,
    r.syntax_example syntax_example
FROM
    (
        SELECT
            i.specific_name function_name,
            o.data_type return_type,
            array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
            array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
        FROM (
            SELECT
                specific_catalog,
                specific_schema,
                specific_name,
                ordinal_position,
                parameter_name,
                data_type,
                rid
            FROM
                information_schema.parameters
            WHERE
                parameter_mode = 'IN'
        ) i
        JOIN
        (
            SELECT
                specific_catalog,
                specific_schema,
                specific_name,
                ordinal_position,
                parameter_name,
                data_type,
                rid
            FROM
                information_schema.parameters
            WHERE
                parameter_mode = 'OUT'
        ) o
        ON i.specific_catalog = o.specific_catalog
            AND i.specific_schema = o.specific_schema
            AND i.specific_name = o.specific_name
            AND i.rid = o.rid
        GROUP BY 1, 2, i.rid
    ) as p
JOIN information_schema.routines r
ON p.function_name = r.routine_name
{where_clause}
            "#
        );
        let mut rewrite = DFParser::parse_sql(&query)?;
        assert_eq!(rewrite.len(), 1);
        self.statement_to_plan(rewrite.pop_front().unwrap())
    }
2214
2215 fn show_create_table_to_plan(
2216 &self,
2217 sql_table_name: ObjectName,
2218 ) -> Result<LogicalPlan> {
2219 if !self.has_table("information_schema", "tables") {
2220 return plan_err!(
2221 "SHOW CREATE TABLE is not supported unless information_schema is enabled"
2222 );
2223 }
2224 let where_clause = object_name_to_qualifier(
2226 &sql_table_name,
2227 self.options.enable_ident_normalization,
2228 )?;
2229
2230 let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2232 let _ = self.context_provider.get_table_source(table_ref)?;
2233
2234 let query = format!(
2235 "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
2236 );
2237
2238 let mut rewrite = DFParser::parse_sql(&query)?;
2239 assert_eq!(rewrite.len(), 1);
2240 self.statement_to_plan(rewrite.pop_front().unwrap()) }
2242
2243 fn has_table(&self, schema: &str, table: &str) -> bool {
2245 let tables_reference = TableReference::Partial {
2246 schema: schema.into(),
2247 table: table.into(),
2248 };
2249 self.context_provider
2250 .get_table_source(tables_reference)
2251 .is_ok()
2252 }
2253
2254 fn validate_transaction_kind(
2255 &self,
2256 kind: Option<BeginTransactionKind>,
2257 ) -> Result<()> {
2258 match kind {
2259 None => Ok(()),
2261 Some(BeginTransactionKind::Transaction) => Ok(()),
2263 Some(BeginTransactionKind::Work) => {
2264 not_impl_err!("Transaction kind not supported: {kind:?}")
2265 }
2266 }
2267 }
2268}