1use std::collections::{BTreeMap, HashMap, HashSet};
19use std::path::Path;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use crate::parser::{
24 CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
25 LexOrdering, Statement as DFStatement,
26};
27use crate::planner::{
28 object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
29};
30use crate::utils::normalize_ident;
31
32use arrow::datatypes::{DataType, Fields};
33use datafusion_common::error::_plan_err;
34use datafusion_common::parsers::CompressionTypeVariant;
35use datafusion_common::{
36 exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
37 unqualified_field_not_found, Column, Constraint, Constraints, DFSchema, DFSchemaRef,
38 DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, TableReference,
39 ToDFSchema,
40};
41use datafusion_expr::dml::{CopyTo, InsertOp};
42use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
43use datafusion_expr::logical_plan::builder::project;
44use datafusion_expr::logical_plan::DdlStatement;
45use datafusion_expr::utils::expr_to_columns;
46use datafusion_expr::{
47 cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
48 CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
49 CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
50 DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
51 EmptyRelation, Execute, Explain, ExplainFormat, Expr, ExprSchemable, Filter,
52 LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, SetVariable,
53 SortExpr, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
54 TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
55 Volatility, WriteOp,
56};
57use sqlparser::ast::{
58 self, BeginTransactionKind, NullsDistinctOption, ShowStatementIn,
59 ShowStatementOptions, SqliteOnConflict, TableObject, UpdateTableFromKind,
60 ValueWithSpan,
61};
62use sqlparser::ast::{
63 Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
64 CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
65 ObjectName, ObjectType, OneOrManyWithParens, Query, SchemaName, SetExpr,
66 ShowCreateObject, ShowStatementFilter, Statement, TableConstraint, TableFactor,
67 TableWithJoins, TransactionMode, UnaryOperator, Value,
68};
69use sqlparser::parser::ParserError::ParserError;
70
71fn ident_to_string(ident: &Ident) -> String {
72 normalize_ident(ident.to_owned())
73}
74
75fn object_name_to_string(object_name: &ObjectName) -> String {
76 object_name
77 .0
78 .iter()
79 .map(|object_name_part| {
80 object_name_part
81 .as_ident()
82 .map_or_else(String::new, ident_to_string)
85 })
86 .collect::<Vec<String>>()
87 .join(".")
88}
89
90fn get_schema_name(schema_name: &SchemaName) -> String {
91 match schema_name {
92 SchemaName::Simple(schema_name) => object_name_to_string(schema_name),
93 SchemaName::UnnamedAuthorization(auth) => ident_to_string(auth),
94 SchemaName::NamedAuthorization(schema_name, auth) => format!(
95 "{}.{}",
96 object_name_to_string(schema_name),
97 ident_to_string(auth)
98 ),
99 }
100}
101
/// Collects table-level constraints that were written inline on individual
/// column definitions (e.g. `col INT PRIMARY KEY`), converting each into the
/// equivalent free-standing [`TableConstraint`] so downstream planning only
/// has to deal with one representation of constraints.
///
/// Options that are not constraints (defaults, comments, collation, …) are
/// deliberately ignored here; they are handled elsewhere or dropped.
fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
    let mut constraints = vec![];
    for column in columns {
        for ast::ColumnOptionDef { name, option } in &column.options {
            match option {
                // Inline UNIQUE -> table-level UNIQUE over this one column.
                ast::ColumnOption::Unique {
                    is_primary: false,
                    characteristics,
                } => constraints.push(TableConstraint::Unique {
                    name: name.clone(),
                    columns: vec![column.name.clone()],
                    characteristics: *characteristics,
                    index_name: None,
                    index_type_display: ast::KeyOrIndexDisplay::None,
                    index_type: None,
                    index_options: vec![],
                    nulls_distinct: NullsDistinctOption::None,
                }),
                // Inline PRIMARY KEY -> table-level PRIMARY KEY over this column.
                ast::ColumnOption::Unique {
                    is_primary: true,
                    characteristics,
                } => constraints.push(TableConstraint::PrimaryKey {
                    name: name.clone(),
                    columns: vec![column.name.clone()],
                    characteristics: *characteristics,
                    index_name: None,
                    index_type: None,
                    index_options: vec![],
                }),
                // Inline REFERENCES -> table-level FOREIGN KEY. Note the
                // local `columns` list is left empty here (the referencing
                // column is implied by the column definition itself).
                ast::ColumnOption::ForeignKey {
                    foreign_table,
                    referred_columns,
                    on_delete,
                    on_update,
                    characteristics,
                } => constraints.push(TableConstraint::ForeignKey {
                    name: name.clone(),
                    columns: vec![],
                    foreign_table: foreign_table.clone(),
                    referred_columns: referred_columns.to_vec(),
                    on_delete: *on_delete,
                    on_update: *on_update,
                    characteristics: *characteristics,
                }),
                // Inline CHECK(expr) -> table-level CHECK.
                ast::ColumnOption::Check(expr) => {
                    constraints.push(TableConstraint::Check {
                        name: name.clone(),
                        expr: Box::new(expr.clone()),
                    })
                }
                // All remaining column options carry no constraint
                // information; listed exhaustively (no `_`) so that adding a
                // variant to sqlparser forces a decision here.
                ast::ColumnOption::Default(_)
                | ast::ColumnOption::Null
                | ast::ColumnOption::NotNull
                | ast::ColumnOption::DialectSpecific(_)
                | ast::ColumnOption::CharacterSet(_)
                | ast::ColumnOption::Generated { .. }
                | ast::ColumnOption::Comment(_)
                | ast::ColumnOption::Options(_)
                | ast::ColumnOption::OnUpdate(_)
                | ast::ColumnOption::Materialized(_)
                | ast::ColumnOption::Ephemeral(_)
                | ast::ColumnOption::Identity(_)
                | ast::ColumnOption::OnConflict(_)
                | ast::ColumnOption::Policy(_)
                | ast::ColumnOption::Tags(_)
                | ast::ColumnOption::Alias(_)
                | ast::ColumnOption::Collation(_) => {}
            }
        }
    }
    constraints
}
177
178impl<S: ContextProvider> SqlToRel<'_, S> {
179 pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
181 match statement {
182 DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
183 DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
184 DFStatement::CopyTo(s) => self.copy_to_plan(s),
185 DFStatement::Explain(ExplainStatement {
186 verbose,
187 analyze,
188 format,
189 statement,
190 }) => self.explain_to_plan(verbose, analyze, format, *statement),
191 }
192 }
193
194 pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
196 self.sql_statement_to_plan_with_context_impl(
197 statement,
198 &mut PlannerContext::new(),
199 )
200 }
201
    /// Converts a plain SQL [`Statement`] into a [`LogicalPlan`] using the
    /// caller-supplied [`PlannerContext`] (public wrapper around the private
    /// implementation).
    pub fn sql_statement_to_plan_with_context(
        &self,
        statement: Statement,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        self.sql_statement_to_plan_with_context_impl(statement, planner_context)
    }
210
    /// Core dispatcher: converts one `sqlparser` [`Statement`] into a
    /// DataFusion [`LogicalPlan`], threading `planner_context` through so
    /// prepared-statement parameter types and related state reach nested
    /// planning calls. Statement forms DataFusion does not implement are
    /// rejected with targeted `not_impl`/`plan` errors rather than being
    /// silently ignored.
    fn sql_statement_to_plan_with_context_impl(
        &self,
        statement: Statement,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        match statement {
            // DESCRIBE <table>
            Statement::ExplainTable {
                describe_alias: DescribeAlias::Describe, table_name,
                ..
            } => self.describe_table_to_plan(table_name),
            // EXPLAIN [ANALYZE] [VERBOSE] [FORMAT ...] <statement>
            Statement::Explain {
                verbose,
                statement,
                analyze,
                format,
                describe_alias: _,
                ..
            } => {
                let format = format.map(|format| format.to_string());
                let statement = DFStatement::Statement(statement);
                self.explain_to_plan(verbose, analyze, format, statement)
            }
            Statement::Query(query) => self.query_to_plan(*query, planner_context),
            Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
            Statement::SetVariable {
                local,
                hivevar,
                variables,
                value,
            } => self.set_variable_to_plan(local, hivevar, &variables, value),

            // CREATE TABLE, including CREATE TABLE ... AS SELECT. The guard
            // restricts this arm to statements without table properties or
            // WITH options; anything else falls through to the catch-all
            // "Unsupported SQL statement" arm at the bottom.
            Statement::CreateTable(CreateTable {
                temporary,
                external,
                global,
                transient,
                volatile,
                hive_distribution,
                hive_formats,
                file_format,
                location,
                query,
                name,
                columns,
                constraints,
                table_properties,
                with_options,
                if_not_exists,
                or_replace,
                without_rowid,
                like,
                clone,
                engine,
                comment,
                auto_increment_offset,
                default_charset,
                collation,
                on_commit,
                on_cluster,
                primary_key,
                order_by,
                partition_by,
                cluster_by,
                clustered_by,
                options,
                strict,
                copy_grants,
                enable_schema_evolution,
                change_tracking,
                data_retention_time_in_days,
                max_data_extension_time_in_days,
                default_ddl_collation,
                with_aggregation_policy,
                with_row_access_policy,
                with_tags,
                iceberg,
                external_volume,
                base_location,
                catalog,
                catalog_sync,
                storage_serialization_policy,
            }) if table_properties.is_empty() && with_options.is_empty() => {
                // Exhaustively reject every dialect-specific clause that
                // DataFusion does not implement, each with its own message.
                if temporary {
                    return not_impl_err!("Temporary tables not supported")?;
                }
                if external {
                    return not_impl_err!("External tables not supported")?;
                }
                if global.is_some() {
                    return not_impl_err!("Global tables not supported")?;
                }
                if transient {
                    return not_impl_err!("Transient tables not supported")?;
                }
                if volatile {
                    return not_impl_err!("Volatile tables not supported")?;
                }
                if hive_distribution != ast::HiveDistributionStyle::NONE {
                    return not_impl_err!(
                        "Hive distribution not supported: {hive_distribution:?}"
                    )?;
                }
                if !matches!(
                    hive_formats,
                    Some(ast::HiveFormat {
                        row_format: None,
                        serde_properties: None,
                        storage: None,
                        location: None,
                    })
                ) {
                    return not_impl_err!(
                        "Hive formats not supported: {hive_formats:?}"
                    )?;
                }
                if file_format.is_some() {
                    return not_impl_err!("File format not supported")?;
                }
                if location.is_some() {
                    return not_impl_err!("Location not supported")?;
                }
                if without_rowid {
                    return not_impl_err!("Without rowid not supported")?;
                }
                if like.is_some() {
                    return not_impl_err!("Like not supported")?;
                }
                if clone.is_some() {
                    return not_impl_err!("Clone not supported")?;
                }
                if engine.is_some() {
                    return not_impl_err!("Engine not supported")?;
                }
                if comment.is_some() {
                    return not_impl_err!("Comment not supported")?;
                }
                if auto_increment_offset.is_some() {
                    return not_impl_err!("Auto increment offset not supported")?;
                }
                if default_charset.is_some() {
                    return not_impl_err!("Default charset not supported")?;
                }
                if collation.is_some() {
                    return not_impl_err!("Collation not supported")?;
                }
                if on_commit.is_some() {
                    return not_impl_err!("On commit not supported")?;
                }
                if on_cluster.is_some() {
                    return not_impl_err!("On cluster not supported")?;
                }
                if primary_key.is_some() {
                    return not_impl_err!("Primary key not supported")?;
                }
                if order_by.is_some() {
                    return not_impl_err!("Order by not supported")?;
                }
                if partition_by.is_some() {
                    return not_impl_err!("Partition by not supported")?;
                }
                if cluster_by.is_some() {
                    return not_impl_err!("Cluster by not supported")?;
                }
                if clustered_by.is_some() {
                    return not_impl_err!("Clustered by not supported")?;
                }
                if options.is_some() {
                    return not_impl_err!("Options not supported")?;
                }
                if strict {
                    return not_impl_err!("Strict not supported")?;
                }
                if copy_grants {
                    return not_impl_err!("Copy grants not supported")?;
                }
                if enable_schema_evolution.is_some() {
                    return not_impl_err!("Enable schema evolution not supported")?;
                }
                if change_tracking.is_some() {
                    return not_impl_err!("Change tracking not supported")?;
                }
                if data_retention_time_in_days.is_some() {
                    return not_impl_err!("Data retention time in days not supported")?;
                }
                if max_data_extension_time_in_days.is_some() {
                    return not_impl_err!(
                        "Max data extension time in days not supported"
                    )?;
                }
                if default_ddl_collation.is_some() {
                    return not_impl_err!("Default DDL collation not supported")?;
                }
                if with_aggregation_policy.is_some() {
                    return not_impl_err!("With aggregation policy not supported")?;
                }
                if with_row_access_policy.is_some() {
                    return not_impl_err!("With row access policy not supported")?;
                }
                if with_tags.is_some() {
                    return not_impl_err!("With tags not supported")?;
                }
                if iceberg {
                    return not_impl_err!("Iceberg not supported")?;
                }
                if external_volume.is_some() {
                    return not_impl_err!("External volume not supported")?;
                }
                if base_location.is_some() {
                    return not_impl_err!("Base location not supported")?;
                }
                if catalog.is_some() {
                    return not_impl_err!("Catalog not supported")?;
                }
                if catalog_sync.is_some() {
                    return not_impl_err!("Catalog sync not supported")?;
                }
                if storage_serialization_policy.is_some() {
                    return not_impl_err!("Storage serialization policy not supported")?;
                }

                // Merge explicit table-level constraints with constraints
                // written inline on individual column definitions.
                let mut all_constraints = constraints;
                let inline_constraints = calc_inline_constraints_from_columns(&columns);
                all_constraints.extend(inline_constraints);
                // Capture per-column DEFAULT expressions before `columns` is
                // consumed by schema construction below.
                let column_defaults =
                    self.build_column_defaults(&columns, planner_context)?;

                let has_columns = !columns.is_empty();
                let schema = self.build_schema(columns)?.to_dfschema_ref()?;
                if has_columns {
                    // Make the declared schema visible to the AS-SELECT query
                    // planning (e.g. for VALUES type coercion).
                    planner_context.set_table_schema(Some(Arc::clone(&schema)));
                }

                match query {
                    // CREATE TABLE ... AS SELECT <query>
                    Some(query) => {
                        let plan = self.query_to_plan(*query, planner_context)?;
                        let input_schema = plan.schema();

                        // With an explicit column list, cast + alias each
                        // query column to the declared name/type; column
                        // counts must match exactly.
                        let plan = if has_columns {
                            if schema.fields().len() != input_schema.fields().len() {
                                return plan_err!(
                                    "Mismatch: {} columns specified, but result has {} columns",
                                    schema.fields().len(),
                                    input_schema.fields().len()
                                );
                            }
                            let input_fields = input_schema.fields();
                            let project_exprs = schema
                                .fields()
                                .iter()
                                .zip(input_fields)
                                .map(|(field, input_field)| {
                                    cast(
                                        col(input_field.name()),
                                        field.data_type().clone(),
                                    )
                                    .alias(field.name())
                                })
                                .collect::<Vec<_>>();

                            // NOTE(review): `plan.clone()` looks avoidable —
                            // `plan` is not used again on this path; confirm
                            // before removing the clone.
                            LogicalPlanBuilder::from(plan.clone())
                                .project(project_exprs)?
                                .build()?
                        } else {
                            plan
                        };

                        let constraints = self.new_constraint_from_table_constraints(
                            &all_constraints,
                            plan.schema(),
                        )?;

                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                            CreateMemoryTable {
                                name: self.object_name_to_table_reference(name)?,
                                constraints,
                                input: Arc::new(plan),
                                if_not_exists,
                                or_replace,
                                column_defaults,
                                temporary,
                            },
                        )))
                    }

                    // Plain CREATE TABLE: an empty relation carrying the
                    // declared schema.
                    None => {
                        let plan = EmptyRelation {
                            produce_one_row: false,
                            schema,
                        };
                        let plan = LogicalPlan::EmptyRelation(plan);
                        let constraints = self.new_constraint_from_table_constraints(
                            &all_constraints,
                            plan.schema(),
                        )?;
                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                            CreateMemoryTable {
                                name: self.object_name_to_table_reference(name)?,
                                constraints,
                                input: Arc::new(plan),
                                if_not_exists,
                                or_replace,
                                column_defaults,
                                temporary,
                            },
                        )))
                    }
                }
            }

            // CREATE [OR REPLACE] [TEMPORARY] VIEW (no options clause).
            Statement::CreateView {
                or_replace,
                materialized,
                name,
                columns,
                query,
                options: CreateTableOptions::None,
                cluster_by,
                comment,
                with_no_schema_binding,
                if_not_exists,
                temporary,
                to,
                params,
            } => {
                if materialized {
                    return not_impl_err!("Materialized views not supported")?;
                }
                if !cluster_by.is_empty() {
                    return not_impl_err!("Cluster by not supported")?;
                }
                if comment.is_some() {
                    return not_impl_err!("Comment not supported")?;
                }
                if with_no_schema_binding {
                    return not_impl_err!("With no schema binding not supported")?;
                }
                if if_not_exists {
                    return not_impl_err!("If not exists not supported")?;
                }
                if to.is_some() {
                    return not_impl_err!("To not supported")?;
                }

                // Rebuild the statement and render it via `Display` so the
                // stored view definition is the canonical SQL text, then
                // destructure it again to recover the moved fields.
                let stmt = Statement::CreateView {
                    or_replace,
                    materialized,
                    name,
                    columns,
                    query,
                    options: CreateTableOptions::None,
                    cluster_by,
                    comment,
                    with_no_schema_binding,
                    if_not_exists,
                    temporary,
                    to,
                    params,
                };
                let sql = stmt.to_string();
                let Statement::CreateView {
                    name,
                    columns,
                    query,
                    or_replace,
                    temporary,
                    ..
                } = stmt
                else {
                    return internal_err!("Unreachable code in create view");
                };

                // Per-column options on the view column list are rejected;
                // only bare column names are kept.
                let columns = columns
                    .into_iter()
                    .map(|view_column_def| {
                        if let Some(options) = view_column_def.options {
                            plan_err!(
                                "Options not supported for view columns: {options:?}"
                            )
                        } else {
                            Ok(view_column_def.name)
                        }
                    })
                    .collect::<Result<Vec<_>>>()?;

                let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
                plan = self.apply_expr_alias(plan, columns)?;

                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    name: self.object_name_to_table_reference(name)?,
                    input: Arc::new(plan),
                    or_replace,
                    definition: Some(sql),
                    temporary,
                })))
            }
            // SHOW CREATE TABLE (only tables are supported).
            Statement::ShowCreate { obj_type, obj_name } => match obj_type {
                ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
                _ => {
                    not_impl_err!("Only `SHOW CREATE TABLE ...` statement is supported")
                }
            },
            Statement::CreateSchema {
                schema_name,
                if_not_exists,
            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
                CreateCatalogSchema {
                    schema_name: get_schema_name(&schema_name),
                    if_not_exists,
                    schema: Arc::new(DFSchema::empty()),
                },
            ))),
            // CREATE DATABASE maps onto a DataFusion catalog.
            Statement::CreateDatabase {
                db_name,
                if_not_exists,
                ..
            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
                CreateCatalog {
                    catalog_name: object_name_to_string(&db_name),
                    if_not_exists,
                    schema: Arc::new(DFSchema::empty()),
                },
            ))),
            // DROP TABLE / VIEW / SCHEMA on exactly one object.
            Statement::Drop {
                object_type,
                if_exists,
                mut names,
                cascade,
                restrict: _,
                purge: _,
                temporary: _,
            } => {
                let name = match names.len() {
                    0 => Err(ParserError("Missing table name.".to_string()).into()),
                    1 => self.object_name_to_table_reference(names.pop().unwrap()),
                    _ => {
                        Err(ParserError("Multiple objects not supported".to_string())
                            .into())
                    }
                }?;

                match object_type {
                    ObjectType::Table => {
                        Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
                            name,
                            if_exists,
                            schema: DFSchemaRef::new(DFSchema::empty()),
                        })))
                    }
                    ObjectType::View => {
                        Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
                            name,
                            if_exists,
                            schema: DFSchemaRef::new(DFSchema::empty()),
                        })))
                    }
                    ObjectType::Schema => {
                        // Re-interpret the parsed table reference as a schema
                        // reference: a two-part name `a.b` means catalog `a`,
                        // schema `b`; three parts is invalid for a schema.
                        let name = match name {
                            TableReference::Bare { table } => Ok(SchemaReference::Bare { schema: table }),
                            TableReference::Partial { schema, table } => Ok(SchemaReference::Full { schema: table, catalog: schema }),
                            TableReference::Full { catalog: _, schema: _, table: _ } => {
                                Err(ParserError("Invalid schema specifier (has 3 parts)".to_string()))
                            }
                        }?;
                        Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(DropCatalogSchema {
                            name,
                            if_exists,
                            cascade,
                            schema: DFSchemaRef::new(DFSchema::empty()),
                        })))
                    }
                    _ => not_impl_err!(
                        "Only `DROP TABLE/VIEW/SCHEMA ...` statement is supported currently"
                    ),
                }
            }
            // PREPARE <name> (types...) AS <statement>: plan the inner
            // statement with the declared parameter types in scope.
            Statement::Prepare {
                name,
                data_types,
                statement,
            } => {
                let data_types: Vec<DataType> = data_types
                    .into_iter()
                    .map(|t| self.convert_data_type(&t))
                    .collect::<Result<_>>()?;

                let mut planner_context = PlannerContext::new()
                    .with_prepare_param_data_types(data_types.clone());

                let plan = self.sql_statement_to_plan_with_context_impl(
                    *statement,
                    &mut planner_context,
                )?;
                Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
                    name: ident_to_string(&name),
                    data_types,
                    input: Arc::new(plan),
                })))
            }
            // EXECUTE <name> [(params...)]
            Statement::Execute {
                name,
                parameters,
                using,
                has_parentheses: _,
                immediate,
                into,
            } => {
                if !using.is_empty() {
                    return not_impl_err!(
                        "Execute statement with USING is not supported"
                    );
                }
                if immediate {
                    return not_impl_err!(
                        "Execute statement with IMMEDIATE is not supported"
                    );
                }
                if !into.is_empty() {
                    return not_impl_err!("Execute statement with INTO is not supported");
                }
                // Parameter expressions are planned against an empty schema:
                // they may not reference any columns.
                let empty_schema = DFSchema::empty();
                let parameters = parameters
                    .into_iter()
                    .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
                    .collect::<Result<Vec<Expr>>>()?;

                // NOTE(review): `name.unwrap()` panics if the parser produced
                // an EXECUTE without a name — verify that cannot happen, or
                // return a plan error instead.
                Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
                    name: object_name_to_string(&name.unwrap()),
                    parameters,
                })))
            }
            // DEALLOCATE [PREPARE] <name>
            Statement::Deallocate {
                name,
                prepare: _,
            } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
                Deallocate {
                    name: ident_to_string(&name),
                },
            ))),

            // SHOW TABLES — only the bare form is supported; every modifier
            // is rejected explicitly.
            Statement::ShowTables {
                extended,
                full,
                terse,
                history,
                external,
                show_options,
            } => {
                if extended {
                    return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
                }
                if full {
                    return not_impl_err!("SHOW FULL TABLES not supported")?;
                }
                if terse {
                    return not_impl_err!("SHOW TERSE TABLES not supported")?;
                }
                if history {
                    return not_impl_err!("SHOW TABLES HISTORY not supported")?;
                }
                if external {
                    return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
                }
                let ShowStatementOptions {
                    show_in,
                    starts_with,
                    limit,
                    limit_from,
                    filter_position,
                } = show_options;
                if show_in.is_some() {
                    return not_impl_err!("SHOW TABLES IN not supported")?;
                }
                if starts_with.is_some() {
                    return not_impl_err!("SHOW TABLES LIKE not supported")?;
                }
                if limit.is_some() {
                    return not_impl_err!("SHOW TABLES LIMIT not supported")?;
                }
                if limit_from.is_some() {
                    return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
                }
                if filter_position.is_some() {
                    return not_impl_err!("SHOW TABLES FILTER not supported")?;
                }
                self.show_tables_to_plan()
            }

            // SHOW COLUMNS FROM <table> — requires a table name; all other
            // options are rejected.
            Statement::ShowColumns {
                extended,
                full,
                show_options,
            } => {
                let ShowStatementOptions {
                    show_in,
                    starts_with,
                    limit,
                    limit_from,
                    filter_position,
                } = show_options;
                if starts_with.is_some() {
                    return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
                }
                if limit.is_some() {
                    return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
                }
                if limit_from.is_some() {
                    return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
                }
                if filter_position.is_some() {
                    return not_impl_err!(
                        "SHOW COLUMNS with WHERE or LIKE is not supported"
                    )?;
                }
                let Some(ShowStatementIn {
                    clause: _,
                    parent_type,
                    parent_name,
                }) = show_in
                else {
                    return plan_err!("SHOW COLUMNS requires a table name");
                };

                if let Some(parent_type) = parent_type {
                    return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
                }
                let Some(table_name) = parent_name else {
                    return plan_err!("SHOW COLUMNS requires a table name");
                };

                self.show_columns_to_plan(extended, full, table_name)
            }

            Statement::ShowFunctions { filter, .. } => {
                self.show_functions_to_plan(filter)
            }

            // INSERT INTO — most dialect clauses are rejected; SQLite's
            // `INSERT OR REPLACE` is folded into `replace_into`.
            Statement::Insert(Insert {
                or,
                into,
                columns,
                overwrite,
                source,
                partitioned,
                after_columns,
                table,
                on,
                returning,
                ignore,
                table_alias,
                mut replace_into,
                priority,
                insert_alias,
                assignments,
                has_table_keyword,
                settings,
                format_clause,
            }) => {
                let table_name = match table {
                    TableObject::TableName(table_name) => table_name,
                    TableObject::TableFunction(_) => {
                        return not_impl_err!("INSERT INTO Table functions not supported")
                    }
                };
                if let Some(or) = or {
                    match or {
                        SqliteOnConflict::Replace => replace_into = true,
                        _ => plan_err!("Inserts with {or} clause is not supported")?,
                    }
                }
                if partitioned.is_some() {
                    plan_err!("Partitioned inserts not yet supported")?;
                }
                if !after_columns.is_empty() {
                    plan_err!("After-columns clause not supported")?;
                }
                if on.is_some() {
                    plan_err!("Insert-on clause not supported")?;
                }
                if returning.is_some() {
                    plan_err!("Insert-returning clause not supported")?;
                }
                if ignore {
                    plan_err!("Insert-ignore clause not supported")?;
                }
                let Some(source) = source else {
                    plan_err!("Inserts without a source not supported")?
                };
                if let Some(table_alias) = table_alias {
                    plan_err!(
                        "Inserts with a table alias not supported: {table_alias:?}"
                    )?
                };
                if let Some(priority) = priority {
                    plan_err!(
                        "Inserts with a `PRIORITY` clause not supported: {priority:?}"
                    )?
                };
                if insert_alias.is_some() {
                    plan_err!("Inserts with an alias not supported")?;
                }
                if !assignments.is_empty() {
                    plan_err!("Inserts with assignments not supported")?;
                }
                if settings.is_some() {
                    plan_err!("Inserts with settings not supported")?;
                }
                if format_clause.is_some() {
                    plan_err!("Inserts with format clause not supported")?;
                }
                // Purely syntactic flags; deliberately ignored.
                let _ = into;
                let _ = has_table_keyword;
                self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
            }
            // UPDATE ... [FROM <table>] [WHERE ...] — at most one FROM table.
            Statement::Update {
                table,
                assignments,
                from,
                selection,
                returning,
                or,
            } => {
                // FROM may appear before or after SET; both carry the same
                // table list.
                let froms =
                    from.map(|update_table_from_kind| match update_table_from_kind {
                        UpdateTableFromKind::BeforeSet(froms) => froms,
                        UpdateTableFromKind::AfterSet(froms) => froms,
                    });
                if froms.as_ref().is_some_and(|f| f.len() > 1) {
                    plan_err!("Multiple tables in UPDATE SET FROM not yet supported")?;
                }
                let update_from = froms.and_then(|mut f| f.pop());
                if returning.is_some() {
                    plan_err!("Update-returning clause not yet supported")?;
                }
                if or.is_some() {
                    plan_err!("ON conflict not supported")?;
                }
                self.update_to_plan(table, assignments, update_from, selection)
            }

            // DELETE FROM <table> [WHERE ...] — no USING / RETURNING /
            // ORDER BY / LIMIT support.
            Statement::Delete(Delete {
                tables,
                using,
                selection,
                returning,
                from,
                order_by,
                limit,
            }) => {
                if !tables.is_empty() {
                    plan_err!("DELETE <TABLE> not supported")?;
                }

                if using.is_some() {
                    plan_err!("Using clause not supported")?;
                }

                if returning.is_some() {
                    plan_err!("Delete-returning clause not yet supported")?;
                }

                if !order_by.is_empty() {
                    plan_err!("Delete-order-by clause not yet supported")?;
                }

                if limit.is_some() {
                    plan_err!("Delete-limit clause not yet supported")?;
                }

                let table_name = self.get_delete_target(from)?;
                self.delete_to_plan(table_name, selection)
            }

            // START TRANSACTION [modes...] (the BEGIN form is `begin: true`
            // and is not matched here).
            Statement::StartTransaction {
                modes,
                begin: false,
                modifier,
                transaction,
                statements,
                exception_statements,
                has_end_keyword,
            } => {
                if let Some(modifier) = modifier {
                    return not_impl_err!(
                        "Transaction modifier not supported: {modifier}"
                    );
                }
                if !statements.is_empty() {
                    return not_impl_err!(
                        "Transaction with multiple statements not supported"
                    );
                }
                if exception_statements.is_some() {
                    return not_impl_err!(
                        "Transaction with exception statements not supported"
                    );
                }
                if has_end_keyword {
                    return not_impl_err!("Transaction with END keyword not supported");
                }
                self.validate_transaction_kind(transaction)?;
                // When a mode is specified multiple times the last one wins
                // (`next_back` on the filtered list).
                let isolation_level: ast::TransactionIsolationLevel = modes
                    .iter()
                    .filter_map(|m: &TransactionMode| match m {
                        TransactionMode::AccessMode(_) => None,
                        TransactionMode::IsolationLevel(level) => Some(level),
                    })
                    .next_back()
                    .copied()
                    .unwrap_or(ast::TransactionIsolationLevel::Serializable);
                let access_mode: ast::TransactionAccessMode = modes
                    .iter()
                    .filter_map(|m: &TransactionMode| match m {
                        TransactionMode::AccessMode(mode) => Some(mode),
                        TransactionMode::IsolationLevel(_) => None,
                    })
                    .next_back()
                    .copied()
                    .unwrap_or(ast::TransactionAccessMode::ReadWrite);
                // Translate sqlparser enums into DataFusion's own transaction
                // enums.
                let isolation_level = match isolation_level {
                    ast::TransactionIsolationLevel::ReadUncommitted => {
                        TransactionIsolationLevel::ReadUncommitted
                    }
                    ast::TransactionIsolationLevel::ReadCommitted => {
                        TransactionIsolationLevel::ReadCommitted
                    }
                    ast::TransactionIsolationLevel::RepeatableRead => {
                        TransactionIsolationLevel::RepeatableRead
                    }
                    ast::TransactionIsolationLevel::Serializable => {
                        TransactionIsolationLevel::Serializable
                    }
                    ast::TransactionIsolationLevel::Snapshot => {
                        TransactionIsolationLevel::Snapshot
                    }
                };
                let access_mode = match access_mode {
                    ast::TransactionAccessMode::ReadOnly => {
                        TransactionAccessMode::ReadOnly
                    }
                    ast::TransactionAccessMode::ReadWrite => {
                        TransactionAccessMode::ReadWrite
                    }
                };
                let statement = PlanStatement::TransactionStart(TransactionStart {
                    access_mode,
                    isolation_level,
                });
                Ok(LogicalPlan::Statement(statement))
            }
            // COMMIT [AND [NO] CHAIN]
            Statement::Commit {
                chain,
                end,
                modifier,
            } => {
                if end {
                    return not_impl_err!("COMMIT AND END not supported");
                };
                if let Some(modifier) = modifier {
                    return not_impl_err!("COMMIT {modifier} not supported");
                };
                let statement = PlanStatement::TransactionEnd(TransactionEnd {
                    conclusion: TransactionConclusion::Commit,
                    chain,
                });
                Ok(LogicalPlan::Statement(statement))
            }
            // ROLLBACK [AND [NO] CHAIN] — savepoints are not supported.
            Statement::Rollback { chain, savepoint } => {
                if savepoint.is_some() {
                    plan_err!("Savepoints not supported")?;
                }
                let statement = PlanStatement::TransactionEnd(TransactionEnd {
                    conclusion: TransactionConclusion::Rollback,
                    chain,
                });
                Ok(LogicalPlan::Statement(statement))
            }
            // CREATE FUNCTION — translates signature, body, and volatility.
            Statement::CreateFunction(ast::CreateFunction {
                or_replace,
                temporary,
                name,
                args,
                return_type,
                function_body,
                behavior,
                language,
                ..
            }) => {
                let return_type = match return_type {
                    Some(t) => Some(self.convert_data_type(&t)?),
                    None => None,
                };
                let mut planner_context = PlannerContext::new();
                let empty_schema = &DFSchema::empty();

                // Convert each declared argument (type + optional default
                // expression) into an OperateFunctionArg.
                let args = match args {
                    Some(function_args) => {
                        let function_args = function_args
                            .into_iter()
                            .map(|arg| {
                                let data_type = self.convert_data_type(&arg.data_type)?;

                                let default_expr = match arg.default_expr {
                                    Some(expr) => Some(self.sql_to_expr(
                                        expr,
                                        empty_schema,
                                        &mut planner_context,
                                    )?),
                                    None => None,
                                };
                                Ok(OperateFunctionArg {
                                    name: arg.name,
                                    default_expr,
                                    data_type,
                                })
                            })
                            .collect::<Result<Vec<OperateFunctionArg>>>();
                        Some(function_args?)
                    }
                    None => None,
                };
                // Only unqualified single-part function names are accepted.
                let name = match &name.0[..] {
                    [] => exec_err!("Function should have name")?,
                    [n] => n.as_ident().unwrap().value.clone(),
                    [..] => not_impl_err!("Qualified functions are not supported")?,
                };
                // The argument types double as the "prepare parameter" types
                // so placeholders ($1, $2, ...) in the body resolve to them.
                let arg_types = args.as_ref().map(|arg| {
                    arg.iter().map(|t| t.data_type.clone()).collect::<Vec<_>>()
                });
                let mut planner_context = PlannerContext::new()
                    .with_prepare_param_data_types(arg_types.unwrap_or_default());

                // All three body syntaxes (AS before/after options, RETURN)
                // carry a single expression.
                let function_body = match function_body {
                    Some(r) => Some(self.sql_to_expr(
                        match r {
                            ast::CreateFunctionBody::AsBeforeOptions(expr) => expr,
                            ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
                            ast::CreateFunctionBody::Return(expr) => expr,
                        },
                        &DFSchema::empty(),
                        &mut planner_context,
                    )?),
                    None => None,
                };

                let params = CreateFunctionBody {
                    language,
                    behavior: behavior.map(|b| match b {
                        ast::FunctionBehavior::Immutable => Volatility::Immutable,
                        ast::FunctionBehavior::Stable => Volatility::Stable,
                        ast::FunctionBehavior::Volatile => Volatility::Volatile,
                    }),
                    function_body,
                };

                let statement = DdlStatement::CreateFunction(CreateFunction {
                    or_replace,
                    temporary,
                    name,
                    return_type,
                    args,
                    params,
                    schema: DFSchemaRef::new(DFSchema::empty()),
                });

                Ok(LogicalPlan::Ddl(statement))
            }
            // DROP FUNCTION — only the first function descriptor is used;
            // extra descriptors are silently ignored.
            Statement::DropFunction {
                if_exists,
                func_desc,
                ..
            } => {
                if let Some(desc) = func_desc.first() {
                    let name = match &desc.name.0[..] {
                        [] => exec_err!("Function should have name")?,
                        [n] => n.as_ident().unwrap().value.clone(),
                        [..] => not_impl_err!("Qualified functions are not supported")?,
                    };
                    let statement = DdlStatement::DropFunction(DropFunction {
                        if_exists,
                        name,
                        schema: DFSchemaRef::new(DFSchema::empty()),
                    });
                    Ok(LogicalPlan::Ddl(statement))
                } else {
                    exec_err!("Function name not provided")
                }
            }
            // CREATE INDEX — index columns are resolved against the target
            // table's schema.
            Statement::CreateIndex(CreateIndex {
                name,
                table_name,
                using,
                columns,
                unique,
                if_not_exists,
                ..
            }) => {
                let name: Option<String> = name.as_ref().map(object_name_to_string);
                let table = self.object_name_to_table_reference(table_name)?;
                let table_schema = self
                    .context_provider
                    .get_table_source(table.clone())?
                    .schema()
                    .to_dfschema_ref()?;
                let using: Option<String> = using.as_ref().map(ident_to_string);
                let columns = self.order_by_to_sort_expr(
                    columns,
                    &table_schema,
                    planner_context,
                    false,
                    None,
                )?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
                    PlanCreateIndex {
                        name,
                        table,
                        using,
                        columns,
                        unique,
                        if_not_exists,
                        schema: DFSchemaRef::new(DFSchema::empty()),
                    },
                )))
            }
            // Anything not matched above is unsupported.
            stmt => {
                not_impl_err!("Unsupported SQL statement: {stmt}")
            }
        }
    }
1266
1267 fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
1268 let mut from = match from {
1269 FromTable::WithFromKeyword(v) => v,
1270 FromTable::WithoutKeyword(v) => v,
1271 };
1272
1273 if from.len() != 1 {
1274 return not_impl_err!(
1275 "DELETE FROM only supports single table, got {}: {from:?}",
1276 from.len()
1277 );
1278 }
1279 let table_factor = from.pop().unwrap();
1280 if !table_factor.joins.is_empty() {
1281 return not_impl_err!("DELETE FROM only supports single table, got: joins");
1282 }
1283 let TableFactor::Table { name, .. } = table_factor.relation else {
1284 return not_impl_err!(
1285 "DELETE FROM only supports single table, got: {table_factor:?}"
1286 );
1287 };
1288
1289 Ok(name)
1290 }
1291
1292 fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
1294 if self.has_table("information_schema", "tables") {
1295 let query = "SELECT * FROM information_schema.tables;";
1296 let mut rewrite = DFParser::parse_sql(query)?;
1297 assert_eq!(rewrite.len(), 1);
1298 self.statement_to_plan(rewrite.pop_front().unwrap()) } else {
1300 plan_err!("SHOW TABLES is not supported unless information_schema is enabled")
1301 }
1302 }
1303
1304 fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
1305 let table_ref = self.object_name_to_table_reference(table_name)?;
1306
1307 let table_source = self.context_provider.get_table_source(table_ref)?;
1308
1309 let schema = table_source.schema();
1310
1311 let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1312
1313 Ok(LogicalPlan::DescribeTable(DescribeTable {
1314 schema,
1315 output_schema: Arc::new(output_schema),
1316 }))
1317 }
1318
    /// Plan a `COPY <source> TO <target>` statement into a [`CopyTo`] node.
    ///
    /// The source is either a table name (planned as a full scan) or a
    /// subquery. The output file type comes from the `STORED AS` clause when
    /// it names a known format, otherwise it is inferred from the target
    /// path's file extension.
    fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
        let copy_source = statement.source;
        // Plan the input and remember the table reference (when copying from
        // a table) so partition columns can be resolved against it.
        let (input, input_schema, table_ref) = match copy_source {
            CopyToSource::Relation(object_name) => {
                let table_name = object_name_to_string(&object_name);
                let table_ref = self.object_name_to_table_reference(object_name)?;
                let table_source =
                    self.context_provider.get_table_source(table_ref.clone())?;
                let plan =
                    LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?;
                let input_schema = Arc::clone(plan.schema());
                (plan, input_schema, Some(table_ref))
            }
            CopyToSource::Query(query) => {
                let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
                let input_schema = Arc::clone(plan.schema());
                (plan, input_schema, None)
            }
        };

        // COPY allows an option key to be repeated; later values win.
        let options_map = self.parse_options_map(statement.options, true)?;

        // Prefer an explicit STORED AS format when it is recognized.
        let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
            self.context_provider.get_file_type(stored_as).ok()
        } else {
            None
        };

        let file_type = match maybe_file_type {
            Some(ft) => ft,
            None => {
                // Fall back to inferring the format from the target path's
                // extension; missing or non-UTF-8 extensions are errors.
                let e = || {
                    DataFusionError::Configuration(
                        "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
                            .to_string(),
                    )
                };
                let extension: &str = &Path::new(&statement.target)
                    .extension()
                    .ok_or_else(e)?
                    .to_str()
                    .ok_or_else(e)?
                    .to_lowercase();

                self.context_provider.get_file_type(extension)?
            }
        };

        // Validate every PARTITIONED BY column against the input schema and
        // keep only the (unqualified) column names.
        let partition_by = statement
            .partitioned_by
            .iter()
            .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .map(|f| f.name().to_owned())
            .collect();

        Ok(LogicalPlan::Copy(CopyTo {
            input: Arc::new(input),
            output_url: statement.target,
            file_type,
            partition_by,
            options: options_map,
        }))
    }
1386
1387 fn build_order_by(
1388 &self,
1389 order_exprs: Vec<LexOrdering>,
1390 schema: &DFSchemaRef,
1391 planner_context: &mut PlannerContext,
1392 ) -> Result<Vec<Vec<SortExpr>>> {
1393 if !order_exprs.is_empty() && schema.fields().is_empty() {
1394 let results = order_exprs
1395 .iter()
1396 .map(|lex_order| {
1397 let result = lex_order
1398 .iter()
1399 .map(|order_by_expr| {
1400 let ordered_expr = &order_by_expr.expr;
1401 let ordered_expr = ordered_expr.to_owned();
1402 let ordered_expr = self
1403 .sql_expr_to_logical_expr(
1404 ordered_expr,
1405 schema,
1406 planner_context,
1407 )
1408 .unwrap();
1409 let asc = order_by_expr.options.asc.unwrap_or(true);
1410 let nulls_first =
1411 order_by_expr.options.nulls_first.unwrap_or(!asc);
1412
1413 SortExpr::new(ordered_expr, asc, nulls_first)
1414 })
1415 .collect::<Vec<SortExpr>>();
1416 result
1417 })
1418 .collect::<Vec<Vec<SortExpr>>>();
1419
1420 return Ok(results);
1421 }
1422
1423 let mut all_results = vec![];
1424 for expr in order_exprs {
1425 let expr_vec =
1427 self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?;
1428 for sort in expr_vec.iter() {
1430 for column in sort.expr.column_refs().iter() {
1431 if !schema.has_column(column) {
1432 return plan_err!("Column {column} is not in schema");
1434 }
1435 }
1436 }
1437 all_results.push(expr_vec)
1439 }
1440 Ok(all_results)
1441 }
1442
    /// Plan a `CREATE EXTERNAL TABLE` statement into a DDL node.
    ///
    /// Builds the table schema from the column definitions, merges inline and
    /// table-level constraints, collects column defaults and ordering
    /// expressions, and validates option/compression combinations.
    fn external_table_to_plan(
        &self,
        statement: CreateExternalTable,
    ) -> Result<LogicalPlan> {
        // Keep the original SQL text as the table definition.
        let definition = Some(statement.to_string());
        let CreateExternalTable {
            name,
            columns,
            file_type,
            location,
            table_partition_cols,
            if_not_exists,
            temporary,
            order_exprs,
            unbounded,
            options,
            constraints,
        } = statement;

        // Merge table-level constraints with those declared inline on columns.
        let mut all_constraints = constraints;
        let inline_constraints = calc_inline_constraints_from_columns(&columns);
        all_constraints.extend(inline_constraints);

        // External table options may not be specified more than once.
        let options_map = self.parse_options_map(options, false)?;

        let compression = options_map
            .get("format.compression")
            .map(|c| CompressionTypeVariant::from_str(c))
            .transpose()?;
        // Reject an explicit (non-UNCOMPRESSED) compression setting for
        // formats where it is not applicable.
        if (file_type == "PARQUET" || file_type == "AVRO" || file_type == "ARROW")
            && compression
                .map(|c| c != CompressionTypeVariant::UNCOMPRESSED)
                .unwrap_or(false)
        {
            plan_err!(
                "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
            )?;
        }

        let mut planner_context = PlannerContext::new();

        let column_defaults = self
            .build_column_defaults(&columns, &mut planner_context)?
            .into_iter()
            .collect();

        let schema = self.build_schema(columns)?;
        let df_schema = schema.to_dfschema_ref()?;
        df_schema.check_names()?;

        // Ordering expressions are validated against the (possibly empty)
        // declared schema.
        let ordered_exprs =
            self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;

        let name = self.object_name_to_table_reference(name)?;
        let constraints =
            self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
        Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
            PlanCreateExternalTable {
                schema: df_schema,
                name,
                location,
                file_type,
                table_partition_cols,
                if_not_exists,
                temporary,
                definition,
                order_exprs: ordered_exprs,
                unbounded,
                options: options_map,
                constraints,
            },
        )))
    }
1519
1520 fn get_constraint_column_indices(
1523 &self,
1524 df_schema: &DFSchemaRef,
1525 columns: &[Ident],
1526 constraint_name: &str,
1527 ) -> Result<Vec<usize>> {
1528 let field_names = df_schema.field_names();
1529 columns
1530 .iter()
1531 .map(|ident| {
1532 let column = self.ident_normalizer.normalize(ident.clone());
1533 field_names
1534 .iter()
1535 .position(|item| *item == column)
1536 .ok_or_else(|| {
1537 plan_datafusion_err!(
1538 "Column for {constraint_name} not found in schema: {column}"
1539 )
1540 })
1541 })
1542 .collect::<Result<Vec<_>>>()
1543 }
1544
1545 pub fn new_constraint_from_table_constraints(
1547 &self,
1548 constraints: &[TableConstraint],
1549 df_schema: &DFSchemaRef,
1550 ) -> Result<Constraints> {
1551 let constraints = constraints
1552 .iter()
1553 .map(|c: &TableConstraint| match c {
1554 TableConstraint::Unique { name, columns, .. } => {
1555 let constraint_name = match name {
1556 Some(name) => &format!("unique constraint with name '{name}'"),
1557 None => "unique constraint",
1558 };
1559 let indices = self.get_constraint_column_indices(
1561 df_schema,
1562 columns,
1563 constraint_name,
1564 )?;
1565 Ok(Constraint::Unique(indices))
1566 }
1567 TableConstraint::PrimaryKey { columns, .. } => {
1568 let indices = self.get_constraint_column_indices(
1570 df_schema,
1571 columns,
1572 "primary key",
1573 )?;
1574 Ok(Constraint::PrimaryKey(indices))
1575 }
1576 TableConstraint::ForeignKey { .. } => {
1577 _plan_err!("Foreign key constraints are not currently supported")
1578 }
1579 TableConstraint::Check { .. } => {
1580 _plan_err!("Check constraints are not currently supported")
1581 }
1582 TableConstraint::Index { .. } => {
1583 _plan_err!("Indexes are not currently supported")
1584 }
1585 TableConstraint::FulltextOrSpatial { .. } => {
1586 _plan_err!("Indexes are not currently supported")
1587 }
1588 })
1589 .collect::<Result<Vec<_>>>()?;
1590 Ok(Constraints::new_unverified(constraints))
1591 }
1592
1593 fn parse_options_map(
1594 &self,
1595 options: Vec<(String, Value)>,
1596 allow_duplicates: bool,
1597 ) -> Result<HashMap<String, String>> {
1598 let mut options_map = HashMap::new();
1599 for (key, value) in options {
1600 if !allow_duplicates && options_map.contains_key(&key) {
1601 return plan_err!("Option {key} is specified multiple times");
1602 }
1603
1604 let Some(value_string) = crate::utils::value_to_string(&value) else {
1605 return plan_err!("Unsupported Value {}", value);
1606 };
1607
1608 if !(&key.contains('.')) {
1609 let renamed_key = format!("format.{}", key);
1613 options_map.insert(renamed_key.to_lowercase(), value_string);
1614 } else {
1615 options_map.insert(key.to_lowercase(), value_string);
1616 }
1617 }
1618
1619 Ok(options_map)
1620 }
1621
1622 fn explain_to_plan(
1627 &self,
1628 verbose: bool,
1629 analyze: bool,
1630 format: Option<String>,
1631 statement: DFStatement,
1632 ) -> Result<LogicalPlan> {
1633 let plan = self.statement_to_plan(statement)?;
1634 if matches!(plan, LogicalPlan::Explain(_)) {
1635 return plan_err!("Nested EXPLAINs are not supported");
1636 }
1637
1638 let plan = Arc::new(plan);
1639 let schema = LogicalPlan::explain_schema();
1640 let schema = schema.to_dfschema_ref()?;
1641
1642 if verbose && format.is_some() {
1643 return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
1644 }
1645
1646 if analyze {
1647 if format.is_some() {
1648 return plan_err!("EXPLAIN ANALYZE with FORMAT is not supported");
1649 }
1650 Ok(LogicalPlan::Analyze(Analyze {
1651 verbose,
1652 input: plan,
1653 schema,
1654 }))
1655 } else {
1656 let stringified_plans =
1657 vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
1658
1659 let options = self.context_provider.options();
1661 let format = format.as_ref().unwrap_or(&options.explain.format);
1662
1663 let format: ExplainFormat = format.parse()?;
1664
1665 Ok(LogicalPlan::Explain(Explain {
1666 verbose,
1667 explain_format: format,
1668 plan,
1669 stringified_plans,
1670 schema,
1671 logical_optimization_succeeded: false,
1672 }))
1673 }
1674 }
1675
1676 fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
1677 if !self.has_table("information_schema", "df_settings") {
1678 return plan_err!(
1679 "SHOW [VARIABLE] is not supported unless information_schema is enabled"
1680 );
1681 }
1682
1683 let verbose = variable
1684 .last()
1685 .map(|s| ident_to_string(s) == "verbose")
1686 .unwrap_or(false);
1687 let mut variable_vec = variable.to_vec();
1688 let mut columns: String = "name, value".to_owned();
1689
1690 if verbose {
1691 columns = format!("{columns}, description");
1692 variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
1693 }
1694
1695 let variable = object_name_to_string(&ObjectName::from(variable_vec));
1696 let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
1697 let query = if variable == "all" {
1698 format!("{base_query} ORDER BY name")
1700 } else if variable == "timezone" || variable == "time.zone" {
1701 format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
1703 } else {
1704 let is_valid_variable = self
1708 .context_provider
1709 .options()
1710 .entries()
1711 .iter()
1712 .any(|opt| opt.key == variable);
1713
1714 if !is_valid_variable {
1715 return plan_err!(
1716 "'{variable}' is not a variable which can be viewed with 'SHOW'"
1717 );
1718 }
1719
1720 format!("{base_query} WHERE name = '{variable}'")
1721 };
1722
1723 let mut rewrite = DFParser::parse_sql(&query)?;
1724 assert_eq!(rewrite.len(), 1);
1725
1726 self.statement_to_plan(rewrite.pop_front().unwrap())
1727 }
1728
1729 fn set_variable_to_plan(
1730 &self,
1731 local: bool,
1732 hivevar: bool,
1733 variables: &OneOrManyWithParens<ObjectName>,
1734 value: Vec<SQLExpr>,
1735 ) -> Result<LogicalPlan> {
1736 if local {
1737 return not_impl_err!("LOCAL is not supported");
1738 }
1739
1740 if hivevar {
1741 return not_impl_err!("HIVEVAR is not supported");
1742 }
1743
1744 let variable = match variables {
1745 OneOrManyWithParens::One(v) => object_name_to_string(v),
1746 OneOrManyWithParens::Many(vs) => {
1747 return not_impl_err!(
1748 "SET only supports single variable assignment: {vs:?}"
1749 );
1750 }
1751 };
1752 let mut variable_lower = variable.to_lowercase();
1753
1754 if variable_lower == "timezone" || variable_lower == "time.zone" {
1755 variable_lower = "datafusion.execution.time_zone".to_string();
1757 }
1758
1759 let value_string = match &value[0] {
1761 SQLExpr::Identifier(i) => ident_to_string(i),
1762 SQLExpr::Value(v) => match crate::utils::value_to_string(&v.value) {
1763 None => {
1764 return plan_err!("Unsupported Value {}", value[0]);
1765 }
1766 Some(v) => v,
1767 },
1768 SQLExpr::UnaryOp { op, expr } => match op {
1770 UnaryOperator::Plus => format!("+{expr}"),
1771 UnaryOperator::Minus => format!("-{expr}"),
1772 _ => {
1773 return plan_err!("Unsupported Value {}", value[0]);
1774 }
1775 },
1776 _ => {
1777 return plan_err!("Unsupported Value {}", value[0]);
1778 }
1779 };
1780
1781 let statement = PlanStatement::SetVariable(SetVariable {
1782 variable: variable_lower,
1783 value: value_string,
1784 });
1785
1786 Ok(LogicalPlan::Statement(statement))
1787 }
1788
    /// Plan `DELETE FROM <table> [WHERE <predicate>]` into a DML node.
    ///
    /// Scans the target table, optionally applies the WHERE predicate as a
    /// filter, and wraps the result in a `WriteOp::Delete` statement.
    fn delete_to_plan(
        &self,
        table_name: ObjectName,
        predicate_expr: Option<SQLExpr>,
    ) -> Result<LogicalPlan> {
        // Resolve the target table and build a full scan over it.
        let table_ref = self.object_name_to_table_reference(table_name.clone())?;
        let table_source = self.context_provider.get_table_source(table_ref.clone())?;
        let schema = table_source.schema().to_dfschema_ref()?;
        let scan =
            LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
                .build()?;
        let mut planner_context = PlannerContext::new();

        let source = match predicate_expr {
            None => scan,
            Some(predicate_expr) => {
                let filter_expr =
                    self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
                let schema = Arc::new(schema);
                // Normalize column references in the predicate against the
                // table schema, checking for ambiguity.
                let mut using_columns = HashSet::new();
                expr_to_columns(&filter_expr, &mut using_columns)?;
                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
                    filter_expr,
                    &[&[&schema]],
                    &[using_columns],
                )?;
                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
            }
        };

        let plan = LogicalPlan::Dml(DmlStatement::new(
            table_ref,
            table_source,
            WriteOp::Delete,
            Arc::new(source),
        ));
        Ok(plan)
    }
1828
    /// Plan `UPDATE <table> SET ... [FROM ...] [WHERE ...]` into a DML node.
    ///
    /// Builds a projection over the (optionally filtered) input that yields
    /// one expression per target-table column: assigned columns take their
    /// new value (cast to the column type), all others pass through.
    fn update_to_plan(
        &self,
        table: TableWithJoins,
        assignments: Vec<Assignment>,
        from: Option<TableWithJoins>,
        predicate_expr: Option<SQLExpr>,
    ) -> Result<LogicalPlan> {
        // Only a plain table (optionally aliased) can be updated.
        let (table_name, table_alias) = match &table.relation {
            TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
            _ => plan_err!("Cannot update non-table relation!")?,
        };

        // Resolve the target table and qualify its schema with the table name.
        let table_name = self.object_name_to_table_reference(table_name)?;
        let table_source = self.context_provider.get_table_source(table_name.clone())?;
        let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
            table_name.clone(),
            &table_source.schema(),
        )?);

        let mut planner_context = PlannerContext::new();
        // Build a map of assigned column name -> SQL expression, verifying
        // that each assigned column exists in the target table. Only simple
        // column targets are supported (no tuples).
        let mut assign_map = assignments
            .iter()
            .map(|assign| {
                let cols = match &assign.target {
                    AssignmentTarget::ColumnName(cols) => cols,
                    _ => plan_err!("Tuples are not supported")?,
                };
                // Use the last path segment as the column name.
                let col_name: &Ident = cols
                    .0
                    .iter()
                    .last()
                    .ok_or_else(|| plan_datafusion_err!("Empty column id"))?
                    .as_ident()
                    .unwrap();
                // Validate that the assigned column exists in the table.
                table_schema.field_with_unqualified_name(&col_name.value)?;
                Ok((col_name.value.clone(), assign.value.clone()))
            })
            .collect::<Result<HashMap<String, SQLExpr>>>()?;

        // Plan the scan over the target table plus any FROM tables.
        let mut input_tables = vec![table];
        input_tables.extend(from);
        let scan = self.plan_from_tables(input_tables, &mut planner_context)?;

        // Apply the WHERE predicate (if any) on top of the scan, with
        // column-reference normalization and ambiguity checking.
        let source = match predicate_expr {
            None => scan,
            Some(predicate_expr) => {
                let filter_expr = self.sql_to_expr(
                    predicate_expr,
                    scan.schema(),
                    &mut planner_context,
                )?;
                let mut using_columns = HashSet::new();
                expr_to_columns(&filter_expr, &mut using_columns)?;
                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
                    filter_expr,
                    &[&[scan.schema()]],
                    &[using_columns],
                )?;
                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
            }
        };

        // Build one output expression per target-table column.
        let exprs = table_schema
            .iter()
            .map(|(qualifier, field)| {
                let expr = match assign_map.remove(field.name()) {
                    Some(new_value) => {
                        let mut expr = self.sql_to_expr(
                            new_value,
                            source.schema(),
                            &mut planner_context,
                        )?;
                        // Give untyped placeholders the target column's type.
                        if let Expr::Placeholder(placeholder) = &mut expr {
                            placeholder.data_type = placeholder
                                .data_type
                                .take()
                                .or_else(|| Some(field.data_type().clone()));
                        }
                        // Cast the assigned value to the column's type.
                        expr.cast_to(field.data_type(), source.schema())?
                    }
                    None => {
                        // Unassigned columns pass through, qualified by the
                        // table alias when one was given.
                        if let Some(alias) = &table_alias {
                            Expr::Column(Column::new(
                                Some(self.ident_normalizer.normalize(alias.name.clone())),
                                field.name(),
                            ))
                        } else {
                            Expr::Column(Column::from((qualifier, field)))
                        }
                    }
                };
                Ok(expr.alias(field.name()))
            })
            .collect::<Result<Vec<_>>>()?;

        let source = project(source, exprs)?;

        let plan = LogicalPlan::Dml(DmlStatement::new(
            table_name,
            table_source,
            WriteOp::Update,
            Arc::new(source),
        ));
        Ok(plan)
    }
1943
    /// Plan `INSERT INTO <table> [(columns)] <query>` into a DML node.
    ///
    /// Reorders and casts the source query's columns to match the table
    /// schema; columns omitted from the column list are filled with their
    /// declared default or NULL. `overwrite` / `replace_into` select the
    /// insert operation.
    fn insert_to_plan(
        &self,
        table_name: ObjectName,
        columns: Vec<Ident>,
        source: Box<Query>,
        overwrite: bool,
        replace_into: bool,
    ) -> Result<LogicalPlan> {
        // Resolve the target table and its schema.
        let table_name = self.object_name_to_table_reference(table_name)?;
        let table_source = self.context_provider.get_table_source(table_name.clone())?;
        let arrow_schema = (*table_source.schema()).clone();
        let table_schema = DFSchema::try_from(arrow_schema)?;

        // `fields` are the columns being written (in the order the source
        // supplies them); `value_indices[i]` is the source column feeding
        // table column i, or None when that column gets no explicit value.
        let (fields, value_indices) = if columns.is_empty() {
            // No column list: every table column, in table order.
            (
                table_schema.fields().clone(),
                (0..table_schema.fields().len())
                    .map(Some)
                    .collect::<Vec<_>>(),
            )
        } else {
            let mut value_indices = vec![None; table_schema.fields().len()];
            let fields = columns
                .into_iter()
                .map(|c| self.ident_normalizer.normalize(c))
                .enumerate()
                .map(|(i, c)| {
                    let column_index = table_schema
                        .index_of_column_by_name(None, &c)
                        .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;

                    // Reject a column that appears twice in the column list.
                    if value_indices[column_index].is_some() {
                        return schema_err!(SchemaError::DuplicateUnqualifiedField {
                            name: c,
                        });
                    } else {
                        value_indices[column_index] = Some(i);
                    }
                    Ok(table_schema.field(column_index).clone())
                })
                .collect::<Result<Vec<_>>>()?;
            (Fields::from(fields), value_indices)
        };

        // Infer prepared-statement parameter types from VALUES placeholders
        // ($1, $2, ...) based on the column each placeholder feeds.
        let mut prepare_param_data_types = BTreeMap::new();
        if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
            for row in rows.iter() {
                for (idx, val) in row.iter().enumerate() {
                    if let SQLExpr::Value(ValueWithSpan {
                        value: Value::Placeholder(name),
                        span: _,
                    }) = val
                    {
                        // Placeholders are 1-based: $1 maps to index 0.
                        let name =
                            name.replace('$', "").parse::<usize>().map_err(|_| {
                                plan_datafusion_err!("Can't parse placeholder: {name}")
                            })? - 1;
                        let field = fields.get(idx).ok_or_else(|| {
                            plan_datafusion_err!(
                                "Placeholder ${} refers to a non existent column",
                                idx + 1
                            )
                        })?;
                        let dt = field.data_type().clone();
                        let _ = prepare_param_data_types.insert(name, dt);
                    }
                }
            }
        }
        let prepare_param_data_types = prepare_param_data_types.into_values().collect();

        // Plan the source query with the inferred placeholder types and the
        // target column schema available to the planner.
        let mut planner_context =
            PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
        planner_context.set_table_schema(Some(DFSchemaRef::new(
            DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
        )));
        let source = self.query_to_plan(*source, &mut planner_context)?;
        if fields.len() != source.schema().fields().len() {
            plan_err!("Column count doesn't match insert query!")?;
        }

        // Project the source so every table column receives a value:
        // explicitly-written columns are cast from the source, the rest use
        // the declared column default or NULL.
        let exprs = value_indices
            .into_iter()
            .enumerate()
            .map(|(i, value_index)| {
                let target_field = table_schema.field(i);
                let expr = match value_index {
                    Some(v) => {
                        Expr::Column(Column::from(source.schema().qualified_field(v)))
                            .cast_to(target_field.data_type(), source.schema())?
                    }
                    None => table_source
                        .get_column_default(target_field.name())
                        .cloned()
                        .unwrap_or_else(|| {
                            Expr::Literal(ScalarValue::Null)
                        })
                        .cast_to(target_field.data_type(), &DFSchema::empty())?,
                };
                Ok(expr.alias(target_field.name()))
            })
            .collect::<Result<Vec<Expr>>>()?;
        let source = project(source, exprs)?;

        let insert_op = match (overwrite, replace_into) {
            (false, false) => InsertOp::Append,
            (true, false) => InsertOp::Overwrite,
            (false, true) => InsertOp::Replace,
            (true, true) => plan_err!("Conflicting insert operations: `overwrite` and `replace_into` cannot both be true")?,
        };

        let plan = LogicalPlan::Dml(DmlStatement::new(
            table_name,
            Arc::clone(&table_source),
            WriteOp::Insert(insert_op),
            Arc::new(source),
        ));
        Ok(plan)
    }
2076
2077 fn show_columns_to_plan(
2078 &self,
2079 extended: bool,
2080 full: bool,
2081 sql_table_name: ObjectName,
2082 ) -> Result<LogicalPlan> {
2083 let where_clause = object_name_to_qualifier(
2085 &sql_table_name,
2086 self.options.enable_ident_normalization,
2087 )?;
2088
2089 if !self.has_table("information_schema", "columns") {
2090 return plan_err!(
2091 "SHOW COLUMNS is not supported unless information_schema is enabled"
2092 );
2093 }
2094
2095 let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2097 let _ = self.context_provider.get_table_source(table_ref)?;
2098
2099 let select_list = if full || extended {
2101 "*"
2102 } else {
2103 "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
2104 };
2105
2106 let query = format!(
2107 "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
2108 );
2109
2110 let mut rewrite = DFParser::parse_sql(&query)?;
2111 assert_eq!(rewrite.len(), 1);
2112 self.statement_to_plan(rewrite.pop_front().unwrap()) }
2114
    /// Plan `SHOW FUNCTIONS [LIKE <pattern>]` as a query over the
    /// information schema's `parameters` and `routines` tables.
    ///
    /// Joins IN and OUT parameter rows per routine to reconstruct each
    /// function's signature, then attaches the routine metadata. Only a
    /// LIKE filter is supported.
    fn show_functions_to_plan(
        &self,
        filter: Option<ShowStatementFilter>,
    ) -> Result<LogicalPlan> {
        // Translate the optional filter into a WHERE clause fragment.
        let where_clause = if let Some(filter) = filter {
            match filter {
                ShowStatementFilter::Like(like) => {
                    format!("WHERE p.function_name like '{like}'")
                }
                _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
            }
        } else {
            "".to_string()
        };

        let query = format!(
            r#"
SELECT DISTINCT
    p.*,
    r.function_type function_type,
    r.description description,
    r.syntax_example syntax_example
FROM
    (
        SELECT
            i.specific_name function_name,
            o.data_type return_type,
            array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
            array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
        FROM (
            SELECT
                specific_catalog,
                specific_schema,
                specific_name,
                ordinal_position,
                parameter_name,
                data_type,
                rid
            FROM
                information_schema.parameters
            WHERE
                parameter_mode = 'IN'
        ) i
        JOIN
            (
                SELECT
                    specific_catalog,
                    specific_schema,
                    specific_name,
                    ordinal_position,
                    parameter_name,
                    data_type,
                    rid
                FROM
                    information_schema.parameters
                WHERE
                    parameter_mode = 'OUT'
            ) o
        ON i.specific_catalog = o.specific_catalog
        AND i.specific_schema = o.specific_schema
        AND i.specific_name = o.specific_name
        AND i.rid = o.rid
        GROUP BY 1, 2, i.rid
    ) as p
JOIN information_schema.routines r
ON p.function_name = r.routine_name
{where_clause}
            "#
        );
        let mut rewrite = DFParser::parse_sql(&query)?;
        assert_eq!(rewrite.len(), 1);
        self.statement_to_plan(rewrite.pop_front().unwrap())
    }
2198
2199 fn show_create_table_to_plan(
2200 &self,
2201 sql_table_name: ObjectName,
2202 ) -> Result<LogicalPlan> {
2203 if !self.has_table("information_schema", "tables") {
2204 return plan_err!(
2205 "SHOW CREATE TABLE is not supported unless information_schema is enabled"
2206 );
2207 }
2208 let where_clause = object_name_to_qualifier(
2210 &sql_table_name,
2211 self.options.enable_ident_normalization,
2212 )?;
2213
2214 let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2216 let _ = self.context_provider.get_table_source(table_ref)?;
2217
2218 let query = format!(
2219 "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
2220 );
2221
2222 let mut rewrite = DFParser::parse_sql(&query)?;
2223 assert_eq!(rewrite.len(), 1);
2224 self.statement_to_plan(rewrite.pop_front().unwrap()) }
2226
2227 fn has_table(&self, schema: &str, table: &str) -> bool {
2229 let tables_reference = TableReference::Partial {
2230 schema: schema.into(),
2231 table: table.into(),
2232 };
2233 self.context_provider
2234 .get_table_source(tables_reference)
2235 .is_ok()
2236 }
2237
2238 fn validate_transaction_kind(
2239 &self,
2240 kind: Option<BeginTransactionKind>,
2241 ) -> Result<()> {
2242 match kind {
2243 None => Ok(()),
2245 Some(BeginTransactionKind::Transaction) => Ok(()),
2247 Some(BeginTransactionKind::Work) => {
2248 not_impl_err!("Transaction kind not supported: {kind:?}")
2249 }
2250 }
2251 }
2252}