1use std::collections::{BTreeMap, HashMap, HashSet};
19use std::path::Path;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use crate::parser::{
24 CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
25 LexOrdering, ResetStatement, Statement as DFStatement,
26};
27use crate::planner::{
28 ContextProvider, PlannerContext, SqlToRel, object_name_to_qualifier,
29};
30use crate::utils::normalize_ident;
31
32use arrow::datatypes::{Field, FieldRef, Fields};
33use datafusion_common::error::_plan_err;
34use datafusion_common::parsers::CompressionTypeVariant;
35use datafusion_common::{
36 Column, Constraint, Constraints, DFSchema, DFSchemaRef, DataFusionError, Result,
37 ScalarValue, SchemaError, SchemaReference, TableReference, ToDFSchema, exec_err,
38 internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
39 unqualified_field_not_found,
40};
41use datafusion_expr::dml::{CopyTo, InsertOp};
42use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
43use datafusion_expr::logical_plan::DdlStatement;
44use datafusion_expr::logical_plan::builder::project;
45use datafusion_expr::utils::expr_to_columns;
46use datafusion_expr::{
47 Analyze, CreateCatalog, CreateCatalogSchema,
48 CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
49 CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
50 DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
51 EmptyRelation, Execute, Explain, ExplainFormat, Expr, ExprSchemable, Filter,
52 LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare,
53 ResetVariable, SetVariable, SortExpr, Statement as PlanStatement, ToStringifiedPlan,
54 TransactionAccessMode, TransactionConclusion, TransactionEnd,
55 TransactionIsolationLevel, TransactionStart, Volatility, WriteOp, cast, col,
56};
57use sqlparser::ast::{
58 self, BeginTransactionKind, CheckConstraint, ForeignKeyConstraint, IndexColumn,
59 IndexType, NullsDistinctOption, OrderByExpr, OrderByOptions, PrimaryKeyConstraint,
60 Set, ShowStatementIn, ShowStatementOptions, SqliteOnConflict, TableObject,
61 UniqueConstraint, Update, UpdateTableFromKind, ValueWithSpan,
62};
63use sqlparser::ast::{
64 Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
65 CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
66 ObjectName, ObjectType, Query, SchemaName, SetExpr, ShowCreateObject,
67 ShowStatementFilter, Statement, TableConstraint, TableFactor, TableWithJoins,
68 TransactionMode, UnaryOperator, Value,
69};
70use sqlparser::parser::ParserError::ParserError;
71
72fn ident_to_string(ident: &Ident) -> String {
73 normalize_ident(ident.to_owned())
74}
75
76fn object_name_to_string(object_name: &ObjectName) -> String {
77 object_name
78 .0
79 .iter()
80 .map(|object_name_part| {
81 object_name_part
82 .as_ident()
83 .map_or_else(String::new, ident_to_string)
86 })
87 .collect::<Vec<String>>()
88 .join(".")
89}
90
91fn get_schema_name(schema_name: &SchemaName) -> String {
92 match schema_name {
93 SchemaName::Simple(schema_name) => object_name_to_string(schema_name),
94 SchemaName::UnnamedAuthorization(auth) => ident_to_string(auth),
95 SchemaName::NamedAuthorization(schema_name, auth) => format!(
96 "{}.{}",
97 object_name_to_string(schema_name),
98 ident_to_string(auth)
99 ),
100 }
101}
102
103fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
106 let mut constraints: Vec<TableConstraint> = vec![];
107 for column in columns {
108 for ast::ColumnOptionDef { name, option } in &column.options {
109 match option {
110 ast::ColumnOption::Unique(UniqueConstraint {
111 characteristics,
112 name,
113 index_name: _index_name,
114 index_type_display: _index_type_display,
115 index_type: _index_type,
116 columns: _column,
117 index_options: _index_options,
118 nulls_distinct: _nulls_distinct,
119 }) => constraints.push(TableConstraint::Unique(UniqueConstraint {
120 name: name.clone(),
121 index_name: None,
122 index_type_display: ast::KeyOrIndexDisplay::None,
123 index_type: None,
124 columns: vec![IndexColumn {
125 column: OrderByExpr {
126 expr: SQLExpr::Identifier(column.name.clone()),
127 options: OrderByOptions {
128 asc: None,
129 nulls_first: None,
130 },
131 with_fill: None,
132 },
133 operator_class: None,
134 }],
135 index_options: vec![],
136 characteristics: *characteristics,
137 nulls_distinct: NullsDistinctOption::None,
138 })),
139 ast::ColumnOption::PrimaryKey(PrimaryKeyConstraint {
140 characteristics,
141 name: _name,
142 index_name: _index_name,
143 index_type: _index_type,
144 columns: _columns,
145 index_options: _index_options,
146 }) => {
147 constraints.push(TableConstraint::PrimaryKey(PrimaryKeyConstraint {
148 name: name.clone(),
149 index_name: None,
150 index_type: None,
151 columns: vec![IndexColumn {
152 column: OrderByExpr {
153 expr: SQLExpr::Identifier(column.name.clone()),
154 options: OrderByOptions {
155 asc: None,
156 nulls_first: None,
157 },
158 with_fill: None,
159 },
160 operator_class: None,
161 }],
162 index_options: vec![],
163 characteristics: *characteristics,
164 }))
165 }
166 ast::ColumnOption::ForeignKey(ForeignKeyConstraint {
167 foreign_table,
168 referred_columns,
169 on_delete,
170 on_update,
171 characteristics,
172 name: _name,
173 index_name: _index_name,
174 columns: _columns,
175 match_kind: _match_kind,
176 }) => {
177 constraints.push(TableConstraint::ForeignKey(ForeignKeyConstraint {
178 name: name.clone(),
179 index_name: None,
180 columns: vec![],
181 foreign_table: foreign_table.clone(),
182 referred_columns: referred_columns.clone(),
183 on_delete: *on_delete,
184 on_update: *on_update,
185 match_kind: None,
186 characteristics: *characteristics,
187 }))
188 }
189 ast::ColumnOption::Check(CheckConstraint {
190 name,
191 expr,
192 enforced: _enforced,
193 }) => constraints.push(TableConstraint::Check(CheckConstraint {
194 name: name.clone(),
195 expr: expr.clone(),
196 enforced: None,
197 })),
198 ast::ColumnOption::Default(_)
199 | ast::ColumnOption::Null
200 | ast::ColumnOption::NotNull
201 | ast::ColumnOption::DialectSpecific(_)
202 | ast::ColumnOption::CharacterSet(_)
203 | ast::ColumnOption::Generated { .. }
204 | ast::ColumnOption::Comment(_)
205 | ast::ColumnOption::Options(_)
206 | ast::ColumnOption::OnUpdate(_)
207 | ast::ColumnOption::Materialized(_)
208 | ast::ColumnOption::Ephemeral(_)
209 | ast::ColumnOption::Identity(_)
210 | ast::ColumnOption::OnConflict(_)
211 | ast::ColumnOption::Policy(_)
212 | ast::ColumnOption::Tags(_)
213 | ast::ColumnOption::Alias(_)
214 | ast::ColumnOption::Srid(_)
215 | ast::ColumnOption::Collation(_)
216 | ast::ColumnOption::Invisible => {}
217 }
218 }
219 }
220 constraints
221}
222
223impl<S: ContextProvider> SqlToRel<'_, S> {
224 pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
226 match statement {
227 DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
228 DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
229 DFStatement::CopyTo(s) => self.copy_to_plan(s),
230 DFStatement::Explain(ExplainStatement {
231 verbose,
232 analyze,
233 format,
234 statement,
235 }) => self.explain_to_plan(verbose, analyze, format, *statement),
236 DFStatement::Reset(statement) => self.reset_statement_to_plan(statement),
237 }
238 }
239
240 pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
242 self.sql_statement_to_plan_with_context_impl(
243 statement,
244 &mut PlannerContext::new(),
245 )
246 }
247
248 pub fn sql_statement_to_plan_with_context(
250 &self,
251 statement: Statement,
252 planner_context: &mut PlannerContext,
253 ) -> Result<LogicalPlan> {
254 self.sql_statement_to_plan_with_context_impl(statement, planner_context)
255 }
256
257 fn sql_statement_to_plan_with_context_impl(
258 &self,
259 statement: Statement,
260 planner_context: &mut PlannerContext,
261 ) -> Result<LogicalPlan> {
262 match statement {
263 Statement::ExplainTable {
264 describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, table_name,
266 ..
267 } => self.describe_table_to_plan(table_name),
268 Statement::Explain {
269 describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, statement,
271 ..
272 } => match *statement {
273 Statement::Query(query) => self.describe_query_to_plan(*query),
274 _ => {
275 not_impl_err!("Describing statements other than SELECT not supported")
276 }
277 },
278 Statement::Explain {
279 verbose,
280 statement,
281 analyze,
282 format,
283 describe_alias: _,
284 ..
285 } => {
286 let format = format.map(|format| format.to_string());
287 let statement = DFStatement::Statement(statement);
288 self.explain_to_plan(verbose, analyze, format, statement)
289 }
290 Statement::Query(query) => self.query_to_plan(*query, planner_context),
291 Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
292 Statement::Set(statement) => self.set_statement_to_plan(statement),
293 Statement::CreateTable(CreateTable {
294 temporary,
295 external,
296 global,
297 transient,
298 volatile,
299 hive_distribution,
300 hive_formats,
301 file_format,
302 location,
303 query,
304 name,
305 columns,
306 constraints,
307 if_not_exists,
308 or_replace,
309 without_rowid,
310 like,
311 clone,
312 comment,
313 on_commit,
314 on_cluster,
315 primary_key,
316 order_by,
317 partition_by,
318 cluster_by,
319 clustered_by,
320 strict,
321 copy_grants,
322 enable_schema_evolution,
323 change_tracking,
324 data_retention_time_in_days,
325 max_data_extension_time_in_days,
326 default_ddl_collation,
327 with_aggregation_policy,
328 with_row_access_policy,
329 with_tags,
330 iceberg,
331 external_volume,
332 base_location,
333 catalog,
334 catalog_sync,
335 storage_serialization_policy,
336 inherits,
337 table_options: CreateTableOptions::None,
338 dynamic,
339 version,
340 target_lag,
341 warehouse,
342 refresh_mode,
343 initialize,
344 require_user,
345 partition_of,
346 for_values,
347 snapshot,
348 with_storage_lifecycle_policy,
349 diststyle,
350 distkey,
351 sortkey,
352 backup,
353 }) => {
354 if temporary {
355 return not_impl_err!("Temporary tables not supported");
356 }
357 if external {
358 return not_impl_err!("External tables not supported");
359 }
360 if global.is_some() {
361 return not_impl_err!("Global tables not supported");
362 }
363 if transient {
364 return not_impl_err!("Transient tables not supported");
365 }
366 if volatile {
367 return not_impl_err!("Volatile tables not supported");
368 }
369 if hive_distribution != ast::HiveDistributionStyle::NONE {
370 return not_impl_err!(
371 "Hive distribution not supported: {hive_distribution:?}"
372 );
373 }
374 if hive_formats.is_some()
375 && !matches!(
376 hive_formats,
377 Some(ast::HiveFormat {
378 row_format: None,
379 serde_properties: None,
380 storage: None,
381 location: None,
382 })
383 )
384 {
385 return not_impl_err!("Hive formats not supported: {hive_formats:?}");
386 }
387 if file_format.is_some() {
388 return not_impl_err!("File format not supported");
389 }
390 if location.is_some() {
391 return not_impl_err!("Location not supported");
392 }
393 if without_rowid {
394 return not_impl_err!("Without rowid not supported");
395 }
396 if like.is_some() {
397 return not_impl_err!("Like not supported");
398 }
399 if clone.is_some() {
400 return not_impl_err!("Clone not supported");
401 }
402 if comment.is_some() {
403 return not_impl_err!("Comment not supported");
404 }
405 if on_commit.is_some() {
406 return not_impl_err!("On commit not supported");
407 }
408 if on_cluster.is_some() {
409 return not_impl_err!("On cluster not supported");
410 }
411 if primary_key.is_some() {
412 return not_impl_err!("Primary key not supported");
413 }
414 if order_by.is_some() {
415 return not_impl_err!("Order by not supported");
416 }
417 if partition_by.is_some() {
418 return not_impl_err!("Partition by not supported");
419 }
420 if cluster_by.is_some() {
421 return not_impl_err!("Cluster by not supported");
422 }
423 if clustered_by.is_some() {
424 return not_impl_err!("Clustered by not supported");
425 }
426 if strict {
427 return not_impl_err!("Strict not supported");
428 }
429 if copy_grants {
430 return not_impl_err!("Copy grants not supported");
431 }
432 if enable_schema_evolution.is_some() {
433 return not_impl_err!("Enable schema evolution not supported");
434 }
435 if change_tracking.is_some() {
436 return not_impl_err!("Change tracking not supported");
437 }
438 if data_retention_time_in_days.is_some() {
439 return not_impl_err!("Data retention time in days not supported");
440 }
441 if max_data_extension_time_in_days.is_some() {
442 return not_impl_err!(
443 "Max data extension time in days not supported"
444 );
445 }
446 if default_ddl_collation.is_some() {
447 return not_impl_err!("Default DDL collation not supported");
448 }
449 if with_aggregation_policy.is_some() {
450 return not_impl_err!("With aggregation policy not supported");
451 }
452 if with_row_access_policy.is_some() {
453 return not_impl_err!("With row access policy not supported");
454 }
455 if with_tags.is_some() {
456 return not_impl_err!("With tags not supported");
457 }
458 if iceberg {
459 return not_impl_err!("Iceberg not supported");
460 }
461 if external_volume.is_some() {
462 return not_impl_err!("External volume not supported");
463 }
464 if base_location.is_some() {
465 return not_impl_err!("Base location not supported");
466 }
467 if catalog.is_some() {
468 return not_impl_err!("Catalog not supported");
469 }
470 if catalog_sync.is_some() {
471 return not_impl_err!("Catalog sync not supported");
472 }
473 if storage_serialization_policy.is_some() {
474 return not_impl_err!("Storage serialization policy not supported");
475 }
476 if inherits.is_some() {
477 return not_impl_err!("Table inheritance not supported");
478 }
479 if dynamic {
480 return not_impl_err!("Dynamic tables not supported");
481 }
482 if version.is_some() {
483 return not_impl_err!("Version not supported");
484 }
485 if target_lag.is_some() {
486 return not_impl_err!("Target lag not supported");
487 }
488 if warehouse.is_some() {
489 return not_impl_err!("Warehouse not supported");
490 }
491 if refresh_mode.is_some() {
492 return not_impl_err!("Refresh mode not supported");
493 }
494 if initialize.is_some() {
495 return not_impl_err!("Initialize not supported");
496 }
497 if require_user {
498 return not_impl_err!("Require user not supported");
499 }
500 if partition_of.is_some() {
501 return not_impl_err!("PARTITION OF not supported");
502 }
503 if for_values.is_some() {
504 return not_impl_err!("PARTITION OF .. FOR VALUES .. not supported");
505 }
506 if snapshot {
507 return not_impl_err!("Snapshot tables not supported");
508 }
509 if with_storage_lifecycle_policy.is_some() {
510 return not_impl_err!("WITH STORAGE LIFECYCLE POLICY not supported");
511 }
512 if diststyle.is_some() {
513 return not_impl_err!("DISTSTYLE not supported");
514 }
515 if distkey.is_some() {
516 return not_impl_err!("DISTKEY not supported");
517 }
518 if sortkey.is_some() {
519 return not_impl_err!("SORTKEY not supported");
520 }
521 if backup.is_some() {
522 return not_impl_err!("BACKUP not supported");
523 }
524 let mut all_constraints = constraints;
526 let inline_constraints = calc_inline_constraints_from_columns(&columns);
527 all_constraints.extend(inline_constraints);
528 let column_defaults =
530 self.build_column_defaults(&columns, planner_context)?;
531
532 let has_columns = !columns.is_empty();
533 let schema = self.build_schema(columns)?.to_dfschema_ref()?;
534 if has_columns {
535 planner_context.set_table_schema(Some(Arc::clone(&schema)));
536 }
537
538 match query {
539 Some(query) => {
540 let plan = self.query_to_plan(*query, planner_context)?;
541 let input_schema = plan.schema();
542
543 let plan = if has_columns {
544 if schema.fields().len() != input_schema.fields().len() {
545 return plan_err!(
546 "Mismatch: {} columns specified, but result has {} columns",
547 schema.fields().len(),
548 input_schema.fields().len()
549 );
550 }
551 let input_fields = input_schema.fields();
552 let project_exprs = schema
553 .fields()
554 .iter()
555 .zip(input_fields)
556 .map(|(field, input_field)| {
557 cast(
558 col(input_field.name()),
559 field.data_type().clone(),
560 )
561 .alias(field.name())
562 })
563 .collect::<Vec<_>>();
564
565 LogicalPlanBuilder::from(plan.clone())
566 .project(project_exprs)?
567 .build()?
568 } else {
569 plan
570 };
571
572 let constraints = self.new_constraint_from_table_constraints(
573 &all_constraints,
574 plan.schema(),
575 )?;
576
577 Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
578 CreateMemoryTable {
579 name: self.object_name_to_table_reference(name)?,
580 constraints,
581 input: Arc::new(plan),
582 if_not_exists,
583 or_replace,
584 column_defaults,
585 temporary,
586 },
587 )))
588 }
589
590 None => {
591 let plan = EmptyRelation {
592 produce_one_row: false,
593 schema,
594 };
595 let plan = LogicalPlan::EmptyRelation(plan);
596 let constraints = self.new_constraint_from_table_constraints(
597 &all_constraints,
598 plan.schema(),
599 )?;
600 Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
601 CreateMemoryTable {
602 name: self.object_name_to_table_reference(name)?,
603 constraints,
604 input: Arc::new(plan),
605 if_not_exists,
606 or_replace,
607 column_defaults,
608 temporary,
609 },
610 )))
611 }
612 }
613 }
614 Statement::CreateView(ast::CreateView {
615 or_replace,
616 materialized,
617 name,
618 columns,
619 query,
620 options: CreateTableOptions::None,
621 cluster_by,
622 comment,
623 with_no_schema_binding,
624 if_not_exists,
625 temporary,
626 to,
627 params,
628 or_alter,
629 secure,
630 name_before_not_exists,
631 copy_grants,
632 }) => {
633 if materialized {
634 return not_impl_err!("Materialized views not supported")?;
635 }
636 if !cluster_by.is_empty() {
637 return not_impl_err!("Cluster by not supported")?;
638 }
639 if comment.is_some() {
640 return not_impl_err!("Comment not supported")?;
641 }
642 if with_no_schema_binding {
643 return not_impl_err!("With no schema binding not supported")?;
644 }
645 if if_not_exists {
646 return not_impl_err!("If not exists not supported")?;
647 }
648 if to.is_some() {
649 return not_impl_err!("To not supported")?;
650 }
651 if copy_grants {
652 return not_impl_err!("COPY GRANTS not supported")?;
653 }
654
655 let stmt = Statement::CreateView(ast::CreateView {
658 or_replace,
659 materialized,
660 name,
661 columns,
662 query,
663 options: CreateTableOptions::None,
664 cluster_by,
665 comment,
666 with_no_schema_binding,
667 if_not_exists,
668 temporary,
669 to,
670 params,
671 or_alter,
672 secure,
673 name_before_not_exists,
674 copy_grants,
675 });
676 let sql = stmt.to_string();
677 let Statement::CreateView(ast::CreateView {
678 name,
679 columns,
680 query,
681 or_replace,
682 temporary,
683 ..
684 }) = stmt
685 else {
686 return internal_err!("Unreachable code in create view");
687 };
688
689 let columns = columns
690 .into_iter()
691 .map(|view_column_def| {
692 if let Some(options) = view_column_def.options {
693 plan_err!(
694 "Options not supported for view columns: {options:?}"
695 )
696 } else {
697 Ok(view_column_def.name)
698 }
699 })
700 .collect::<Result<Vec<_>>>()?;
701
702 let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
703 plan = self.apply_expr_alias(plan, columns)?;
704
705 Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
706 name: self.object_name_to_table_reference(name)?,
707 input: Arc::new(plan),
708 or_replace,
709 definition: Some(sql),
710 temporary,
711 })))
712 }
713 Statement::ShowCreate { obj_type, obj_name } => match obj_type {
714 ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
715 _ => {
716 not_impl_err!("Only `SHOW CREATE TABLE ...` statement is supported")
717 }
718 },
719 Statement::CreateSchema {
720 schema_name,
721 if_not_exists,
722 ..
723 } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
724 CreateCatalogSchema {
725 schema_name: get_schema_name(&schema_name),
726 if_not_exists,
727 schema: Arc::new(DFSchema::empty()),
728 },
729 ))),
730 Statement::CreateDatabase {
731 db_name,
732 if_not_exists,
733 ..
734 } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
735 CreateCatalog {
736 catalog_name: object_name_to_string(&db_name),
737 if_not_exists,
738 schema: Arc::new(DFSchema::empty()),
739 },
740 ))),
741 Statement::Drop {
742 object_type,
743 if_exists,
744 mut names,
745 cascade,
746 restrict: _,
747 purge: _,
748 temporary: _,
749 table: _,
750 } => {
751 let name = match names.len() {
754 0 => Err(ParserError("Missing table name.".to_string()).into()),
755 1 => self.object_name_to_table_reference(names.pop().unwrap()),
756 _ => {
757 Err(ParserError("Multiple objects not supported".to_string())
758 .into())
759 }
760 }?;
761
762 match object_type {
763 ObjectType::Table => {
764 Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
765 name,
766 if_exists,
767 schema: DFSchemaRef::new(DFSchema::empty()),
768 })))
769 }
770 ObjectType::View => {
771 Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
772 name,
773 if_exists,
774 schema: DFSchemaRef::new(DFSchema::empty()),
775 })))
776 }
777 ObjectType::Schema => {
778 let name = match name {
779 TableReference::Bare { table } => {
780 Ok(SchemaReference::Bare { schema: table })
781 }
782 TableReference::Partial { schema, table } => {
783 Ok(SchemaReference::Full {
784 schema: table,
785 catalog: schema,
786 })
787 }
788 TableReference::Full {
789 catalog: _,
790 schema: _,
791 table: _,
792 } => Err(ParserError(
793 "Invalid schema specifier (has 3 parts)".to_string(),
794 )),
795 }?;
796 Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(
797 DropCatalogSchema {
798 name,
799 if_exists,
800 cascade,
801 schema: DFSchemaRef::new(DFSchema::empty()),
802 },
803 )))
804 }
805 _ => not_impl_err!(
806 "Only `DROP TABLE/VIEW/SCHEMA ...` statement is supported currently"
807 ),
808 }
809 }
810 Statement::Prepare {
811 name,
812 data_types,
813 statement,
814 } => {
815 let mut fields: Vec<FieldRef> = data_types
817 .into_iter()
818 .map(|t| self.convert_data_type_to_field(&t))
819 .collect::<Result<_>>()?;
820
821 let mut planner_context = PlannerContext::new()
823 .with_prepare_param_data_types(
824 fields.iter().cloned().map(Some).collect(),
825 );
826
827 let plan = self.sql_statement_to_plan_with_context_impl(
829 *statement,
830 &mut planner_context,
831 )?;
832
833 if fields.is_empty() {
834 let map_types = plan.get_parameter_fields()?;
835 let param_types: Vec<_> = (1..=map_types.len())
836 .filter_map(|i| {
837 let key = format!("${i}");
838 map_types.get(&key).and_then(|opt| opt.clone())
839 })
840 .collect();
841 fields.extend(param_types.iter().cloned());
842 planner_context.with_prepare_param_data_types(
843 param_types.into_iter().map(Some).collect(),
844 );
845 }
846
847 Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
848 name: ident_to_string(&name),
849 fields,
850 input: Arc::new(plan),
851 })))
852 }
853 Statement::Execute {
854 name,
855 parameters,
856 using,
857 has_parentheses: _,
860 immediate,
861 into,
862 output,
863 default,
864 } => {
865 if !using.is_empty() {
867 return not_impl_err!(
868 "Execute statement with USING is not supported"
869 );
870 }
871 if immediate {
872 return not_impl_err!(
873 "Execute statement with IMMEDIATE is not supported"
874 );
875 }
876 if !into.is_empty() {
877 return not_impl_err!("Execute statement with INTO is not supported");
878 }
879 if output {
880 return not_impl_err!(
881 "Execute statement with OUTPUT is not supported"
882 );
883 }
884 if default {
885 return not_impl_err!(
886 "Execute statement with DEFAULT is not supported"
887 );
888 }
889 let name = name.ok_or_else(|| {
890 plan_datafusion_err!("EXECUTE statement requires a name")
891 })?;
892
893 let empty_schema = DFSchema::empty();
894 let parameters = parameters
895 .into_iter()
896 .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
897 .collect::<Result<Vec<Expr>>>()?;
898
899 Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
900 name: object_name_to_string(&name),
901 parameters,
902 })))
903 }
904 Statement::Deallocate {
905 name,
906 prepare: _,
908 } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
909 Deallocate {
910 name: ident_to_string(&name),
911 },
912 ))),
913
914 Statement::ShowTables {
915 extended,
916 full,
917 terse,
918 history,
919 external,
920 show_options,
921 } => {
922 if extended {
925 return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
926 }
927 if full {
928 return not_impl_err!("SHOW FULL TABLES not supported")?;
929 }
930 if terse {
931 return not_impl_err!("SHOW TERSE TABLES not supported")?;
932 }
933 if history {
934 return not_impl_err!("SHOW TABLES HISTORY not supported")?;
935 }
936 if external {
937 return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
938 }
939 let ShowStatementOptions {
940 show_in,
941 starts_with,
942 limit,
943 limit_from,
944 filter_position,
945 } = show_options;
946 if show_in.is_some() {
947 return not_impl_err!("SHOW TABLES IN not supported")?;
948 }
949 if starts_with.is_some() {
950 return not_impl_err!("SHOW TABLES LIKE not supported")?;
951 }
952 if limit.is_some() {
953 return not_impl_err!("SHOW TABLES LIMIT not supported")?;
954 }
955 if limit_from.is_some() {
956 return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
957 }
958 if filter_position.is_some() {
959 return not_impl_err!("SHOW TABLES FILTER not supported")?;
960 }
961 self.show_tables_to_plan()
962 }
963
964 Statement::ShowColumns {
965 extended,
966 full,
967 show_options,
968 } => {
969 let ShowStatementOptions {
970 show_in,
971 starts_with,
972 limit,
973 limit_from,
974 filter_position,
975 } = show_options;
976 if starts_with.is_some() {
977 return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
978 }
979 if limit.is_some() {
980 return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
981 }
982 if limit_from.is_some() {
983 return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
984 }
985 if filter_position.is_some() {
986 return not_impl_err!(
987 "SHOW COLUMNS with WHERE or LIKE is not supported"
988 )?;
989 }
990 let Some(ShowStatementIn {
991 clause: _,
994 parent_type,
995 parent_name,
996 }) = show_in
997 else {
998 return plan_err!("SHOW COLUMNS requires a table name");
999 };
1000
1001 if let Some(parent_type) = parent_type {
1002 return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
1003 }
1004 let Some(table_name) = parent_name else {
1005 return plan_err!("SHOW COLUMNS requires a table name");
1006 };
1007
1008 self.show_columns_to_plan(extended, full, table_name)
1009 }
1010
1011 Statement::ShowFunctions { filter, .. } => {
1012 self.show_functions_to_plan(filter)
1013 }
1014
1015 Statement::Insert(Insert {
1016 or,
1017 into,
1018 columns,
1019 overwrite,
1020 source,
1021 partitioned,
1022 after_columns,
1023 table,
1024 on,
1025 returning,
1026 ignore,
1027 table_alias,
1028 mut replace_into,
1029 priority,
1030 insert_alias,
1031 assignments,
1032 has_table_keyword,
1033 settings,
1034 format_clause,
1035 insert_token: _, optimizer_hints,
1037 output,
1038 multi_table_insert_type,
1039 multi_table_into_clauses,
1040 multi_table_when_clauses,
1041 multi_table_else_clause,
1042 }) => {
1043 let table_name = match table {
1044 TableObject::TableName(table_name) => table_name,
1045 TableObject::TableFunction(_) => {
1046 return not_impl_err!(
1047 "INSERT INTO Table functions not supported"
1048 );
1049 }
1050 TableObject::TableQuery(_) => {
1051 return not_impl_err!(
1052 "INSERT INTO subquery target not supported"
1053 );
1054 }
1055 };
1056 if let Some(or) = or {
1057 match or {
1058 SqliteOnConflict::Replace => replace_into = true,
1059 _ => plan_err!("Inserts with {or} clause is not supported")?,
1060 }
1061 }
1062 if partitioned.is_some() {
1063 plan_err!("Partitioned inserts not yet supported")?;
1064 }
1065 if !after_columns.is_empty() {
1066 plan_err!("After-columns clause not supported")?;
1067 }
1068 if on.is_some() {
1069 plan_err!("Insert-on clause not supported")?;
1070 }
1071 if returning.is_some() {
1072 plan_err!("Insert-returning clause not supported")?;
1073 }
1074 if ignore {
1075 plan_err!("Insert-ignore clause not supported")?;
1076 }
1077 let Some(source) = source else {
1078 plan_err!("Inserts without a source not supported")?
1079 };
1080 if let Some(table_alias) = table_alias {
1081 plan_err!(
1082 "Inserts with a table alias not supported: {table_alias:?}"
1083 )?
1084 };
1085 if let Some(priority) = priority {
1086 plan_err!(
1087 "Inserts with a `PRIORITY` clause not supported: {priority:?}"
1088 )?
1089 };
1090 if insert_alias.is_some() {
1091 plan_err!("Inserts with an alias not supported")?;
1092 }
1093 if !assignments.is_empty() {
1094 plan_err!("Inserts with assignments not supported")?;
1095 }
1096 if settings.is_some() {
1097 plan_err!("Inserts with settings not supported")?;
1098 }
1099 if format_clause.is_some() {
1100 plan_err!("Inserts with format clause not supported")?;
1101 }
1102 if !optimizer_hints.is_empty() {
1103 plan_err!("Optimizer hints not supported")?;
1104 }
1105 if output.is_some() {
1106 plan_err!("Insert OUTPUT clause not supported")?;
1107 }
1108 if multi_table_insert_type.is_some()
1109 || !multi_table_into_clauses.is_empty()
1110 || !multi_table_when_clauses.is_empty()
1111 || multi_table_else_clause.is_some()
1112 {
1113 plan_err!("Multi-table INSERT not supported")?;
1114 }
1115 let _ = into;
1117 let _ = has_table_keyword;
1118 self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
1119 }
1120 Statement::Update(Update {
1121 table,
1122 assignments,
1123 from,
1124 selection,
1125 returning,
1126 or,
1127 limit,
1128 update_token: _,
1129 optimizer_hints,
1130 output,
1131 order_by,
1132 }) => {
1133 let from_clauses =
1134 from.map(|update_table_from_kind| match update_table_from_kind {
1135 UpdateTableFromKind::BeforeSet(from_clauses) => from_clauses,
1136 UpdateTableFromKind::AfterSet(from_clauses) => from_clauses,
1137 });
1138 if from_clauses.as_ref().is_some_and(|f| f.len() > 1) {
1140 not_impl_err!(
1141 "Multiple tables in UPDATE SET FROM not yet supported"
1142 )?;
1143 }
1144 let update_from = from_clauses.and_then(|mut f| f.pop());
1145
1146 if update_from.is_some() {
1149 return not_impl_err!("UPDATE ... FROM is not supported");
1150 }
1151
1152 if returning.is_some() {
1153 plan_err!("Update-returning clause not yet supported")?;
1154 }
1155 if or.is_some() {
1156 plan_err!("ON conflict not supported")?;
1157 }
1158 if limit.is_some() {
1159 return not_impl_err!("Update-limit clause not supported")?;
1160 }
1161 if !optimizer_hints.is_empty() {
1162 plan_err!("Optimizer hints not supported")?;
1163 }
1164 if output.is_some() {
1165 plan_err!("Update OUTPUT clause not supported")?;
1166 }
1167 if !order_by.is_empty() {
1168 plan_err!("Update ORDER BY not supported")?;
1169 }
1170 self.update_to_plan(table, &assignments, update_from, selection)
1171 }
1172
1173 Statement::Delete(Delete {
1174 tables,
1175 using,
1176 selection,
1177 returning,
1178 from,
1179 order_by,
1180 limit,
1181 delete_token: _,
1182 optimizer_hints,
1183 output,
1184 }) => {
1185 if !tables.is_empty() {
1186 plan_err!("DELETE <TABLE> not supported")?;
1187 }
1188
1189 if using.is_some() {
1190 plan_err!("Using clause not supported")?;
1191 }
1192
1193 if returning.is_some() {
1194 plan_err!("Delete-returning clause not yet supported")?;
1195 }
1196
1197 if !order_by.is_empty() {
1198 plan_err!("Delete-order-by clause not yet supported")?;
1199 }
1200
1201 if !optimizer_hints.is_empty() {
1202 plan_err!("Optimizer hints not supported")?;
1203 }
1204 if output.is_some() {
1205 plan_err!("Delete OUTPUT clause not supported")?;
1206 }
1207
1208 let table_name = self.get_delete_target(from)?;
1209 self.delete_to_plan(&table_name, selection, limit)
1210 }
1211
1212 Statement::StartTransaction {
1213 modes,
1214 begin: false,
1215 modifier,
1216 transaction,
1217 statements,
1218 has_end_keyword,
1219 exception,
1220 } => {
1221 if let Some(modifier) = modifier {
1222 return not_impl_err!(
1223 "Transaction modifier not supported: {modifier}"
1224 );
1225 }
1226 if !statements.is_empty() {
1227 return not_impl_err!(
1228 "Transaction with multiple statements not supported"
1229 );
1230 }
1231 if exception.is_some() {
1232 return not_impl_err!(
1233 "Transaction with exception statements not supported"
1234 );
1235 }
1236 if has_end_keyword {
1237 return not_impl_err!("Transaction with END keyword not supported");
1238 }
1239 self.validate_transaction_kind(transaction.as_ref())?;
1240 let isolation_level: ast::TransactionIsolationLevel = modes
1241 .iter()
1242 .filter_map(|m: &TransactionMode| match m {
1243 TransactionMode::AccessMode(_) => None,
1244 TransactionMode::IsolationLevel(level) => Some(level),
1245 })
1246 .next_back()
1247 .copied()
1248 .unwrap_or(ast::TransactionIsolationLevel::Serializable);
1249 let access_mode: ast::TransactionAccessMode = modes
1250 .iter()
1251 .filter_map(|m: &TransactionMode| match m {
1252 TransactionMode::AccessMode(mode) => Some(mode),
1253 TransactionMode::IsolationLevel(_) => None,
1254 })
1255 .next_back()
1256 .copied()
1257 .unwrap_or(ast::TransactionAccessMode::ReadWrite);
1258 let isolation_level = match isolation_level {
1259 ast::TransactionIsolationLevel::ReadUncommitted => {
1260 TransactionIsolationLevel::ReadUncommitted
1261 }
1262 ast::TransactionIsolationLevel::ReadCommitted => {
1263 TransactionIsolationLevel::ReadCommitted
1264 }
1265 ast::TransactionIsolationLevel::RepeatableRead => {
1266 TransactionIsolationLevel::RepeatableRead
1267 }
1268 ast::TransactionIsolationLevel::Serializable => {
1269 TransactionIsolationLevel::Serializable
1270 }
1271 ast::TransactionIsolationLevel::Snapshot => {
1272 TransactionIsolationLevel::Snapshot
1273 }
1274 };
1275 let access_mode = match access_mode {
1276 ast::TransactionAccessMode::ReadOnly => {
1277 TransactionAccessMode::ReadOnly
1278 }
1279 ast::TransactionAccessMode::ReadWrite => {
1280 TransactionAccessMode::ReadWrite
1281 }
1282 };
1283 let statement = PlanStatement::TransactionStart(TransactionStart {
1284 access_mode,
1285 isolation_level,
1286 });
1287 Ok(LogicalPlan::Statement(statement))
1288 }
1289 Statement::Commit {
1290 chain,
1291 end,
1292 modifier,
1293 } => {
1294 if end {
1295 return not_impl_err!("COMMIT AND END not supported");
1296 };
1297 if let Some(modifier) = modifier {
1298 return not_impl_err!("COMMIT {modifier} not supported");
1299 };
1300 let statement = PlanStatement::TransactionEnd(TransactionEnd {
1301 conclusion: TransactionConclusion::Commit,
1302 chain,
1303 });
1304 Ok(LogicalPlan::Statement(statement))
1305 }
1306 Statement::Rollback { chain, savepoint } => {
1307 if savepoint.is_some() {
1308 plan_err!("Savepoints not supported")?;
1309 }
1310 let statement = PlanStatement::TransactionEnd(TransactionEnd {
1311 conclusion: TransactionConclusion::Rollback,
1312 chain,
1313 });
1314 Ok(LogicalPlan::Statement(statement))
1315 }
1316 Statement::CreateFunction(ast::CreateFunction {
1317 or_replace,
1318 temporary,
1319 name,
1320 args,
1321 return_type,
1322 function_body,
1323 behavior,
1324 language,
1325 ..
1326 }) => {
1327 let return_type = match return_type {
1328 Some(ast::FunctionReturnType::DataType(t)) => {
1329 Some(self.convert_data_type_to_field(&t)?)
1330 }
1331 Some(ast::FunctionReturnType::SetOf(_)) => {
1332 return not_impl_err!(
1333 "RETURNS SETOF in CREATE FUNCTION is not supported"
1334 );
1335 }
1336 None => None,
1337 };
1338 let mut planner_context = PlannerContext::new();
1339 let empty_schema = &DFSchema::empty();
1340
1341 let args = match args {
1342 Some(function_args) => {
1343 let function_args = function_args
1344 .into_iter()
1345 .map(|arg| {
1346 let data_type =
1347 self.convert_data_type_to_field(&arg.data_type)?;
1348
1349 let default_expr = match arg.default_expr {
1350 Some(expr) => Some(self.sql_to_expr(
1351 expr,
1352 empty_schema,
1353 &mut planner_context,
1354 )?),
1355 None => None,
1356 };
1357 Ok(OperateFunctionArg {
1358 name: arg.name,
1359 default_expr,
1360 data_type: data_type.data_type().clone(),
1361 })
1362 })
1363 .collect::<Result<Vec<OperateFunctionArg>>>();
1364 Some(function_args?)
1365 }
1366 None => None,
1367 };
1368 let first_default = match args.as_ref() {
1370 Some(arg) => arg.iter().position(|t| t.default_expr.is_some()),
1371 None => None,
1372 };
1373 let last_non_default = match args.as_ref() {
1374 Some(arg) => arg
1375 .iter()
1376 .rev()
1377 .position(|t| t.default_expr.is_none())
1378 .map(|reverse_pos| arg.len() - reverse_pos - 1),
1379 None => None,
1380 };
1381 if let (Some(pos_default), Some(pos_non_default)) =
1382 (first_default, last_non_default)
1383 && pos_non_default > pos_default
1384 {
1385 return plan_err!(
1386 "Non-default arguments cannot follow default arguments."
1387 );
1388 }
1389 let name = match &name.0[..] {
1391 [] => exec_err!("Function should have name")?,
1392 [n] => n.as_ident().unwrap().value.clone(),
1393 [..] => not_impl_err!("Qualified functions are not supported")?,
1394 };
1395 let arg_types = args.as_ref().map(|arg| {
1399 arg.iter()
1400 .map(|t| {
1401 let name = match t.name.clone() {
1402 Some(name) => name.value,
1403 None => "".to_string(),
1404 };
1405 Arc::new(Field::new(name, t.data_type.clone(), true))
1406 })
1407 .collect::<Vec<_>>()
1408 });
1409 if let Some(ref fields) = arg_types {
1411 let count_positional =
1412 fields.iter().filter(|f| f.name() == "").count();
1413 if !(count_positional == 0 || count_positional == fields.len()) {
1414 return plan_err!(
1415 "All function arguments must use either named or positional style."
1416 );
1417 }
1418 }
1419 let mut planner_context = PlannerContext::new()
1420 .with_prepare_param_data_types(
1421 arg_types
1422 .unwrap_or_default()
1423 .into_iter()
1424 .map(Some)
1425 .collect(),
1426 );
1427
1428 let function_body = match function_body {
1429 Some(r) => Some(self.sql_to_expr(
1430 match r {
1431 ast::CreateFunctionBody::AsBeforeOptions{body: expr, link_symbol: _link_symbol} => expr,
1433 ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
1434 ast::CreateFunctionBody::Return(expr) => expr,
1435 ast::CreateFunctionBody::AsBeginEnd(_) => {
1436 return not_impl_err!(
1437 "BEGIN/END enclosed function body syntax is not supported"
1438 )?;
1439 }
1440 ast::CreateFunctionBody::AsReturnExpr(_)
1441 | ast::CreateFunctionBody::AsReturnSelect(_) => {
1442 return not_impl_err!(
1443 "AS RETURN function syntax is not supported"
1444 )?
1445 }
1446 },
1447 &DFSchema::empty(),
1448 &mut planner_context,
1449 )?),
1450 None => None,
1451 };
1452
1453 let params = CreateFunctionBody {
1454 language,
1455 behavior: behavior.map(|b| match b {
1456 ast::FunctionBehavior::Immutable => Volatility::Immutable,
1457 ast::FunctionBehavior::Stable => Volatility::Stable,
1458 ast::FunctionBehavior::Volatile => Volatility::Volatile,
1459 }),
1460 function_body,
1461 };
1462
1463 let statement = DdlStatement::CreateFunction(CreateFunction {
1464 or_replace,
1465 temporary,
1466 name,
1467 return_type: return_type.map(|f| f.data_type().clone()),
1468 args,
1469 params,
1470 schema: DFSchemaRef::new(DFSchema::empty()),
1471 });
1472
1473 Ok(LogicalPlan::Ddl(statement))
1474 }
1475 Statement::DropFunction(ast::DropFunction {
1476 if_exists,
1477 func_desc,
1478 drop_behavior: _,
1479 }) => {
1480 if let Some(desc) = func_desc.first() {
1483 let name = match &desc.name.0[..] {
1485 [] => exec_err!("Function should have name")?,
1486 [n] => n.as_ident().unwrap().value.clone(),
1487 [..] => not_impl_err!("Qualified functions are not supported")?,
1488 };
1489 let statement = DdlStatement::DropFunction(DropFunction {
1490 if_exists,
1491 name,
1492 schema: DFSchemaRef::new(DFSchema::empty()),
1493 });
1494 Ok(LogicalPlan::Ddl(statement))
1495 } else {
1496 exec_err!("Function name not provided")
1497 }
1498 }
1499 Statement::Truncate(ast::Truncate {
1500 table_names,
1501 partitions,
1502 identity,
1503 cascade,
1504 on_cluster,
1505 table,
1506 if_exists,
1507 }) => {
1508 let _ = table; if table_names.len() != 1 {
1510 return not_impl_err!(
1511 "TRUNCATE with multiple tables is not supported"
1512 );
1513 }
1514
1515 let target = &table_names[0];
1516 if target.only {
1517 return not_impl_err!("TRUNCATE with ONLY is not supported");
1518 }
1519 if partitions.is_some() {
1520 return not_impl_err!("TRUNCATE with PARTITION is not supported");
1521 }
1522 if identity.is_some() {
1523 return not_impl_err!(
1524 "TRUNCATE with RESTART/CONTINUE IDENTITY is not supported"
1525 );
1526 }
1527 if cascade.is_some() {
1528 return not_impl_err!(
1529 "TRUNCATE with CASCADE/RESTRICT is not supported"
1530 );
1531 }
1532 if on_cluster.is_some() {
1533 return not_impl_err!("TRUNCATE with ON CLUSTER is not supported");
1534 }
1535 if if_exists {
1536 return not_impl_err!("TRUNCATE .. with IF EXISTS is not supported");
1537 }
1538 let table = self.object_name_to_table_reference(target.name.clone())?;
1539 let source = self.context_provider.get_table_source(table.clone())?;
1540
1541 Ok(LogicalPlan::Dml(DmlStatement::new(
1544 table.clone(),
1545 source,
1546 WriteOp::Truncate,
1547 Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
1548 produce_one_row: false,
1549 schema: DFSchemaRef::new(DFSchema::empty()),
1550 })),
1551 )))
1552 }
1553 Statement::CreateIndex(CreateIndex {
1554 name,
1555 table_name,
1556 using,
1557 columns,
1558 unique,
1559 if_not_exists,
1560 ..
1561 }) => {
1562 let name: Option<String> = name.as_ref().map(object_name_to_string);
1563 let table = self.object_name_to_table_reference(table_name)?;
1564 let table_schema = self
1565 .context_provider
1566 .get_table_source(table.clone())?
1567 .schema()
1568 .to_dfschema_ref()?;
1569 let using: Option<String> =
1570 using.as_ref().map(|index_type| match index_type {
1571 IndexType::Custom(ident) => ident_to_string(ident),
1572 _ => index_type.to_string().to_ascii_lowercase(),
1573 });
1574 let order_by_exprs: Vec<OrderByExpr> =
1575 columns.into_iter().map(|col| col.column).collect();
1576 let columns = self.order_by_to_sort_expr(
1577 order_by_exprs,
1578 &table_schema,
1579 planner_context,
1580 false,
1581 None,
1582 )?;
1583 Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
1584 PlanCreateIndex {
1585 name,
1586 table,
1587 using,
1588 columns,
1589 unique,
1590 if_not_exists,
1591 schema: DFSchemaRef::new(DFSchema::empty()),
1592 },
1593 )))
1594 }
1595 stmt => {
1596 not_impl_err!("Unsupported SQL statement: {stmt}")
1597 }
1598 }
1599 }
1600
1601 fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
1602 let mut from = match from {
1603 FromTable::WithFromKeyword(v) => v,
1604 FromTable::WithoutKeyword(v) => v,
1605 };
1606
1607 if from.len() != 1 {
1608 return not_impl_err!(
1609 "DELETE FROM only supports single table, got {}: {from:?}",
1610 from.len()
1611 );
1612 }
1613 let table_factor = from.pop().unwrap();
1614 if !table_factor.joins.is_empty() {
1615 return not_impl_err!("DELETE FROM only supports single table, got: joins");
1616 }
1617 let TableFactor::Table { name, .. } = table_factor.relation else {
1618 return not_impl_err!(
1619 "DELETE FROM only supports single table, got: {table_factor:?}"
1620 );
1621 };
1622
1623 Ok(name)
1624 }
1625
1626 fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
1628 if self.has_table("information_schema", "tables") {
1629 let query = "SELECT * FROM information_schema.tables;";
1630 let mut rewrite = DFParser::parse_sql(query)?;
1631 assert_eq!(rewrite.len(), 1);
1632 self.statement_to_plan(rewrite.pop_front().unwrap()) } else {
1634 plan_err!("SHOW TABLES is not supported unless information_schema is enabled")
1635 }
1636 }
1637
1638 fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
1639 let table_ref = self.object_name_to_table_reference(table_name)?;
1640
1641 let table_source = self.context_provider.get_table_source(table_ref)?;
1642
1643 let schema = table_source.schema();
1644
1645 let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1646
1647 Ok(LogicalPlan::DescribeTable(DescribeTable {
1648 schema,
1649 output_schema: Arc::new(output_schema),
1650 }))
1651 }
1652
1653 fn describe_query_to_plan(&self, query: Query) -> Result<LogicalPlan> {
1654 let plan = self.query_to_plan(query, &mut PlannerContext::new())?;
1655
1656 let schema = Arc::new(plan.schema().as_arrow().clone());
1657
1658 let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1659
1660 Ok(LogicalPlan::DescribeTable(DescribeTable {
1661 schema,
1662 output_schema: Arc::new(output_schema),
1663 }))
1664 }
1665
1666 fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
1667 let copy_source = statement.source;
1669 let (input, input_schema, table_ref) = match copy_source {
1670 CopyToSource::Relation(object_name) => {
1671 let table_name = object_name_to_string(&object_name);
1672 let table_ref = self.object_name_to_table_reference(object_name)?;
1673 let table_source =
1674 self.context_provider.get_table_source(table_ref.clone())?;
1675 let plan =
1676 LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?;
1677 let input_schema = Arc::clone(plan.schema());
1678 (plan, input_schema, Some(table_ref))
1679 }
1680 CopyToSource::Query(query) => {
1681 let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
1682 let input_schema = Arc::clone(plan.schema());
1683 (plan, input_schema, None)
1684 }
1685 };
1686
1687 let options_map = self.parse_options_map(statement.options, true)?;
1688
1689 let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
1690 self.context_provider.get_file_type(stored_as).ok()
1691 } else {
1692 None
1693 };
1694
1695 let file_type = match maybe_file_type {
1696 Some(ft) => ft,
1697 None => {
1698 let e = || {
1699 DataFusionError::Configuration(
1700 "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
1701 .to_string(),
1702 )
1703 };
1704 let extension: &str = &Path::new(&statement.target)
1706 .extension()
1707 .ok_or_else(e)?
1708 .to_str()
1709 .ok_or_else(e)?
1710 .to_lowercase();
1711
1712 self.context_provider.get_file_type(extension)?
1713 }
1714 };
1715
1716 let partition_by = statement
1717 .partitioned_by
1718 .iter()
1719 .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
1720 .collect::<Result<Vec<_>>>()?
1721 .into_iter()
1722 .map(|f| f.name().to_owned())
1723 .collect();
1724
1725 Ok(LogicalPlan::Copy(CopyTo::new(
1726 Arc::new(input),
1727 statement.target,
1728 partition_by,
1729 file_type,
1730 options_map,
1731 )))
1732 }
1733
1734 fn build_order_by(
1735 &self,
1736 order_exprs: Vec<LexOrdering>,
1737 schema: &DFSchemaRef,
1738 planner_context: &mut PlannerContext,
1739 ) -> Result<Vec<Vec<SortExpr>>> {
1740 if !order_exprs.is_empty() && schema.fields().is_empty() {
1741 let results = order_exprs
1742 .iter()
1743 .map(|lex_order| {
1744 let result = lex_order
1745 .iter()
1746 .map(|order_by_expr| {
1747 let ordered_expr = &order_by_expr.expr;
1748 let ordered_expr = ordered_expr.to_owned();
1749 let ordered_expr = self.sql_expr_to_logical_expr(
1750 ordered_expr,
1751 schema,
1752 planner_context,
1753 )?;
1754 let asc = order_by_expr.options.asc.unwrap_or(true);
1755 let nulls_first =
1756 order_by_expr.options.nulls_first.unwrap_or_else(|| {
1757 self.options.default_null_ordering.nulls_first(asc)
1758 });
1759
1760 Ok(SortExpr::new(ordered_expr, asc, nulls_first))
1761 })
1762 .collect::<Result<Vec<SortExpr>>>()?;
1763 Ok(result)
1764 })
1765 .collect::<Result<Vec<Vec<SortExpr>>>>()?;
1766
1767 return Ok(results);
1768 }
1769
1770 let mut all_results = vec![];
1771 for expr in order_exprs {
1772 let expr_vec =
1774 self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?;
1775 for sort in expr_vec.iter() {
1777 for column in sort.expr.column_refs().iter() {
1778 if !schema.has_column(column) {
1779 return plan_err!("Column {column} is not in schema");
1781 }
1782 }
1783 }
1784 all_results.push(expr_vec)
1786 }
1787 Ok(all_results)
1788 }
1789
1790 fn external_table_to_plan(
1792 &self,
1793 statement: CreateExternalTable,
1794 ) -> Result<LogicalPlan> {
1795 let definition = Some(statement.to_string());
1796 let CreateExternalTable {
1797 name,
1798 columns,
1799 file_type,
1800 location,
1801 table_partition_cols,
1802 if_not_exists,
1803 temporary,
1804 order_exprs,
1805 unbounded,
1806 options,
1807 constraints,
1808 or_replace,
1809 } = statement;
1810
1811 let mut all_constraints = constraints;
1813 let inline_constraints = calc_inline_constraints_from_columns(&columns);
1814 all_constraints.extend(inline_constraints);
1815
1816 let options_map = self.parse_options_map(options, false)?;
1817
1818 let compression = options_map
1819 .get("format.compression")
1820 .map(|c| CompressionTypeVariant::from_str(c))
1821 .transpose()?;
1822 if (file_type == "PARQUET" || file_type == "AVRO" || file_type == "ARROW")
1823 && compression
1824 .map(|c| c != CompressionTypeVariant::UNCOMPRESSED)
1825 .unwrap_or(false)
1826 {
1827 plan_err!(
1828 "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
1829 )?;
1830 }
1831
1832 let mut planner_context = PlannerContext::new();
1833
1834 let column_defaults = self
1835 .build_column_defaults(&columns, &mut planner_context)?
1836 .into_iter()
1837 .collect();
1838
1839 let schema = self.build_schema(columns)?;
1840 let df_schema = schema.to_dfschema_ref()?;
1841 df_schema.check_names()?;
1842
1843 let ordered_exprs =
1844 self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;
1845
1846 let name = self.object_name_to_table_reference(name)?;
1847 let constraints =
1848 self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
1849 Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1850 PlanCreateExternalTable::builder(name, location, file_type, df_schema)
1851 .with_partition_cols(table_partition_cols)
1852 .with_if_not_exists(if_not_exists)
1853 .with_or_replace(or_replace)
1854 .with_temporary(temporary)
1855 .with_definition(definition)
1856 .with_order_exprs(ordered_exprs)
1857 .with_unbounded(unbounded)
1858 .with_options(options_map)
1859 .with_constraints(constraints)
1860 .with_column_defaults(column_defaults)
1861 .build(),
1862 )))
1863 }
1864
1865 fn get_constraint_column_indices(
1868 &self,
1869 df_schema: &DFSchemaRef,
1870 columns: &[IndexColumn],
1871 constraint_name: &str,
1872 ) -> Result<Vec<usize>> {
1873 let field_names = df_schema.field_names();
1874 columns
1875 .iter()
1876 .map(|index_column| {
1877 let expr = &index_column.column.expr;
1878 let ident = if let SQLExpr::Identifier(ident) = expr {
1879 ident
1880 } else {
1881 return Err(plan_datafusion_err!(
1882 "Column name for {constraint_name} must be an identifier: {expr}"
1883 ));
1884 };
1885 let column = self.ident_normalizer.normalize(ident.clone());
1886 field_names
1887 .iter()
1888 .position(|item| *item == column)
1889 .ok_or_else(|| {
1890 plan_datafusion_err!(
1891 "Column for {constraint_name} not found in schema: {column}"
1892 )
1893 })
1894 })
1895 .collect::<Result<Vec<_>>>()
1896 }
1897
1898 pub fn new_constraint_from_table_constraints(
1900 &self,
1901 constraints: &[TableConstraint],
1902 df_schema: &DFSchemaRef,
1903 ) -> Result<Constraints> {
1904 let constraints = constraints
1905 .iter()
1906 .map(|c: &TableConstraint| match c {
1907 TableConstraint::Unique(UniqueConstraint {
1908 name,
1909 index_name: _,
1910 index_type_display: _,
1911 index_type: _,
1912 columns,
1913 index_options: _,
1914 characteristics: _,
1915 nulls_distinct: _,
1916 }) => {
1917 let constraint_name = match &name {
1918 Some(name) => &format!("unique constraint with name '{name}'"),
1919 None => "unique constraint",
1920 };
1921 let indices = self.get_constraint_column_indices(
1923 df_schema,
1924 columns,
1925 constraint_name,
1926 )?;
1927 Ok(Constraint::Unique(indices))
1928 }
1929 TableConstraint::PrimaryKey(PrimaryKeyConstraint {
1930 name: _,
1931 index_name: _,
1932 index_type: _,
1933 columns,
1934 index_options: _,
1935 characteristics: _,
1936 }) => {
1937 let indices = self.get_constraint_column_indices(
1939 df_schema,
1940 columns,
1941 "primary key",
1942 )?;
1943 Ok(Constraint::PrimaryKey(indices))
1944 }
1945 TableConstraint::ForeignKey { .. } => {
1946 _plan_err!("Foreign key constraints are not currently supported")
1947 }
1948 TableConstraint::Check { .. } => {
1949 _plan_err!("Check constraints are not currently supported")
1950 }
1951 TableConstraint::Index { .. } => {
1952 _plan_err!("Indexes are not currently supported")
1953 }
1954 TableConstraint::FulltextOrSpatial { .. } => {
1955 _plan_err!("Indexes are not currently supported")
1956 }
1957 TableConstraint::PrimaryKeyUsingIndex(_) => {
1958 _plan_err!(
1959 "PRIMARY KEY USING INDEX constraints are not currently supported"
1960 )
1961 }
1962 TableConstraint::UniqueUsingIndex(_) => {
1963 _plan_err!(
1964 "UNIQUE USING INDEX constraints are not currently supported"
1965 )
1966 }
1967 })
1968 .collect::<Result<Vec<_>>>()?;
1969 Ok(Constraints::new_unverified(constraints))
1970 }
1971
1972 fn parse_options_map(
1973 &self,
1974 options: Vec<(String, Value)>,
1975 allow_duplicates: bool,
1976 ) -> Result<HashMap<String, String>> {
1977 let mut options_map = HashMap::new();
1978 for (key, value) in options {
1979 if !allow_duplicates && options_map.contains_key(&key) {
1980 return plan_err!("Option {key} is specified multiple times");
1981 }
1982
1983 let Some(value_string) = crate::utils::value_to_string(&value) else {
1984 return plan_err!("Unsupported Value {}", value);
1985 };
1986
1987 if !(&key.contains('.')) {
1988 let renamed_key = format!("format.{key}");
1992 options_map.insert(renamed_key.to_lowercase(), value_string);
1993 } else {
1994 options_map.insert(key.to_lowercase(), value_string);
1995 }
1996 }
1997
1998 Ok(options_map)
1999 }
2000
2001 fn explain_to_plan(
2006 &self,
2007 verbose: bool,
2008 analyze: bool,
2009 format: Option<String>,
2010 statement: DFStatement,
2011 ) -> Result<LogicalPlan> {
2012 let plan = self.statement_to_plan(statement)?;
2013 if matches!(plan, LogicalPlan::Explain(_)) {
2014 return plan_err!("Nested EXPLAINs are not supported");
2015 }
2016
2017 let plan = Arc::new(plan);
2018 let schema = LogicalPlan::explain_schema();
2019 let schema = schema.to_dfschema_ref()?;
2020
2021 if verbose && format.is_some() {
2022 return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
2023 }
2024
2025 if analyze {
2026 if format.is_some() {
2027 return plan_err!("EXPLAIN ANALYZE with FORMAT is not supported");
2028 }
2029 Ok(LogicalPlan::Analyze(Analyze {
2030 verbose,
2031 input: plan,
2032 schema,
2033 }))
2034 } else {
2035 let stringified_plans =
2036 vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
2037
2038 let options = self.context_provider.options();
2041 let format = if verbose {
2042 ExplainFormat::Indent
2043 } else if let Some(format) = format {
2044 ExplainFormat::from_str(&format)?
2045 } else {
2046 options.explain.format.clone()
2047 };
2048
2049 Ok(LogicalPlan::Explain(Explain {
2050 verbose,
2051 explain_format: format,
2052 plan,
2053 stringified_plans,
2054 schema,
2055 logical_optimization_succeeded: false,
2056 }))
2057 }
2058 }
2059
2060 fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
2061 if !self.has_table("information_schema", "df_settings") {
2062 return plan_err!(
2063 "SHOW [VARIABLE] is not supported unless information_schema is enabled"
2064 );
2065 }
2066
2067 let verbose = variable
2068 .last()
2069 .map(|s| ident_to_string(s) == "verbose")
2070 .unwrap_or(false);
2071 let mut variable_vec = variable.to_vec();
2072 let mut columns: String = "name, value".to_owned();
2073
2074 if verbose {
2075 columns = format!("{columns}, description");
2076 variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
2077 }
2078
2079 let variable = object_name_to_string(&ObjectName::from(variable_vec));
2080 let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
2081 let query = if variable == "all" {
2082 format!("{base_query} ORDER BY name")
2084 } else if variable == "timezone" || variable == "time.zone" {
2085 format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
2087 } else {
2088 let is_valid_variable = self
2092 .context_provider
2093 .options()
2094 .entries()
2095 .iter()
2096 .any(|opt| opt.key == variable);
2097
2098 let is_runtime_variable = variable.starts_with("datafusion.runtime.");
2100
2101 if !is_valid_variable && !is_runtime_variable {
2102 return plan_err!(
2103 "'{variable}' is not a variable which can be viewed with 'SHOW'"
2104 );
2105 }
2106
2107 format!("{base_query} WHERE name = '{variable}'")
2108 };
2109
2110 let mut rewrite = DFParser::parse_sql(&query)?;
2111 assert_eq!(rewrite.len(), 1);
2112
2113 self.statement_to_plan(rewrite.pop_front().unwrap())
2114 }
2115
2116 fn set_statement_to_plan(&self, statement: Set) -> Result<LogicalPlan> {
2117 match statement {
2118 Set::SingleAssignment {
2119 scope,
2120 hivevar,
2121 variable,
2122 values,
2123 } => {
2124 if scope.is_some() {
2125 return not_impl_err!("SET with scope modifiers is not supported");
2126 }
2127
2128 if hivevar {
2129 return not_impl_err!("SET HIVEVAR is not supported");
2130 }
2131
2132 let variable = object_name_to_string(&variable);
2133 let mut variable_lower = variable.to_lowercase();
2134
2135 if variable_lower == "timezone" || variable_lower == "time.zone" {
2137 variable_lower = "datafusion.execution.time_zone".to_string();
2138 }
2139
2140 if values.len() != 1 {
2141 return plan_err!("SET only supports single value assignment");
2142 }
2143
2144 let value_string = match &values[0] {
2145 SQLExpr::Identifier(i) => ident_to_string(i),
2146 SQLExpr::Value(v) => match crate::utils::value_to_string(&v.value) {
2147 None => {
2148 return plan_err!("Unsupported value {:?}", v.value);
2149 }
2150 Some(s) => s,
2151 },
2152 SQLExpr::UnaryOp { op, expr } => match op {
2153 UnaryOperator::Plus => format!("+{expr}"),
2154 UnaryOperator::Minus => format!("-{expr}"),
2155 _ => return plan_err!("Unsupported unary op {:?}", op),
2156 },
2157 _ => return plan_err!("Unsupported expr {:?}", values[0]),
2158 };
2159
2160 Ok(LogicalPlan::Statement(PlanStatement::SetVariable(
2161 SetVariable {
2162 variable: variable_lower,
2163 value: value_string,
2164 },
2165 )))
2166 }
2167 other => not_impl_err!("SET variant not implemented yet: {other:?}"),
2168 }
2169 }
2170
2171 fn reset_statement_to_plan(&self, statement: ResetStatement) -> Result<LogicalPlan> {
2172 match statement {
2173 ResetStatement::Variable(variable) => {
2174 let variable = object_name_to_string(&variable);
2175 let mut variable_lower = variable.to_lowercase();
2176
2177 if variable_lower == "timezone" || variable_lower == "time.zone" {
2179 variable_lower = "datafusion.execution.time_zone".to_string();
2180 }
2181
2182 Ok(LogicalPlan::Statement(PlanStatement::ResetVariable(
2183 ResetVariable {
2184 variable: variable_lower,
2185 },
2186 )))
2187 }
2188 }
2189 }
2190
2191 fn delete_to_plan(
2192 &self,
2193 table_name: &ObjectName,
2194 predicate_expr: Option<SQLExpr>,
2195 limit: Option<SQLExpr>,
2196 ) -> Result<LogicalPlan> {
2197 let table_ref = self.object_name_to_table_reference(table_name.clone())?;
2199 let table_source = self.context_provider.get_table_source(table_ref.clone())?;
2200 let schema = DFSchema::try_from_qualified_schema(
2201 table_ref.clone(),
2202 &table_source.schema(),
2203 )?;
2204 let scan =
2205 LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
2206 .build()?;
2207 let mut planner_context = PlannerContext::new();
2208
2209 let mut source = match predicate_expr {
2210 None => scan,
2211 Some(predicate_expr) => {
2212 let filter_expr =
2213 self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
2214 let schema = Arc::new(schema);
2215 let mut using_columns = HashSet::new();
2216 expr_to_columns(&filter_expr, &mut using_columns)?;
2217 let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
2218 filter_expr,
2219 &[&[&schema]],
2220 &[using_columns],
2221 )?;
2222 LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
2223 }
2224 };
2225
2226 if let Some(limit) = limit {
2227 let empty_schema = DFSchema::empty();
2228 let limit = self.sql_to_expr(limit, &empty_schema, &mut planner_context)?;
2229 source = LogicalPlanBuilder::from(source)
2230 .limit_by_expr(None, Some(limit))?
2231 .build()?
2232 }
2233
2234 let plan = LogicalPlan::Dml(DmlStatement::new(
2235 table_ref,
2236 table_source,
2237 WriteOp::Delete,
2238 Arc::new(source),
2239 ));
2240 Ok(plan)
2241 }
2242
2243 fn update_to_plan(
2244 &self,
2245 table: TableWithJoins,
2246 assignments: &[Assignment],
2247 from: Option<TableWithJoins>,
2248 predicate_expr: Option<SQLExpr>,
2249 ) -> Result<LogicalPlan> {
2250 let (table_name, table_alias) = match &table.relation {
2251 TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
2252 _ => plan_err!("Cannot update non-table relation!")?,
2253 };
2254
2255 let table_name = self.object_name_to_table_reference(table_name)?;
2257 let table_source = self.context_provider.get_table_source(table_name.clone())?;
2258 let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
2259 table_name.clone(),
2260 &table_source.schema(),
2261 )?);
2262
2263 let mut planner_context = PlannerContext::new();
2265 let mut assign_map = assignments
2266 .iter()
2267 .map(|assign| {
2268 let cols = match &assign.target {
2269 AssignmentTarget::ColumnName(cols) => cols,
2270 _ => plan_err!("Tuples are not supported")?,
2271 };
2272 let col_name: &Ident = cols
2273 .0
2274 .iter()
2275 .last()
2276 .ok_or_else(|| plan_datafusion_err!("Empty column id"))?
2277 .as_ident()
2278 .unwrap();
2279 table_schema.field_with_unqualified_name(&col_name.value)?;
2281 Ok((col_name.value.clone(), assign.value.clone()))
2282 })
2283 .collect::<Result<HashMap<String, SQLExpr>>>()?;
2284
2285 let mut input_tables = vec![table];
2287 input_tables.extend(from);
2288 let scan = self.plan_from_tables(input_tables, &mut planner_context)?;
2289
2290 let source = match predicate_expr {
2292 None => scan,
2293 Some(predicate_expr) => {
2294 let filter_expr = self.sql_to_expr(
2295 predicate_expr,
2296 scan.schema(),
2297 &mut planner_context,
2298 )?;
2299 let mut using_columns = HashSet::new();
2300 expr_to_columns(&filter_expr, &mut using_columns)?;
2301 let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
2302 filter_expr,
2303 &[&[scan.schema()]],
2304 &[using_columns],
2305 )?;
2306 LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
2307 }
2308 };
2309
2310 let exprs = table_schema
2312 .iter()
2313 .map(|(qualifier, field)| {
2314 let expr = match assign_map.remove(field.name()) {
2315 Some(new_value) => {
2316 let mut expr = self.sql_to_expr(
2317 new_value,
2318 source.schema(),
2319 &mut planner_context,
2320 )?;
2321 if let Expr::Placeholder(placeholder) = &mut expr {
2323 placeholder.field = placeholder
2324 .field
2325 .take()
2326 .or_else(|| Some(Arc::clone(field)));
2327 }
2328 expr.cast_to(field.data_type(), source.schema())?
2330 }
2331 None => {
2332 if let Some(alias) = &table_alias {
2334 Expr::Column(Column::new(
2335 Some(self.ident_normalizer.normalize(alias.name.clone())),
2336 field.name(),
2337 ))
2338 } else {
2339 Expr::Column(Column::from((qualifier, field)))
2340 }
2341 }
2342 };
2343 Ok(expr.alias(field.name()))
2344 })
2345 .collect::<Result<Vec<_>>>()?;
2346
2347 let source = project(source, exprs)?;
2348
2349 let plan = LogicalPlan::Dml(DmlStatement::new(
2350 table_name,
2351 table_source,
2352 WriteOp::Update,
2353 Arc::new(source),
2354 ));
2355 Ok(plan)
2356 }
2357
2358 fn insert_to_plan(
2359 &self,
2360 table_name: ObjectName,
2361 columns: Vec<ObjectName>,
2362 source: Box<Query>,
2363 overwrite: bool,
2364 replace_into: bool,
2365 ) -> Result<LogicalPlan> {
2366 let table_name = self.object_name_to_table_reference(table_name)?;
2368 let table_source = self.context_provider.get_table_source(table_name.clone())?;
2369 let table_schema = DFSchema::try_from(table_source.schema())?;
2370
2371 let columns: Vec<Ident> = columns
2372 .into_iter()
2373 .map(|name| {
2374 if name.0.len() != 1 {
2375 return not_impl_err!(
2376 "Multi-part column names in INSERT not supported: {name}"
2377 );
2378 }
2379 let part = &name.0[0];
2380 let Some(ident) = part.as_ident() else {
2381 return not_impl_err!(
2382 "Non-identifier column name part in INSERT not supported: {part}"
2383 );
2384 };
2385 Ok(ident.clone())
2386 })
2387 .collect::<Result<Vec<_>>>()?;
2388
2389 let (fields, value_indices) = if columns.is_empty() {
2397 (
2399 table_schema.fields().clone(),
2400 (0..table_schema.fields().len())
2401 .map(Some)
2402 .collect::<Vec<_>>(),
2403 )
2404 } else {
2405 let mut value_indices = vec![None; table_schema.fields().len()];
2406 let fields = columns
2407 .into_iter()
2408 .enumerate()
2409 .map(|(i, c)| {
2410 let c = self.ident_normalizer.normalize(c);
2411 let column_index = table_schema
2412 .index_of_column_by_name(None, &c)
2413 .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
2414
2415 if value_indices[column_index].is_some() {
2416 return schema_err!(SchemaError::DuplicateUnqualifiedField {
2417 name: c,
2418 });
2419 } else {
2420 value_indices[column_index] = Some(i);
2421 }
2422 Ok(Arc::clone(table_schema.field(column_index)))
2423 })
2424 .collect::<Result<Vec<_>>>()?;
2425 (Fields::from(fields), value_indices)
2426 };
2427
2428 let mut prepare_param_data_types = BTreeMap::new();
2430 if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
2431 for row in rows.iter() {
2432 for (idx, val) in row.content.iter().enumerate() {
2433 if let SQLExpr::Value(ValueWithSpan {
2434 value: Value::Placeholder(name),
2435 span: _,
2436 }) = val
2437 {
2438 let name =
2439 name.replace('$', "").parse::<usize>().map_err(|_| {
2440 plan_datafusion_err!("Can't parse placeholder: {name}")
2441 })? - 1;
2442 let field = fields.get(idx).ok_or_else(|| {
2443 plan_datafusion_err!(
2444 "Placeholder ${} refers to a non existent column",
2445 idx + 1
2446 )
2447 })?;
2448 let _ = prepare_param_data_types.insert(name, Arc::clone(field));
2449 }
2450 }
2451 }
2452 }
2453 let prepare_param_data_types = {
2454 let len = prepare_param_data_types.keys().last().map_or(0, |&k| k + 1);
2455 (0..len)
2456 .map(|i| prepare_param_data_types.remove(&i))
2457 .collect()
2458 };
2459
2460 let mut planner_context =
2462 PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
2463 planner_context.set_table_schema(Some(DFSchemaRef::new(
2464 DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
2465 )));
2466 let source = self.query_to_plan(*source, &mut planner_context)?;
2467 if fields.len() != source.schema().fields().len() {
2468 plan_err!("Column count doesn't match insert query!")?;
2469 }
2470
2471 let exprs = value_indices
2472 .into_iter()
2473 .enumerate()
2474 .map(|(i, value_index)| {
2475 let target_field = table_schema.field(i);
2476 let expr = match value_index {
2477 Some(v) => {
2478 Expr::Column(Column::from(source.schema().qualified_field(v)))
2479 .cast_to(target_field.data_type(), source.schema())?
2480 }
2481 None => table_source
2483 .get_column_default(target_field.name())
2484 .cloned()
2485 .unwrap_or_else(|| {
2486 Expr::Literal(ScalarValue::Null, None)
2488 })
2489 .cast_to(target_field.data_type(), &DFSchema::empty())?,
2490 };
2491 Ok(expr.alias(target_field.name()))
2492 })
2493 .collect::<Result<Vec<Expr>>>()?;
2494 let source = project(source, exprs)?;
2495
2496 let insert_op = match (overwrite, replace_into) {
2497 (false, false) => InsertOp::Append,
2498 (true, false) => InsertOp::Overwrite,
2499 (false, true) => InsertOp::Replace,
2500 (true, true) => plan_err!(
2501 "Conflicting insert operations: `overwrite` and `replace_into` cannot both be true"
2502 )?,
2503 };
2504
2505 let plan = LogicalPlan::Dml(DmlStatement::new(
2506 table_name,
2507 Arc::clone(&table_source),
2508 WriteOp::Insert(insert_op),
2509 Arc::new(source),
2510 ));
2511 Ok(plan)
2512 }
2513
2514 fn show_columns_to_plan(
2515 &self,
2516 extended: bool,
2517 full: bool,
2518 sql_table_name: ObjectName,
2519 ) -> Result<LogicalPlan> {
2520 let where_clause = object_name_to_qualifier(
2522 &sql_table_name,
2523 self.options.enable_ident_normalization,
2524 )?;
2525
2526 if !self.has_table("information_schema", "columns") {
2527 return plan_err!(
2528 "SHOW COLUMNS is not supported unless information_schema is enabled"
2529 );
2530 }
2531
2532 let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2534 let _ = self.context_provider.get_table_source(table_ref)?;
2535
2536 let select_list = if full || extended {
2538 "*"
2539 } else {
2540 "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
2541 };
2542
2543 let query = format!(
2544 "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
2545 );
2546
2547 let mut rewrite = DFParser::parse_sql(&query)?;
2548 assert_eq!(rewrite.len(), 1);
2549 self.statement_to_plan(rewrite.pop_front().unwrap()) }
2551
2552 fn show_functions_to_plan(
2563 &self,
2564 filter: Option<ShowStatementFilter>,
2565 ) -> Result<LogicalPlan> {
2566 let where_clause = if let Some(filter) = filter {
2567 match filter {
2568 ShowStatementFilter::Like(like) => {
2569 format!("WHERE p.function_name like '{like}'")
2570 }
2571 _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
2572 }
2573 } else {
2574 "".to_string()
2575 };
2576
2577 let query = format!(
2578 r#"
2579SELECT DISTINCT
2580 p.*,
2581 r.function_type function_type,
2582 r.description description,
2583 r.syntax_example syntax_example
2584FROM
2585 (
2586 SELECT
2587 i.specific_name function_name,
2588 o.data_type return_type,
2589 array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
2590 array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
2591 FROM (
2592 SELECT
2593 specific_catalog,
2594 specific_schema,
2595 specific_name,
2596 ordinal_position,
2597 parameter_name,
2598 data_type,
2599 rid
2600 FROM
2601 information_schema.parameters
2602 WHERE
2603 parameter_mode = 'IN'
2604 ) i
2605 JOIN
2606 (
2607 SELECT
2608 specific_catalog,
2609 specific_schema,
2610 specific_name,
2611 ordinal_position,
2612 parameter_name,
2613 data_type,
2614 rid
2615 FROM
2616 information_schema.parameters
2617 WHERE
2618 parameter_mode = 'OUT'
2619 ) o
2620 ON i.specific_catalog = o.specific_catalog
2621 AND i.specific_schema = o.specific_schema
2622 AND i.specific_name = o.specific_name
2623 AND i.rid = o.rid
2624 GROUP BY 1, 2, i.rid
2625 ) as p
2626JOIN information_schema.routines r
2627ON p.function_name = r.routine_name
2628{where_clause}
2629 "#
2630 );
2631 let mut rewrite = DFParser::parse_sql(&query)?;
2632 assert_eq!(rewrite.len(), 1);
2633 self.statement_to_plan(rewrite.pop_front().unwrap()) }
2635
2636 fn show_create_table_to_plan(
2637 &self,
2638 sql_table_name: ObjectName,
2639 ) -> Result<LogicalPlan> {
2640 if !self.has_table("information_schema", "tables") {
2641 return plan_err!(
2642 "SHOW CREATE TABLE is not supported unless information_schema is enabled"
2643 );
2644 }
2645 let where_clause = object_name_to_qualifier(
2647 &sql_table_name,
2648 self.options.enable_ident_normalization,
2649 )?;
2650
2651 let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2653 let _ = self.context_provider.get_table_source(table_ref)?;
2654
2655 let query = format!(
2656 "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
2657 );
2658
2659 let mut rewrite = DFParser::parse_sql(&query)?;
2660 assert_eq!(rewrite.len(), 1);
2661 self.statement_to_plan(rewrite.pop_front().unwrap()) }
2663
2664 fn has_table(&self, schema: &str, table: &str) -> bool {
2666 let tables_reference = TableReference::Partial {
2667 schema: schema.into(),
2668 table: table.into(),
2669 };
2670 self.context_provider
2671 .get_table_source(tables_reference)
2672 .is_ok()
2673 }
2674
2675 fn validate_transaction_kind(
2676 &self,
2677 kind: Option<&BeginTransactionKind>,
2678 ) -> Result<()> {
2679 match kind {
2680 None => Ok(()),
2682 Some(BeginTransactionKind::Transaction) => Ok(()),
2684 Some(BeginTransactionKind::Work) | Some(BeginTransactionKind::Tran) => {
2685 not_impl_err!("Transaction kind not supported: {kind:?}")
2686 }
2687 }
2688 }
2689}