1use std::collections::{BTreeMap, HashMap, HashSet};
19use std::path::Path;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use crate::parser::{
24 CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
25 LexOrdering, ResetStatement, Statement as DFStatement,
26};
27use crate::planner::{
28 ContextProvider, PlannerContext, SqlToRel, object_name_to_qualifier,
29};
30use crate::utils::normalize_ident;
31
32use arrow::datatypes::{Field, FieldRef, Fields};
33use datafusion_common::error::_plan_err;
34use datafusion_common::parsers::CompressionTypeVariant;
35use datafusion_common::{
36 Column, Constraint, Constraints, DFSchema, DFSchemaRef, DataFusionError, Result,
37 ScalarValue, SchemaError, SchemaReference, TableReference, ToDFSchema, exec_err,
38 internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
39 unqualified_field_not_found,
40};
41use datafusion_expr::dml::{CopyTo, InsertOp};
42use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
43use datafusion_expr::logical_plan::DdlStatement;
44use datafusion_expr::logical_plan::builder::project;
45use datafusion_expr::utils::expr_to_columns;
46use datafusion_expr::{
47 Analyze, CreateCatalog, CreateCatalogSchema,
48 CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
49 CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
50 DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
51 EmptyRelation, Execute, Explain, ExplainFormat, Expr, ExprSchemable, Filter,
52 LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare,
53 ResetVariable, SetVariable, SortExpr, Statement as PlanStatement, ToStringifiedPlan,
54 TransactionAccessMode, TransactionConclusion, TransactionEnd,
55 TransactionIsolationLevel, TransactionStart, Volatility, WriteOp, cast, col,
56};
57use sqlparser::ast::{
58 self, BeginTransactionKind, CheckConstraint, ForeignKeyConstraint, IndexColumn,
59 IndexType, NullsDistinctOption, OrderByExpr, OrderByOptions, PrimaryKeyConstraint,
60 Set, ShowStatementIn, ShowStatementOptions, SqliteOnConflict, TableObject,
61 UniqueConstraint, Update, UpdateTableFromKind, ValueWithSpan,
62};
63use sqlparser::ast::{
64 Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
65 CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
66 ObjectName, ObjectType, Query, SchemaName, SetExpr, ShowCreateObject,
67 ShowStatementFilter, Statement, TableConstraint, TableFactor, TableWithJoins,
68 TransactionMode, UnaryOperator, Value,
69};
70use sqlparser::parser::ParserError::ParserError;
71
72fn ident_to_string(ident: &Ident) -> String {
73 normalize_ident(ident.to_owned())
74}
75
76fn object_name_to_string(object_name: &ObjectName) -> String {
77 object_name
78 .0
79 .iter()
80 .map(|object_name_part| {
81 object_name_part
82 .as_ident()
83 .map_or_else(String::new, ident_to_string)
86 })
87 .collect::<Vec<String>>()
88 .join(".")
89}
90
91fn get_schema_name(schema_name: &SchemaName) -> String {
92 match schema_name {
93 SchemaName::Simple(schema_name) => object_name_to_string(schema_name),
94 SchemaName::UnnamedAuthorization(auth) => ident_to_string(auth),
95 SchemaName::NamedAuthorization(schema_name, auth) => format!(
96 "{}.{}",
97 object_name_to_string(schema_name),
98 ident_to_string(auth)
99 ),
100 }
101}
102
/// Lift inline (column-level) constraints out of `columns` and convert each
/// into the equivalent table-level [`TableConstraint`].
///
/// SQL allows constraints both per-column (`col INT UNIQUE`) and at table
/// level (`UNIQUE (col)`); this helper rewrites the inline `UNIQUE`,
/// `PRIMARY KEY`, `FOREIGN KEY` and `CHECK` options into the table-level
/// form, each targeting exactly the column it was declared on. All other
/// column options (defaults, nullability, generated columns, comments, ...)
/// are intentionally ignored here.
fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
    let mut constraints: Vec<TableConstraint> = vec![];
    for column in columns {
        // `name` here is the optional `CONSTRAINT <name>` of the option def.
        for ast::ColumnOptionDef { name, option } in &column.options {
            match option {
                ast::ColumnOption::Unique(UniqueConstraint {
                    characteristics,
                    // NOTE(review): this binding shadows the outer
                    // `ColumnOptionDef` `name`, so the constraint's own name
                    // is used below — unlike the PRIMARY KEY / FOREIGN KEY
                    // arms, which ignore it and use the option def's name.
                    name,
                    index_name: _index_name,
                    index_type_display: _index_type_display,
                    index_type: _index_type,
                    columns: _column,
                    index_options: _index_options,
                    nulls_distinct: _nulls_distinct,
                }) => constraints.push(TableConstraint::Unique(UniqueConstraint {
                    name: name.clone(),
                    index_name: None,
                    index_type_display: ast::KeyOrIndexDisplay::None,
                    index_type: None,
                    // The synthesized constraint covers exactly this column,
                    // with no explicit ordering options.
                    columns: vec![IndexColumn {
                        column: OrderByExpr {
                            expr: SQLExpr::Identifier(column.name.clone()),
                            options: OrderByOptions {
                                asc: None,
                                nulls_first: None,
                            },
                            with_fill: None,
                        },
                        operator_class: None,
                    }],
                    index_options: vec![],
                    characteristics: *characteristics,
                    nulls_distinct: NullsDistinctOption::None,
                })),
                ast::ColumnOption::PrimaryKey(PrimaryKeyConstraint {
                    characteristics,
                    // The inline constraint's own name is discarded; the
                    // outer `ColumnOptionDef` name is used instead.
                    name: _name,
                    index_name: _index_name,
                    index_type: _index_type,
                    columns: _columns,
                    index_options: _index_options,
                }) => {
                    constraints.push(TableConstraint::PrimaryKey(PrimaryKeyConstraint {
                        name: name.clone(),
                        index_name: None,
                        index_type: None,
                        columns: vec![IndexColumn {
                            column: OrderByExpr {
                                expr: SQLExpr::Identifier(column.name.clone()),
                                options: OrderByOptions {
                                    asc: None,
                                    nulls_first: None,
                                },
                                with_fill: None,
                            },
                            operator_class: None,
                        }],
                        index_options: vec![],
                        characteristics: *characteristics,
                    }))
                }
                ast::ColumnOption::ForeignKey(ForeignKeyConstraint {
                    foreign_table,
                    referred_columns,
                    on_delete,
                    on_update,
                    characteristics,
                    name: _name,
                    index_name: _index_name,
                    columns: _columns,
                    match_kind: _match_kind,
                }) => {
                    constraints.push(TableConstraint::ForeignKey(ForeignKeyConstraint {
                        name: name.clone(),
                        index_name: None,
                        // NOTE(review): the referencing column list is left
                        // empty rather than seeded with `column.name` —
                        // presumably resolved downstream; confirm against
                        // `new_constraint_from_table_constraints`.
                        columns: vec![],
                        foreign_table: foreign_table.clone(),
                        referred_columns: referred_columns.clone(),
                        on_delete: *on_delete,
                        on_update: *on_update,
                        match_kind: None,
                        characteristics: *characteristics,
                    }))
                }
                ast::ColumnOption::Check(CheckConstraint {
                    // Shadows the outer option-def `name`, as in the Unique arm.
                    name,
                    expr,
                    enforced: _enforced,
                }) => constraints.push(TableConstraint::Check(CheckConstraint {
                    name: name.clone(),
                    expr: expr.clone(),
                    // Any ENFORCED / NOT ENFORCED flag is dropped.
                    enforced: None,
                })),
                // Options that do not map to a table-level constraint are
                // listed explicitly (no catch-all) so that new sqlparser
                // variants force a compile-time review here.
                ast::ColumnOption::Default(_)
                | ast::ColumnOption::Null
                | ast::ColumnOption::NotNull
                | ast::ColumnOption::DialectSpecific(_)
                | ast::ColumnOption::CharacterSet(_)
                | ast::ColumnOption::Generated { .. }
                | ast::ColumnOption::Comment(_)
                | ast::ColumnOption::Options(_)
                | ast::ColumnOption::OnUpdate(_)
                | ast::ColumnOption::Materialized(_)
                | ast::ColumnOption::Ephemeral(_)
                | ast::ColumnOption::Identity(_)
                | ast::ColumnOption::OnConflict(_)
                | ast::ColumnOption::Policy(_)
                | ast::ColumnOption::Tags(_)
                | ast::ColumnOption::Alias(_)
                | ast::ColumnOption::Srid(_)
                | ast::ColumnOption::Collation(_)
                | ast::ColumnOption::Invisible => {}
            }
        }
    }
    constraints
}
222
223impl<S: ContextProvider> SqlToRel<'_, S> {
224 pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
226 match statement {
227 DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
228 DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
229 DFStatement::CopyTo(s) => self.copy_to_plan(s),
230 DFStatement::Explain(ExplainStatement {
231 verbose,
232 analyze,
233 format,
234 statement,
235 }) => self.explain_to_plan(verbose, analyze, format, *statement),
236 DFStatement::Reset(statement) => self.reset_statement_to_plan(statement),
237 }
238 }
239
240 pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
242 self.sql_statement_to_plan_with_context_impl(
243 statement,
244 &mut PlannerContext::new(),
245 )
246 }
247
    /// Plan a SQL [`Statement`] using the caller-supplied [`PlannerContext`]
    /// instead of creating a fresh one, so state already registered in the
    /// context is visible during planning.
    pub fn sql_statement_to_plan_with_context(
        &self,
        statement: Statement,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        // Thin public wrapper around the private implementation.
        self.sql_statement_to_plan_with_context_impl(statement, planner_context)
    }
256
257 fn sql_statement_to_plan_with_context_impl(
258 &self,
259 statement: Statement,
260 planner_context: &mut PlannerContext,
261 ) -> Result<LogicalPlan> {
262 match statement {
263 Statement::ExplainTable {
264 describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, table_name,
266 ..
267 } => self.describe_table_to_plan(table_name),
268 Statement::Explain {
269 describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, statement,
271 ..
272 } => match *statement {
273 Statement::Query(query) => self.describe_query_to_plan(*query),
274 _ => {
275 not_impl_err!("Describing statements other than SELECT not supported")
276 }
277 },
278 Statement::Explain {
279 verbose,
280 statement,
281 analyze,
282 format,
283 describe_alias: _,
284 ..
285 } => {
286 let format = format.map(|format| format.to_string());
287 let statement = DFStatement::Statement(statement);
288 self.explain_to_plan(verbose, analyze, format, statement)
289 }
290 Statement::Query(query) => self.query_to_plan(*query, planner_context),
291 Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
292 Statement::Set(statement) => self.set_statement_to_plan(statement),
293 Statement::CreateTable(CreateTable {
294 temporary,
295 external,
296 global,
297 transient,
298 volatile,
299 hive_distribution,
300 hive_formats,
301 file_format,
302 location,
303 query,
304 name,
305 columns,
306 constraints,
307 if_not_exists,
308 or_replace,
309 without_rowid,
310 like,
311 clone,
312 comment,
313 on_commit,
314 on_cluster,
315 primary_key,
316 order_by,
317 partition_by,
318 cluster_by,
319 clustered_by,
320 strict,
321 copy_grants,
322 enable_schema_evolution,
323 change_tracking,
324 data_retention_time_in_days,
325 max_data_extension_time_in_days,
326 default_ddl_collation,
327 with_aggregation_policy,
328 with_row_access_policy,
329 with_tags,
330 iceberg,
331 external_volume,
332 base_location,
333 catalog,
334 catalog_sync,
335 storage_serialization_policy,
336 inherits,
337 table_options: CreateTableOptions::None,
338 dynamic,
339 version,
340 target_lag,
341 warehouse,
342 refresh_mode,
343 initialize,
344 require_user,
345 partition_of,
346 for_values,
347 }) => {
348 if temporary {
349 return not_impl_err!("Temporary tables not supported");
350 }
351 if external {
352 return not_impl_err!("External tables not supported");
353 }
354 if global.is_some() {
355 return not_impl_err!("Global tables not supported");
356 }
357 if transient {
358 return not_impl_err!("Transient tables not supported");
359 }
360 if volatile {
361 return not_impl_err!("Volatile tables not supported");
362 }
363 if hive_distribution != ast::HiveDistributionStyle::NONE {
364 return not_impl_err!(
365 "Hive distribution not supported: {hive_distribution:?}"
366 );
367 }
368 if hive_formats.is_some()
369 && !matches!(
370 hive_formats,
371 Some(ast::HiveFormat {
372 row_format: None,
373 serde_properties: None,
374 storage: None,
375 location: None,
376 })
377 )
378 {
379 return not_impl_err!("Hive formats not supported: {hive_formats:?}");
380 }
381 if file_format.is_some() {
382 return not_impl_err!("File format not supported");
383 }
384 if location.is_some() {
385 return not_impl_err!("Location not supported");
386 }
387 if without_rowid {
388 return not_impl_err!("Without rowid not supported");
389 }
390 if like.is_some() {
391 return not_impl_err!("Like not supported");
392 }
393 if clone.is_some() {
394 return not_impl_err!("Clone not supported");
395 }
396 if comment.is_some() {
397 return not_impl_err!("Comment not supported");
398 }
399 if on_commit.is_some() {
400 return not_impl_err!("On commit not supported");
401 }
402 if on_cluster.is_some() {
403 return not_impl_err!("On cluster not supported");
404 }
405 if primary_key.is_some() {
406 return not_impl_err!("Primary key not supported");
407 }
408 if order_by.is_some() {
409 return not_impl_err!("Order by not supported");
410 }
411 if partition_by.is_some() {
412 return not_impl_err!("Partition by not supported");
413 }
414 if cluster_by.is_some() {
415 return not_impl_err!("Cluster by not supported");
416 }
417 if clustered_by.is_some() {
418 return not_impl_err!("Clustered by not supported");
419 }
420 if strict {
421 return not_impl_err!("Strict not supported");
422 }
423 if copy_grants {
424 return not_impl_err!("Copy grants not supported");
425 }
426 if enable_schema_evolution.is_some() {
427 return not_impl_err!("Enable schema evolution not supported");
428 }
429 if change_tracking.is_some() {
430 return not_impl_err!("Change tracking not supported");
431 }
432 if data_retention_time_in_days.is_some() {
433 return not_impl_err!("Data retention time in days not supported");
434 }
435 if max_data_extension_time_in_days.is_some() {
436 return not_impl_err!(
437 "Max data extension time in days not supported"
438 );
439 }
440 if default_ddl_collation.is_some() {
441 return not_impl_err!("Default DDL collation not supported");
442 }
443 if with_aggregation_policy.is_some() {
444 return not_impl_err!("With aggregation policy not supported");
445 }
446 if with_row_access_policy.is_some() {
447 return not_impl_err!("With row access policy not supported");
448 }
449 if with_tags.is_some() {
450 return not_impl_err!("With tags not supported");
451 }
452 if iceberg {
453 return not_impl_err!("Iceberg not supported");
454 }
455 if external_volume.is_some() {
456 return not_impl_err!("External volume not supported");
457 }
458 if base_location.is_some() {
459 return not_impl_err!("Base location not supported");
460 }
461 if catalog.is_some() {
462 return not_impl_err!("Catalog not supported");
463 }
464 if catalog_sync.is_some() {
465 return not_impl_err!("Catalog sync not supported");
466 }
467 if storage_serialization_policy.is_some() {
468 return not_impl_err!("Storage serialization policy not supported");
469 }
470 if inherits.is_some() {
471 return not_impl_err!("Table inheritance not supported");
472 }
473 if dynamic {
474 return not_impl_err!("Dynamic tables not supported");
475 }
476 if version.is_some() {
477 return not_impl_err!("Version not supported");
478 }
479 if target_lag.is_some() {
480 return not_impl_err!("Target lag not supported");
481 }
482 if warehouse.is_some() {
483 return not_impl_err!("Warehouse not supported");
484 }
485 if refresh_mode.is_some() {
486 return not_impl_err!("Refresh mode not supported");
487 }
488 if initialize.is_some() {
489 return not_impl_err!("Initialize not supported");
490 }
491 if require_user {
492 return not_impl_err!("Require user not supported");
493 }
494 if partition_of.is_some() {
495 return not_impl_err!("PARTITION OF not supported");
496 }
497 if for_values.is_some() {
498 return not_impl_err!("PARTITION OF .. FOR VALUES .. not supported");
499 }
500 let mut all_constraints = constraints;
502 let inline_constraints = calc_inline_constraints_from_columns(&columns);
503 all_constraints.extend(inline_constraints);
504 let column_defaults =
506 self.build_column_defaults(&columns, planner_context)?;
507
508 let has_columns = !columns.is_empty();
509 let schema = self.build_schema(columns)?.to_dfschema_ref()?;
510 if has_columns {
511 planner_context.set_table_schema(Some(Arc::clone(&schema)));
512 }
513
514 match query {
515 Some(query) => {
516 let plan = self.query_to_plan(*query, planner_context)?;
517 let input_schema = plan.schema();
518
519 let plan = if has_columns {
520 if schema.fields().len() != input_schema.fields().len() {
521 return plan_err!(
522 "Mismatch: {} columns specified, but result has {} columns",
523 schema.fields().len(),
524 input_schema.fields().len()
525 );
526 }
527 let input_fields = input_schema.fields();
528 let project_exprs = schema
529 .fields()
530 .iter()
531 .zip(input_fields)
532 .map(|(field, input_field)| {
533 cast(
534 col(input_field.name()),
535 field.data_type().clone(),
536 )
537 .alias(field.name())
538 })
539 .collect::<Vec<_>>();
540
541 LogicalPlanBuilder::from(plan.clone())
542 .project(project_exprs)?
543 .build()?
544 } else {
545 plan
546 };
547
548 let constraints = self.new_constraint_from_table_constraints(
549 &all_constraints,
550 plan.schema(),
551 )?;
552
553 Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
554 CreateMemoryTable {
555 name: self.object_name_to_table_reference(name)?,
556 constraints,
557 input: Arc::new(plan),
558 if_not_exists,
559 or_replace,
560 column_defaults,
561 temporary,
562 },
563 )))
564 }
565
566 None => {
567 let plan = EmptyRelation {
568 produce_one_row: false,
569 schema,
570 };
571 let plan = LogicalPlan::EmptyRelation(plan);
572 let constraints = self.new_constraint_from_table_constraints(
573 &all_constraints,
574 plan.schema(),
575 )?;
576 Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
577 CreateMemoryTable {
578 name: self.object_name_to_table_reference(name)?,
579 constraints,
580 input: Arc::new(plan),
581 if_not_exists,
582 or_replace,
583 column_defaults,
584 temporary,
585 },
586 )))
587 }
588 }
589 }
590 Statement::CreateView(ast::CreateView {
591 or_replace,
592 materialized,
593 name,
594 columns,
595 query,
596 options: CreateTableOptions::None,
597 cluster_by,
598 comment,
599 with_no_schema_binding,
600 if_not_exists,
601 temporary,
602 to,
603 params,
604 or_alter,
605 secure,
606 name_before_not_exists,
607 }) => {
608 if materialized {
609 return not_impl_err!("Materialized views not supported")?;
610 }
611 if !cluster_by.is_empty() {
612 return not_impl_err!("Cluster by not supported")?;
613 }
614 if comment.is_some() {
615 return not_impl_err!("Comment not supported")?;
616 }
617 if with_no_schema_binding {
618 return not_impl_err!("With no schema binding not supported")?;
619 }
620 if if_not_exists {
621 return not_impl_err!("If not exists not supported")?;
622 }
623 if to.is_some() {
624 return not_impl_err!("To not supported")?;
625 }
626
627 let stmt = Statement::CreateView(ast::CreateView {
630 or_replace,
631 materialized,
632 name,
633 columns,
634 query,
635 options: CreateTableOptions::None,
636 cluster_by,
637 comment,
638 with_no_schema_binding,
639 if_not_exists,
640 temporary,
641 to,
642 params,
643 or_alter,
644 secure,
645 name_before_not_exists,
646 });
647 let sql = stmt.to_string();
648 let Statement::CreateView(ast::CreateView {
649 name,
650 columns,
651 query,
652 or_replace,
653 temporary,
654 ..
655 }) = stmt
656 else {
657 return internal_err!("Unreachable code in create view");
658 };
659
660 let columns = columns
661 .into_iter()
662 .map(|view_column_def| {
663 if let Some(options) = view_column_def.options {
664 plan_err!(
665 "Options not supported for view columns: {options:?}"
666 )
667 } else {
668 Ok(view_column_def.name)
669 }
670 })
671 .collect::<Result<Vec<_>>>()?;
672
673 let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
674 plan = self.apply_expr_alias(plan, columns)?;
675
676 Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
677 name: self.object_name_to_table_reference(name)?,
678 input: Arc::new(plan),
679 or_replace,
680 definition: Some(sql),
681 temporary,
682 })))
683 }
684 Statement::ShowCreate { obj_type, obj_name } => match obj_type {
685 ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
686 _ => {
687 not_impl_err!("Only `SHOW CREATE TABLE ...` statement is supported")
688 }
689 },
690 Statement::CreateSchema {
691 schema_name,
692 if_not_exists,
693 ..
694 } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
695 CreateCatalogSchema {
696 schema_name: get_schema_name(&schema_name),
697 if_not_exists,
698 schema: Arc::new(DFSchema::empty()),
699 },
700 ))),
701 Statement::CreateDatabase {
702 db_name,
703 if_not_exists,
704 ..
705 } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
706 CreateCatalog {
707 catalog_name: object_name_to_string(&db_name),
708 if_not_exists,
709 schema: Arc::new(DFSchema::empty()),
710 },
711 ))),
712 Statement::Drop {
713 object_type,
714 if_exists,
715 mut names,
716 cascade,
717 restrict: _,
718 purge: _,
719 temporary: _,
720 table: _,
721 } => {
722 let name = match names.len() {
725 0 => Err(ParserError("Missing table name.".to_string()).into()),
726 1 => self.object_name_to_table_reference(names.pop().unwrap()),
727 _ => {
728 Err(ParserError("Multiple objects not supported".to_string())
729 .into())
730 }
731 }?;
732
733 match object_type {
734 ObjectType::Table => {
735 Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
736 name,
737 if_exists,
738 schema: DFSchemaRef::new(DFSchema::empty()),
739 })))
740 }
741 ObjectType::View => {
742 Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
743 name,
744 if_exists,
745 schema: DFSchemaRef::new(DFSchema::empty()),
746 })))
747 }
748 ObjectType::Schema => {
749 let name = match name {
750 TableReference::Bare { table } => {
751 Ok(SchemaReference::Bare { schema: table })
752 }
753 TableReference::Partial { schema, table } => {
754 Ok(SchemaReference::Full {
755 schema: table,
756 catalog: schema,
757 })
758 }
759 TableReference::Full {
760 catalog: _,
761 schema: _,
762 table: _,
763 } => Err(ParserError(
764 "Invalid schema specifier (has 3 parts)".to_string(),
765 )),
766 }?;
767 Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(
768 DropCatalogSchema {
769 name,
770 if_exists,
771 cascade,
772 schema: DFSchemaRef::new(DFSchema::empty()),
773 },
774 )))
775 }
776 _ => not_impl_err!(
777 "Only `DROP TABLE/VIEW/SCHEMA ...` statement is supported currently"
778 ),
779 }
780 }
781 Statement::Prepare {
782 name,
783 data_types,
784 statement,
785 } => {
786 let mut fields: Vec<FieldRef> = data_types
788 .into_iter()
789 .map(|t| self.convert_data_type_to_field(&t))
790 .collect::<Result<_>>()?;
791
792 let mut planner_context =
794 PlannerContext::new().with_prepare_param_data_types(fields.clone());
795
796 let plan = self.sql_statement_to_plan_with_context_impl(
798 *statement,
799 &mut planner_context,
800 )?;
801
802 if fields.is_empty() {
803 let map_types = plan.get_parameter_fields()?;
804 let param_types: Vec<_> = (1..=map_types.len())
805 .filter_map(|i| {
806 let key = format!("${i}");
807 map_types.get(&key).and_then(|opt| opt.clone())
808 })
809 .collect();
810 fields.extend(param_types.iter().cloned());
811 planner_context.with_prepare_param_data_types(param_types);
812 }
813
814 Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
815 name: ident_to_string(&name),
816 fields,
817 input: Arc::new(plan),
818 })))
819 }
820 Statement::Execute {
821 name,
822 parameters,
823 using,
824 has_parentheses: _,
827 immediate,
828 into,
829 output,
830 default,
831 } => {
832 if !using.is_empty() {
834 return not_impl_err!(
835 "Execute statement with USING is not supported"
836 );
837 }
838 if immediate {
839 return not_impl_err!(
840 "Execute statement with IMMEDIATE is not supported"
841 );
842 }
843 if !into.is_empty() {
844 return not_impl_err!("Execute statement with INTO is not supported");
845 }
846 if output {
847 return not_impl_err!(
848 "Execute statement with OUTPUT is not supported"
849 );
850 }
851 if default {
852 return not_impl_err!(
853 "Execute statement with DEFAULT is not supported"
854 );
855 }
856 let empty_schema = DFSchema::empty();
857 let parameters = parameters
858 .into_iter()
859 .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
860 .collect::<Result<Vec<Expr>>>()?;
861
862 Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
863 name: object_name_to_string(&name.unwrap()),
864 parameters,
865 })))
866 }
867 Statement::Deallocate {
868 name,
869 prepare: _,
871 } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
872 Deallocate {
873 name: ident_to_string(&name),
874 },
875 ))),
876
877 Statement::ShowTables {
878 extended,
879 full,
880 terse,
881 history,
882 external,
883 show_options,
884 } => {
885 if extended {
888 return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
889 }
890 if full {
891 return not_impl_err!("SHOW FULL TABLES not supported")?;
892 }
893 if terse {
894 return not_impl_err!("SHOW TERSE TABLES not supported")?;
895 }
896 if history {
897 return not_impl_err!("SHOW TABLES HISTORY not supported")?;
898 }
899 if external {
900 return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
901 }
902 let ShowStatementOptions {
903 show_in,
904 starts_with,
905 limit,
906 limit_from,
907 filter_position,
908 } = show_options;
909 if show_in.is_some() {
910 return not_impl_err!("SHOW TABLES IN not supported")?;
911 }
912 if starts_with.is_some() {
913 return not_impl_err!("SHOW TABLES LIKE not supported")?;
914 }
915 if limit.is_some() {
916 return not_impl_err!("SHOW TABLES LIMIT not supported")?;
917 }
918 if limit_from.is_some() {
919 return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
920 }
921 if filter_position.is_some() {
922 return not_impl_err!("SHOW TABLES FILTER not supported")?;
923 }
924 self.show_tables_to_plan()
925 }
926
927 Statement::ShowColumns {
928 extended,
929 full,
930 show_options,
931 } => {
932 let ShowStatementOptions {
933 show_in,
934 starts_with,
935 limit,
936 limit_from,
937 filter_position,
938 } = show_options;
939 if starts_with.is_some() {
940 return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
941 }
942 if limit.is_some() {
943 return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
944 }
945 if limit_from.is_some() {
946 return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
947 }
948 if filter_position.is_some() {
949 return not_impl_err!(
950 "SHOW COLUMNS with WHERE or LIKE is not supported"
951 )?;
952 }
953 let Some(ShowStatementIn {
954 clause: _,
957 parent_type,
958 parent_name,
959 }) = show_in
960 else {
961 return plan_err!("SHOW COLUMNS requires a table name");
962 };
963
964 if let Some(parent_type) = parent_type {
965 return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
966 }
967 let Some(table_name) = parent_name else {
968 return plan_err!("SHOW COLUMNS requires a table name");
969 };
970
971 self.show_columns_to_plan(extended, full, table_name)
972 }
973
974 Statement::ShowFunctions { filter, .. } => {
975 self.show_functions_to_plan(filter)
976 }
977
978 Statement::Insert(Insert {
979 or,
980 into,
981 columns,
982 overwrite,
983 source,
984 partitioned,
985 after_columns,
986 table,
987 on,
988 returning,
989 ignore,
990 table_alias,
991 mut replace_into,
992 priority,
993 insert_alias,
994 assignments,
995 has_table_keyword,
996 settings,
997 format_clause,
998 insert_token: _, optimizer_hint,
1000 }) => {
1001 let table_name = match table {
1002 TableObject::TableName(table_name) => table_name,
1003 TableObject::TableFunction(_) => {
1004 return not_impl_err!(
1005 "INSERT INTO Table functions not supported"
1006 );
1007 }
1008 };
1009 if let Some(or) = or {
1010 match or {
1011 SqliteOnConflict::Replace => replace_into = true,
1012 _ => plan_err!("Inserts with {or} clause is not supported")?,
1013 }
1014 }
1015 if partitioned.is_some() {
1016 plan_err!("Partitioned inserts not yet supported")?;
1017 }
1018 if !after_columns.is_empty() {
1019 plan_err!("After-columns clause not supported")?;
1020 }
1021 if on.is_some() {
1022 plan_err!("Insert-on clause not supported")?;
1023 }
1024 if returning.is_some() {
1025 plan_err!("Insert-returning clause not supported")?;
1026 }
1027 if ignore {
1028 plan_err!("Insert-ignore clause not supported")?;
1029 }
1030 let Some(source) = source else {
1031 plan_err!("Inserts without a source not supported")?
1032 };
1033 if let Some(table_alias) = table_alias {
1034 plan_err!(
1035 "Inserts with a table alias not supported: {table_alias:?}"
1036 )?
1037 };
1038 if let Some(priority) = priority {
1039 plan_err!(
1040 "Inserts with a `PRIORITY` clause not supported: {priority:?}"
1041 )?
1042 };
1043 if insert_alias.is_some() {
1044 plan_err!("Inserts with an alias not supported")?;
1045 }
1046 if !assignments.is_empty() {
1047 plan_err!("Inserts with assignments not supported")?;
1048 }
1049 if settings.is_some() {
1050 plan_err!("Inserts with settings not supported")?;
1051 }
1052 if format_clause.is_some() {
1053 plan_err!("Inserts with format clause not supported")?;
1054 }
1055 if optimizer_hint.is_some() {
1056 plan_err!("Optimizer hints not supported")?;
1057 }
1058 let _ = into;
1060 let _ = has_table_keyword;
1061 self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
1062 }
1063 Statement::Update(Update {
1064 table,
1065 assignments,
1066 from,
1067 selection,
1068 returning,
1069 or,
1070 limit,
1071 update_token: _,
1072 optimizer_hint,
1073 }) => {
1074 let from_clauses =
1075 from.map(|update_table_from_kind| match update_table_from_kind {
1076 UpdateTableFromKind::BeforeSet(from_clauses) => from_clauses,
1077 UpdateTableFromKind::AfterSet(from_clauses) => from_clauses,
1078 });
1079 if from_clauses.as_ref().is_some_and(|f| f.len() > 1) {
1081 not_impl_err!(
1082 "Multiple tables in UPDATE SET FROM not yet supported"
1083 )?;
1084 }
1085 let update_from = from_clauses.and_then(|mut f| f.pop());
1086
1087 if update_from.is_some() {
1090 return not_impl_err!("UPDATE ... FROM is not supported");
1091 }
1092
1093 if returning.is_some() {
1094 plan_err!("Update-returning clause not yet supported")?;
1095 }
1096 if or.is_some() {
1097 plan_err!("ON conflict not supported")?;
1098 }
1099 if limit.is_some() {
1100 return not_impl_err!("Update-limit clause not supported")?;
1101 }
1102 if optimizer_hint.is_some() {
1103 plan_err!("Optimizer hints not supported")?;
1104 }
1105 self.update_to_plan(table, &assignments, update_from, selection)
1106 }
1107
1108 Statement::Delete(Delete {
1109 tables,
1110 using,
1111 selection,
1112 returning,
1113 from,
1114 order_by,
1115 limit,
1116 delete_token: _,
1117 optimizer_hint,
1118 }) => {
1119 if !tables.is_empty() {
1120 plan_err!("DELETE <TABLE> not supported")?;
1121 }
1122
1123 if using.is_some() {
1124 plan_err!("Using clause not supported")?;
1125 }
1126
1127 if returning.is_some() {
1128 plan_err!("Delete-returning clause not yet supported")?;
1129 }
1130
1131 if !order_by.is_empty() {
1132 plan_err!("Delete-order-by clause not yet supported")?;
1133 }
1134
1135 if optimizer_hint.is_some() {
1136 plan_err!("Optimizer hints not supported")?;
1137 }
1138
1139 let table_name = self.get_delete_target(from)?;
1140 self.delete_to_plan(&table_name, selection, limit)
1141 }
1142
1143 Statement::StartTransaction {
1144 modes,
1145 begin: false,
1146 modifier,
1147 transaction,
1148 statements,
1149 has_end_keyword,
1150 exception,
1151 } => {
1152 if let Some(modifier) = modifier {
1153 return not_impl_err!(
1154 "Transaction modifier not supported: {modifier}"
1155 );
1156 }
1157 if !statements.is_empty() {
1158 return not_impl_err!(
1159 "Transaction with multiple statements not supported"
1160 );
1161 }
1162 if exception.is_some() {
1163 return not_impl_err!(
1164 "Transaction with exception statements not supported"
1165 );
1166 }
1167 if has_end_keyword {
1168 return not_impl_err!("Transaction with END keyword not supported");
1169 }
1170 self.validate_transaction_kind(transaction.as_ref())?;
1171 let isolation_level: ast::TransactionIsolationLevel = modes
1172 .iter()
1173 .filter_map(|m: &TransactionMode| match m {
1174 TransactionMode::AccessMode(_) => None,
1175 TransactionMode::IsolationLevel(level) => Some(level),
1176 })
1177 .next_back()
1178 .copied()
1179 .unwrap_or(ast::TransactionIsolationLevel::Serializable);
1180 let access_mode: ast::TransactionAccessMode = modes
1181 .iter()
1182 .filter_map(|m: &TransactionMode| match m {
1183 TransactionMode::AccessMode(mode) => Some(mode),
1184 TransactionMode::IsolationLevel(_) => None,
1185 })
1186 .next_back()
1187 .copied()
1188 .unwrap_or(ast::TransactionAccessMode::ReadWrite);
1189 let isolation_level = match isolation_level {
1190 ast::TransactionIsolationLevel::ReadUncommitted => {
1191 TransactionIsolationLevel::ReadUncommitted
1192 }
1193 ast::TransactionIsolationLevel::ReadCommitted => {
1194 TransactionIsolationLevel::ReadCommitted
1195 }
1196 ast::TransactionIsolationLevel::RepeatableRead => {
1197 TransactionIsolationLevel::RepeatableRead
1198 }
1199 ast::TransactionIsolationLevel::Serializable => {
1200 TransactionIsolationLevel::Serializable
1201 }
1202 ast::TransactionIsolationLevel::Snapshot => {
1203 TransactionIsolationLevel::Snapshot
1204 }
1205 };
1206 let access_mode = match access_mode {
1207 ast::TransactionAccessMode::ReadOnly => {
1208 TransactionAccessMode::ReadOnly
1209 }
1210 ast::TransactionAccessMode::ReadWrite => {
1211 TransactionAccessMode::ReadWrite
1212 }
1213 };
1214 let statement = PlanStatement::TransactionStart(TransactionStart {
1215 access_mode,
1216 isolation_level,
1217 });
1218 Ok(LogicalPlan::Statement(statement))
1219 }
1220 Statement::Commit {
1221 chain,
1222 end,
1223 modifier,
1224 } => {
1225 if end {
1226 return not_impl_err!("COMMIT AND END not supported");
1227 };
1228 if let Some(modifier) = modifier {
1229 return not_impl_err!("COMMIT {modifier} not supported");
1230 };
1231 let statement = PlanStatement::TransactionEnd(TransactionEnd {
1232 conclusion: TransactionConclusion::Commit,
1233 chain,
1234 });
1235 Ok(LogicalPlan::Statement(statement))
1236 }
1237 Statement::Rollback { chain, savepoint } => {
1238 if savepoint.is_some() {
1239 plan_err!("Savepoints not supported")?;
1240 }
1241 let statement = PlanStatement::TransactionEnd(TransactionEnd {
1242 conclusion: TransactionConclusion::Rollback,
1243 chain,
1244 });
1245 Ok(LogicalPlan::Statement(statement))
1246 }
1247 Statement::CreateFunction(ast::CreateFunction {
1248 or_replace,
1249 temporary,
1250 name,
1251 args,
1252 return_type,
1253 function_body,
1254 behavior,
1255 language,
1256 ..
1257 }) => {
1258 let return_type = match return_type {
1259 Some(t) => Some(self.convert_data_type_to_field(&t)?),
1260 None => None,
1261 };
1262 let mut planner_context = PlannerContext::new();
1263 let empty_schema = &DFSchema::empty();
1264
1265 let args = match args {
1266 Some(function_args) => {
1267 let function_args = function_args
1268 .into_iter()
1269 .map(|arg| {
1270 let data_type =
1271 self.convert_data_type_to_field(&arg.data_type)?;
1272
1273 let default_expr = match arg.default_expr {
1274 Some(expr) => Some(self.sql_to_expr(
1275 expr,
1276 empty_schema,
1277 &mut planner_context,
1278 )?),
1279 None => None,
1280 };
1281 Ok(OperateFunctionArg {
1282 name: arg.name,
1283 default_expr,
1284 data_type: data_type.data_type().clone(),
1285 })
1286 })
1287 .collect::<Result<Vec<OperateFunctionArg>>>();
1288 Some(function_args?)
1289 }
1290 None => None,
1291 };
1292 let first_default = match args.as_ref() {
1294 Some(arg) => arg.iter().position(|t| t.default_expr.is_some()),
1295 None => None,
1296 };
1297 let last_non_default = match args.as_ref() {
1298 Some(arg) => arg
1299 .iter()
1300 .rev()
1301 .position(|t| t.default_expr.is_none())
1302 .map(|reverse_pos| arg.len() - reverse_pos - 1),
1303 None => None,
1304 };
1305 if let (Some(pos_default), Some(pos_non_default)) =
1306 (first_default, last_non_default)
1307 && pos_non_default > pos_default
1308 {
1309 return plan_err!(
1310 "Non-default arguments cannot follow default arguments."
1311 );
1312 }
1313 let name = match &name.0[..] {
1315 [] => exec_err!("Function should have name")?,
1316 [n] => n.as_ident().unwrap().value.clone(),
1317 [..] => not_impl_err!("Qualified functions are not supported")?,
1318 };
1319 let arg_types = args.as_ref().map(|arg| {
1323 arg.iter()
1324 .map(|t| {
1325 let name = match t.name.clone() {
1326 Some(name) => name.value,
1327 None => "".to_string(),
1328 };
1329 Arc::new(Field::new(name, t.data_type.clone(), true))
1330 })
1331 .collect::<Vec<_>>()
1332 });
1333 if let Some(ref fields) = arg_types {
1335 let count_positional =
1336 fields.iter().filter(|f| f.name() == "").count();
1337 if !(count_positional == 0 || count_positional == fields.len()) {
1338 return plan_err!(
1339 "All function arguments must use either named or positional style."
1340 );
1341 }
1342 }
1343 let mut planner_context = PlannerContext::new()
1344 .with_prepare_param_data_types(arg_types.unwrap_or_default());
1345
1346 let function_body = match function_body {
1347 Some(r) => Some(self.sql_to_expr(
1348 match r {
1349 ast::CreateFunctionBody::AsBeforeOptions{body: expr, link_symbol: _link_symbol} => expr,
1351 ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
1352 ast::CreateFunctionBody::Return(expr) => expr,
1353 ast::CreateFunctionBody::AsBeginEnd(_) => {
1354 return not_impl_err!(
1355 "BEGIN/END enclosed function body syntax is not supported"
1356 )?;
1357 }
1358 ast::CreateFunctionBody::AsReturnExpr(_)
1359 | ast::CreateFunctionBody::AsReturnSelect(_) => {
1360 return not_impl_err!(
1361 "AS RETURN function syntax is not supported"
1362 )?
1363 }
1364 },
1365 &DFSchema::empty(),
1366 &mut planner_context,
1367 )?),
1368 None => None,
1369 };
1370
1371 let params = CreateFunctionBody {
1372 language,
1373 behavior: behavior.map(|b| match b {
1374 ast::FunctionBehavior::Immutable => Volatility::Immutable,
1375 ast::FunctionBehavior::Stable => Volatility::Stable,
1376 ast::FunctionBehavior::Volatile => Volatility::Volatile,
1377 }),
1378 function_body,
1379 };
1380
1381 let statement = DdlStatement::CreateFunction(CreateFunction {
1382 or_replace,
1383 temporary,
1384 name,
1385 return_type: return_type.map(|f| f.data_type().clone()),
1386 args,
1387 params,
1388 schema: DFSchemaRef::new(DFSchema::empty()),
1389 });
1390
1391 Ok(LogicalPlan::Ddl(statement))
1392 }
1393 Statement::DropFunction(ast::DropFunction {
1394 if_exists,
1395 func_desc,
1396 drop_behavior: _,
1397 }) => {
1398 if let Some(desc) = func_desc.first() {
1401 let name = match &desc.name.0[..] {
1403 [] => exec_err!("Function should have name")?,
1404 [n] => n.as_ident().unwrap().value.clone(),
1405 [..] => not_impl_err!("Qualified functions are not supported")?,
1406 };
1407 let statement = DdlStatement::DropFunction(DropFunction {
1408 if_exists,
1409 name,
1410 schema: DFSchemaRef::new(DFSchema::empty()),
1411 });
1412 Ok(LogicalPlan::Ddl(statement))
1413 } else {
1414 exec_err!("Function name not provided")
1415 }
1416 }
1417 Statement::Truncate(ast::Truncate {
1418 table_names,
1419 partitions,
1420 identity,
1421 cascade,
1422 on_cluster,
1423 table,
1424 if_exists,
1425 }) => {
1426 let _ = table; if table_names.len() != 1 {
1428 return not_impl_err!(
1429 "TRUNCATE with multiple tables is not supported"
1430 );
1431 }
1432
1433 let target = &table_names[0];
1434 if target.only {
1435 return not_impl_err!("TRUNCATE with ONLY is not supported");
1436 }
1437 if partitions.is_some() {
1438 return not_impl_err!("TRUNCATE with PARTITION is not supported");
1439 }
1440 if identity.is_some() {
1441 return not_impl_err!(
1442 "TRUNCATE with RESTART/CONTINUE IDENTITY is not supported"
1443 );
1444 }
1445 if cascade.is_some() {
1446 return not_impl_err!(
1447 "TRUNCATE with CASCADE/RESTRICT is not supported"
1448 );
1449 }
1450 if on_cluster.is_some() {
1451 return not_impl_err!("TRUNCATE with ON CLUSTER is not supported");
1452 }
1453 if if_exists {
1454 return not_impl_err!("TRUNCATE .. with IF EXISTS is not supported");
1455 }
1456 let table = self.object_name_to_table_reference(target.name.clone())?;
1457 let source = self.context_provider.get_table_source(table.clone())?;
1458
1459 Ok(LogicalPlan::Dml(DmlStatement::new(
1462 table.clone(),
1463 source,
1464 WriteOp::Truncate,
1465 Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
1466 produce_one_row: false,
1467 schema: DFSchemaRef::new(DFSchema::empty()),
1468 })),
1469 )))
1470 }
1471 Statement::CreateIndex(CreateIndex {
1472 name,
1473 table_name,
1474 using,
1475 columns,
1476 unique,
1477 if_not_exists,
1478 ..
1479 }) => {
1480 let name: Option<String> = name.as_ref().map(object_name_to_string);
1481 let table = self.object_name_to_table_reference(table_name)?;
1482 let table_schema = self
1483 .context_provider
1484 .get_table_source(table.clone())?
1485 .schema()
1486 .to_dfschema_ref()?;
1487 let using: Option<String> =
1488 using.as_ref().map(|index_type| match index_type {
1489 IndexType::Custom(ident) => ident_to_string(ident),
1490 _ => index_type.to_string().to_ascii_lowercase(),
1491 });
1492 let order_by_exprs: Vec<OrderByExpr> =
1493 columns.into_iter().map(|col| col.column).collect();
1494 let columns = self.order_by_to_sort_expr(
1495 order_by_exprs,
1496 &table_schema,
1497 planner_context,
1498 false,
1499 None,
1500 )?;
1501 Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
1502 PlanCreateIndex {
1503 name,
1504 table,
1505 using,
1506 columns,
1507 unique,
1508 if_not_exists,
1509 schema: DFSchemaRef::new(DFSchema::empty()),
1510 },
1511 )))
1512 }
1513 stmt => {
1514 not_impl_err!("Unsupported SQL statement: {stmt}")
1515 }
1516 }
1517 }
1518
1519 fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
1520 let mut from = match from {
1521 FromTable::WithFromKeyword(v) => v,
1522 FromTable::WithoutKeyword(v) => v,
1523 };
1524
1525 if from.len() != 1 {
1526 return not_impl_err!(
1527 "DELETE FROM only supports single table, got {}: {from:?}",
1528 from.len()
1529 );
1530 }
1531 let table_factor = from.pop().unwrap();
1532 if !table_factor.joins.is_empty() {
1533 return not_impl_err!("DELETE FROM only supports single table, got: joins");
1534 }
1535 let TableFactor::Table { name, .. } = table_factor.relation else {
1536 return not_impl_err!(
1537 "DELETE FROM only supports single table, got: {table_factor:?}"
1538 );
1539 };
1540
1541 Ok(name)
1542 }
1543
1544 fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
1546 if self.has_table("information_schema", "tables") {
1547 let query = "SELECT * FROM information_schema.tables;";
1548 let mut rewrite = DFParser::parse_sql(query)?;
1549 assert_eq!(rewrite.len(), 1);
1550 self.statement_to_plan(rewrite.pop_front().unwrap()) } else {
1552 plan_err!("SHOW TABLES is not supported unless information_schema is enabled")
1553 }
1554 }
1555
1556 fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
1557 let table_ref = self.object_name_to_table_reference(table_name)?;
1558
1559 let table_source = self.context_provider.get_table_source(table_ref)?;
1560
1561 let schema = table_source.schema();
1562
1563 let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1564
1565 Ok(LogicalPlan::DescribeTable(DescribeTable {
1566 schema,
1567 output_schema: Arc::new(output_schema),
1568 }))
1569 }
1570
1571 fn describe_query_to_plan(&self, query: Query) -> Result<LogicalPlan> {
1572 let plan = self.query_to_plan(query, &mut PlannerContext::new())?;
1573
1574 let schema = Arc::new(plan.schema().as_arrow().clone());
1575
1576 let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1577
1578 Ok(LogicalPlan::DescribeTable(DescribeTable {
1579 schema,
1580 output_schema: Arc::new(output_schema),
1581 }))
1582 }
1583
    /// Generate a logical plan for `COPY <source> TO <target>`.
    ///
    /// The source may be a table name or an arbitrary query. The output file
    /// type is taken from `STORED AS` when given, otherwise inferred from the
    /// target path's file extension.
    fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
        let copy_source = statement.source;
        let (input, input_schema, table_ref) = match copy_source {
            CopyToSource::Relation(object_name) => {
                // COPY from a table: plan a full scan of it. Keep the resolved
                // table reference so PARTITIONED BY columns can be looked up
                // with their qualifier below.
                let table_name = object_name_to_string(&object_name);
                let table_ref = self.object_name_to_table_reference(object_name)?;
                let table_source =
                    self.context_provider.get_table_source(table_ref.clone())?;
                let plan =
                    LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?;
                let input_schema = Arc::clone(plan.schema());
                (plan, input_schema, Some(table_ref))
            }
            CopyToSource::Query(query) => {
                // COPY from a query: no table qualifier is available.
                let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
                let input_schema = Arc::clone(plan.schema());
                (plan, input_schema, None)
            }
        };

        // COPY allows the same option key multiple times (`true`).
        let options_map = self.parse_options_map(statement.options, true)?;

        let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
            self.context_provider.get_file_type(stored_as).ok()
        } else {
            None
        };

        let file_type = match maybe_file_type {
            Some(ft) => ft,
            None => {
                // No usable STORED AS: fall back to the target path's
                // extension (lowercased) to pick the file type.
                let e = || {
                    DataFusionError::Configuration(
                        "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
                            .to_string(),
                    )
                };
                let extension: &str = &Path::new(&statement.target)
                    .extension()
                    .ok_or_else(e)?
                    .to_str()
                    .ok_or_else(e)?
                    .to_lowercase();

                self.context_provider.get_file_type(extension)?
            }
        };

        // Validate each PARTITIONED BY column against the input schema
        // (qualified by the table reference when copying from a table) and
        // keep the canonical field names.
        let partition_by = statement
            .partitioned_by
            .iter()
            .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .map(|f| f.name().to_owned())
            .collect();

        Ok(LogicalPlan::Copy(CopyTo::new(
            Arc::new(input),
            statement.target,
            partition_by,
            file_type,
            options_map,
        )))
    }
1651
    /// Convert each `WITH ORDER` clause (one [`LexOrdering`] per clause) into
    /// a list of [`SortExpr`]s resolved against `schema`.
    fn build_order_by(
        &self,
        order_exprs: Vec<LexOrdering>,
        schema: &DFSchemaRef,
        planner_context: &mut PlannerContext,
    ) -> Result<Vec<Vec<SortExpr>>> {
        // With an empty schema (e.g. a table whose schema will be inferred
        // later) the columns cannot be validated here, so the expressions are
        // lowered directly without the column-existence checks done below.
        if !order_exprs.is_empty() && schema.fields().is_empty() {
            let results = order_exprs
                .iter()
                .map(|lex_order| {
                    let result = lex_order
                        .iter()
                        .map(|order_by_expr| {
                            let ordered_expr = &order_by_expr.expr;
                            let ordered_expr = ordered_expr.to_owned();
                            let ordered_expr = self.sql_expr_to_logical_expr(
                                ordered_expr,
                                schema,
                                planner_context,
                            )?;
                            // Missing ASC/DESC defaults to ascending; missing
                            // NULLS FIRST/LAST falls back to the configured
                            // default null ordering for that direction.
                            let asc = order_by_expr.options.asc.unwrap_or(true);
                            let nulls_first =
                                order_by_expr.options.nulls_first.unwrap_or_else(|| {
                                    self.options.default_null_ordering.nulls_first(asc)
                                });

                            Ok(SortExpr::new(ordered_expr, asc, nulls_first))
                        })
                        .collect::<Result<Vec<SortExpr>>>()?;
                    Ok(result)
                })
                .collect::<Result<Vec<Vec<SortExpr>>>>()?;

            return Ok(results);
        }

        let mut all_results = vec![];
        for expr in order_exprs {
            // Resolve each ordering expression against the known schema ...
            let expr_vec =
                self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?;
            // ... and reject any column reference not present in it.
            for sort in expr_vec.iter() {
                for column in sort.expr.column_refs().iter() {
                    if !schema.has_column(column) {
                        return plan_err!("Column {column} is not in schema");
                    }
                }
            }
            all_results.push(expr_vec)
        }
        Ok(all_results)
    }
1707
    /// Generate a logical plan from a `CREATE EXTERNAL TABLE` statement.
    fn external_table_to_plan(
        &self,
        statement: CreateExternalTable,
    ) -> Result<LogicalPlan> {
        // Keep the original SQL text so the catalog can reproduce the DDL.
        let definition = Some(statement.to_string());
        let CreateExternalTable {
            name,
            columns,
            file_type,
            location,
            table_partition_cols,
            if_not_exists,
            temporary,
            order_exprs,
            unbounded,
            options,
            constraints,
            or_replace,
        } = statement;

        // Merge table-level constraints with constraints declared inline on
        // individual columns (e.g. `col INT PRIMARY KEY`).
        let mut all_constraints = constraints;
        let inline_constraints = calc_inline_constraints_from_columns(&columns);
        all_constraints.extend(inline_constraints);

        // CREATE EXTERNAL TABLE rejects duplicate option keys (`false`).
        let options_map = self.parse_options_map(options, false)?;

        // These formats carry their own (internal) compression, so an
        // explicit file-level compression option is rejected for them.
        let compression = options_map
            .get("format.compression")
            .map(|c| CompressionTypeVariant::from_str(c))
            .transpose()?;
        if (file_type == "PARQUET" || file_type == "AVRO" || file_type == "ARROW")
            && compression
                .map(|c| c != CompressionTypeVariant::UNCOMPRESSED)
                .unwrap_or(false)
        {
            plan_err!(
                "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
            )?;
        }

        let mut planner_context = PlannerContext::new();

        let column_defaults = self
            .build_column_defaults(&columns, &mut planner_context)?
            .into_iter()
            .collect();

        let schema = self.build_schema(columns)?;
        let df_schema = schema.to_dfschema_ref()?;
        df_schema.check_names()?;

        // WITH ORDER clauses become sort expressions validated against the
        // declared schema (validation is skipped if no columns were declared).
        let ordered_exprs =
            self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;

        let name = self.object_name_to_table_reference(name)?;
        let constraints =
            self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
        Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
            PlanCreateExternalTable::builder(name, location, file_type, df_schema)
                .with_partition_cols(table_partition_cols)
                .with_if_not_exists(if_not_exists)
                .with_or_replace(or_replace)
                .with_temporary(temporary)
                .with_definition(definition)
                .with_order_exprs(ordered_exprs)
                .with_unbounded(unbounded)
                .with_options(options_map)
                .with_constraints(constraints)
                .with_column_defaults(column_defaults)
                .build(),
        )))
    }
1782
1783 fn get_constraint_column_indices(
1786 &self,
1787 df_schema: &DFSchemaRef,
1788 columns: &[IndexColumn],
1789 constraint_name: &str,
1790 ) -> Result<Vec<usize>> {
1791 let field_names = df_schema.field_names();
1792 columns
1793 .iter()
1794 .map(|index_column| {
1795 let expr = &index_column.column.expr;
1796 let ident = if let SQLExpr::Identifier(ident) = expr {
1797 ident
1798 } else {
1799 return Err(plan_datafusion_err!(
1800 "Column name for {constraint_name} must be an identifier: {expr}"
1801 ));
1802 };
1803 let column = self.ident_normalizer.normalize(ident.clone());
1804 field_names
1805 .iter()
1806 .position(|item| *item == column)
1807 .ok_or_else(|| {
1808 plan_datafusion_err!(
1809 "Column for {constraint_name} not found in schema: {column}"
1810 )
1811 })
1812 })
1813 .collect::<Result<Vec<_>>>()
1814 }
1815
1816 pub fn new_constraint_from_table_constraints(
1818 &self,
1819 constraints: &[TableConstraint],
1820 df_schema: &DFSchemaRef,
1821 ) -> Result<Constraints> {
1822 let constraints = constraints
1823 .iter()
1824 .map(|c: &TableConstraint| match c {
1825 TableConstraint::Unique(UniqueConstraint {
1826 name,
1827 index_name: _,
1828 index_type_display: _,
1829 index_type: _,
1830 columns,
1831 index_options: _,
1832 characteristics: _,
1833 nulls_distinct: _,
1834 }) => {
1835 let constraint_name = match &name {
1836 Some(name) => &format!("unique constraint with name '{name}'"),
1837 None => "unique constraint",
1838 };
1839 let indices = self.get_constraint_column_indices(
1841 df_schema,
1842 columns,
1843 constraint_name,
1844 )?;
1845 Ok(Constraint::Unique(indices))
1846 }
1847 TableConstraint::PrimaryKey(PrimaryKeyConstraint {
1848 name: _,
1849 index_name: _,
1850 index_type: _,
1851 columns,
1852 index_options: _,
1853 characteristics: _,
1854 }) => {
1855 let indices = self.get_constraint_column_indices(
1857 df_schema,
1858 columns,
1859 "primary key",
1860 )?;
1861 Ok(Constraint::PrimaryKey(indices))
1862 }
1863 TableConstraint::ForeignKey { .. } => {
1864 _plan_err!("Foreign key constraints are not currently supported")
1865 }
1866 TableConstraint::Check { .. } => {
1867 _plan_err!("Check constraints are not currently supported")
1868 }
1869 TableConstraint::Index { .. } => {
1870 _plan_err!("Indexes are not currently supported")
1871 }
1872 TableConstraint::FulltextOrSpatial { .. } => {
1873 _plan_err!("Indexes are not currently supported")
1874 }
1875 })
1876 .collect::<Result<Vec<_>>>()?;
1877 Ok(Constraints::new_unverified(constraints))
1878 }
1879
1880 fn parse_options_map(
1881 &self,
1882 options: Vec<(String, Value)>,
1883 allow_duplicates: bool,
1884 ) -> Result<HashMap<String, String>> {
1885 let mut options_map = HashMap::new();
1886 for (key, value) in options {
1887 if !allow_duplicates && options_map.contains_key(&key) {
1888 return plan_err!("Option {key} is specified multiple times");
1889 }
1890
1891 let Some(value_string) = crate::utils::value_to_string(&value) else {
1892 return plan_err!("Unsupported Value {}", value);
1893 };
1894
1895 if !(&key.contains('.')) {
1896 let renamed_key = format!("format.{key}");
1900 options_map.insert(renamed_key.to_lowercase(), value_string);
1901 } else {
1902 options_map.insert(key.to_lowercase(), value_string);
1903 }
1904 }
1905
1906 Ok(options_map)
1907 }
1908
1909 fn explain_to_plan(
1914 &self,
1915 verbose: bool,
1916 analyze: bool,
1917 format: Option<String>,
1918 statement: DFStatement,
1919 ) -> Result<LogicalPlan> {
1920 let plan = self.statement_to_plan(statement)?;
1921 if matches!(plan, LogicalPlan::Explain(_)) {
1922 return plan_err!("Nested EXPLAINs are not supported");
1923 }
1924
1925 let plan = Arc::new(plan);
1926 let schema = LogicalPlan::explain_schema();
1927 let schema = schema.to_dfschema_ref()?;
1928
1929 if verbose && format.is_some() {
1930 return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
1931 }
1932
1933 if analyze {
1934 if format.is_some() {
1935 return plan_err!("EXPLAIN ANALYZE with FORMAT is not supported");
1936 }
1937 Ok(LogicalPlan::Analyze(Analyze {
1938 verbose,
1939 input: plan,
1940 schema,
1941 }))
1942 } else {
1943 let stringified_plans =
1944 vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
1945
1946 let options = self.context_provider.options();
1949 let format = if verbose {
1950 ExplainFormat::Indent
1951 } else if let Some(format) = format {
1952 ExplainFormat::from_str(&format)?
1953 } else {
1954 options.explain.format.clone()
1955 };
1956
1957 Ok(LogicalPlan::Explain(Explain {
1958 verbose,
1959 explain_format: format,
1960 plan,
1961 stringified_plans,
1962 schema,
1963 logical_optimization_succeeded: false,
1964 }))
1965 }
1966 }
1967
1968 fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
1969 if !self.has_table("information_schema", "df_settings") {
1970 return plan_err!(
1971 "SHOW [VARIABLE] is not supported unless information_schema is enabled"
1972 );
1973 }
1974
1975 let verbose = variable
1976 .last()
1977 .map(|s| ident_to_string(s) == "verbose")
1978 .unwrap_or(false);
1979 let mut variable_vec = variable.to_vec();
1980 let mut columns: String = "name, value".to_owned();
1981
1982 if verbose {
1983 columns = format!("{columns}, description");
1984 variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
1985 }
1986
1987 let variable = object_name_to_string(&ObjectName::from(variable_vec));
1988 let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
1989 let query = if variable == "all" {
1990 format!("{base_query} ORDER BY name")
1992 } else if variable == "timezone" || variable == "time.zone" {
1993 format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
1995 } else {
1996 let is_valid_variable = self
2000 .context_provider
2001 .options()
2002 .entries()
2003 .iter()
2004 .any(|opt| opt.key == variable);
2005
2006 let is_runtime_variable = variable.starts_with("datafusion.runtime.");
2008
2009 if !is_valid_variable && !is_runtime_variable {
2010 return plan_err!(
2011 "'{variable}' is not a variable which can be viewed with 'SHOW'"
2012 );
2013 }
2014
2015 format!("{base_query} WHERE name = '{variable}'")
2016 };
2017
2018 let mut rewrite = DFParser::parse_sql(&query)?;
2019 assert_eq!(rewrite.len(), 1);
2020
2021 self.statement_to_plan(rewrite.pop_front().unwrap())
2022 }
2023
2024 fn set_statement_to_plan(&self, statement: Set) -> Result<LogicalPlan> {
2025 match statement {
2026 Set::SingleAssignment {
2027 scope,
2028 hivevar,
2029 variable,
2030 values,
2031 } => {
2032 if scope.is_some() {
2033 return not_impl_err!("SET with scope modifiers is not supported");
2034 }
2035
2036 if hivevar {
2037 return not_impl_err!("SET HIVEVAR is not supported");
2038 }
2039
2040 let variable = object_name_to_string(&variable);
2041 let mut variable_lower = variable.to_lowercase();
2042
2043 if variable_lower == "timezone" || variable_lower == "time.zone" {
2045 variable_lower = "datafusion.execution.time_zone".to_string();
2046 }
2047
2048 if values.len() != 1 {
2049 return plan_err!("SET only supports single value assignment");
2050 }
2051
2052 let value_string = match &values[0] {
2053 SQLExpr::Identifier(i) => ident_to_string(i),
2054 SQLExpr::Value(v) => match crate::utils::value_to_string(&v.value) {
2055 None => {
2056 return plan_err!("Unsupported value {:?}", v.value);
2057 }
2058 Some(s) => s,
2059 },
2060 SQLExpr::UnaryOp { op, expr } => match op {
2061 UnaryOperator::Plus => format!("+{expr}"),
2062 UnaryOperator::Minus => format!("-{expr}"),
2063 _ => return plan_err!("Unsupported unary op {:?}", op),
2064 },
2065 _ => return plan_err!("Unsupported expr {:?}", values[0]),
2066 };
2067
2068 Ok(LogicalPlan::Statement(PlanStatement::SetVariable(
2069 SetVariable {
2070 variable: variable_lower,
2071 value: value_string,
2072 },
2073 )))
2074 }
2075 other => not_impl_err!("SET variant not implemented yet: {other:?}"),
2076 }
2077 }
2078
2079 fn reset_statement_to_plan(&self, statement: ResetStatement) -> Result<LogicalPlan> {
2080 match statement {
2081 ResetStatement::Variable(variable) => {
2082 let variable = object_name_to_string(&variable);
2083 let mut variable_lower = variable.to_lowercase();
2084
2085 if variable_lower == "timezone" || variable_lower == "time.zone" {
2087 variable_lower = "datafusion.execution.time_zone".to_string();
2088 }
2089
2090 Ok(LogicalPlan::Statement(PlanStatement::ResetVariable(
2091 ResetVariable {
2092 variable: variable_lower,
2093 },
2094 )))
2095 }
2096 }
2097 }
2098
    /// Generate a logical plan for `DELETE FROM <table> [WHERE ...] [LIMIT ...]`.
    ///
    /// The result is a DML `Delete` whose input is a scan of the target table,
    /// optionally wrapped in a filter and a limit.
    fn delete_to_plan(
        &self,
        table_name: &ObjectName,
        predicate_expr: Option<SQLExpr>,
        limit: Option<SQLExpr>,
    ) -> Result<LogicalPlan> {
        // Resolve the target table and build a qualified schema so the WHERE
        // clause can reference columns with or without the table qualifier.
        let table_ref = self.object_name_to_table_reference(table_name.clone())?;
        let table_source = self.context_provider.get_table_source(table_ref.clone())?;
        let schema = DFSchema::try_from_qualified_schema(
            table_ref.clone(),
            &table_source.schema(),
        )?;
        let scan =
            LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
                .build()?;
        let mut planner_context = PlannerContext::new();

        let mut source = match predicate_expr {
            None => scan,
            Some(predicate_expr) => {
                let filter_expr =
                    self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
                let schema = Arc::new(schema);
                // Normalize column references (resolve qualifiers, detect
                // ambiguity) before building the filter.
                let mut using_columns = HashSet::new();
                expr_to_columns(&filter_expr, &mut using_columns)?;
                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
                    filter_expr,
                    &[&[&schema]],
                    &[using_columns],
                )?;
                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
            }
        };

        if let Some(limit) = limit {
            // LIMIT is lowered against an empty schema: it may not reference
            // table columns, only literals/placeholders.
            let empty_schema = DFSchema::empty();
            let limit = self.sql_to_expr(limit, &empty_schema, &mut planner_context)?;
            source = LogicalPlanBuilder::from(source)
                .limit_by_expr(None, Some(limit))?
                .build()?
        }

        let plan = LogicalPlan::Dml(DmlStatement::new(
            table_ref,
            table_source,
            WriteOp::Delete,
            Arc::new(source),
        ));
        Ok(plan)
    }
2150
    /// Generate a logical plan for an `UPDATE` statement.
    ///
    /// Builds a projection over the (optionally filtered) input in which each
    /// target-table column is either its SET expression (cast to the column
    /// type) or a pass-through column reference, then wraps it in a DML
    /// `Update` node.
    fn update_to_plan(
        &self,
        table: TableWithJoins,
        assignments: &[Assignment],
        from: Option<TableWithJoins>,
        predicate_expr: Option<SQLExpr>,
    ) -> Result<LogicalPlan> {
        let (table_name, table_alias) = match &table.relation {
            TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
            _ => plan_err!("Cannot update non-table relation!")?,
        };

        let table_name = self.object_name_to_table_reference(table_name)?;
        let table_source = self.context_provider.get_table_source(table_name.clone())?;
        let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
            table_name.clone(),
            &table_source.schema(),
        )?);

        let mut planner_context = PlannerContext::new();
        // Map assigned column name -> SET expression, verifying each target
        // column exists in the table schema.
        let mut assign_map = assignments
            .iter()
            .map(|assign| {
                let cols = match &assign.target {
                    AssignmentTarget::ColumnName(cols) => cols,
                    _ => plan_err!("Tuples are not supported")?,
                };
                // Use the last identifier of a possibly-qualified target.
                let col_name: &Ident = cols
                    .0
                    .iter()
                    .last()
                    .ok_or_else(|| plan_datafusion_err!("Empty column id"))?
                    .as_ident()
                    .unwrap();
                table_schema.field_with_unqualified_name(&col_name.value)?;
                Ok((col_name.value.clone(), assign.value.clone()))
            })
            .collect::<Result<HashMap<String, SQLExpr>>>()?;

        // Plan the scan over the target table joined with any FROM tables.
        let mut input_tables = vec![table];
        input_tables.extend(from);
        let scan = self.plan_from_tables(input_tables, &mut planner_context)?;

        // Apply the WHERE clause (if any) on top of the scan, with column
        // references normalized and checked for ambiguity.
        let source = match predicate_expr {
            None => scan,
            Some(predicate_expr) => {
                let filter_expr = self.sql_to_expr(
                    predicate_expr,
                    scan.schema(),
                    &mut planner_context,
                )?;
                let mut using_columns = HashSet::new();
                expr_to_columns(&filter_expr, &mut using_columns)?;
                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
                    filter_expr,
                    &[&[scan.schema()]],
                    &[using_columns],
                )?;
                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
            }
        };

        // Build one output expression per target-table column: either the SET
        // expression cast to the column type, or the existing column value.
        let exprs = table_schema
            .iter()
            .map(|(qualifier, field)| {
                let expr = match assign_map.remove(field.name()) {
                    Some(new_value) => {
                        let mut expr = self.sql_to_expr(
                            new_value,
                            source.schema(),
                            &mut planner_context,
                        )?;
                        // Give untyped placeholders (e.g. `$1`) the column's
                        // field so parameter types can be inferred later.
                        if let Expr::Placeholder(placeholder) = &mut expr {
                            placeholder.field = placeholder
                                .field
                                .take()
                                .or_else(|| Some(Arc::clone(field)));
                        }
                        expr.cast_to(field.data_type(), source.schema())?
                    }
                    None => {
                        // Unassigned columns pass through unchanged; respect a
                        // table alias when one was used in the statement.
                        if let Some(alias) = &table_alias {
                            Expr::Column(Column::new(
                                Some(self.ident_normalizer.normalize(alias.name.clone())),
                                field.name(),
                            ))
                        } else {
                            Expr::Column(Column::from((qualifier, field)))
                        }
                    }
                };
                Ok(expr.alias(field.name()))
            })
            .collect::<Result<Vec<_>>>()?;

        let source = project(source, exprs)?;

        let plan = LogicalPlan::Dml(DmlStatement::new(
            table_name,
            table_source,
            WriteOp::Update,
            Arc::new(source),
        ));
        Ok(plan)
    }
2265
    /// Generate a logical plan for an `INSERT INTO` statement.
    ///
    /// Maps the source query's columns onto the target table's columns (via
    /// the explicit column list when given), filling unmentioned columns with
    /// their declared default or NULL, and casting every value to the target
    /// column type.
    fn insert_to_plan(
        &self,
        table_name: ObjectName,
        columns: Vec<Ident>,
        source: Box<Query>,
        overwrite: bool,
        replace_into: bool,
    ) -> Result<LogicalPlan> {
        let table_name = self.object_name_to_table_reference(table_name)?;
        let table_source = self.context_provider.get_table_source(table_name.clone())?;
        let table_schema = DFSchema::try_from(table_source.schema())?;

        // `fields` are the columns the source query must produce, in source
        // order; `value_indices[i]` records which source column (if any) feeds
        // table column `i`.
        let (fields, value_indices) = if columns.is_empty() {
            // No explicit column list: source columns map 1:1 onto the table.
            (
                table_schema.fields().clone(),
                (0..table_schema.fields().len())
                    .map(Some)
                    .collect::<Vec<_>>(),
            )
        } else {
            let mut value_indices = vec![None; table_schema.fields().len()];
            let fields = columns
                .into_iter()
                .enumerate()
                .map(|(i, c)| {
                    let c = self.ident_normalizer.normalize(c);
                    let column_index = table_schema
                        .index_of_column_by_name(None, &c)
                        .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;

                    // Reject the same target column listed twice.
                    if value_indices[column_index].is_some() {
                        return schema_err!(SchemaError::DuplicateUnqualifiedField {
                            name: c,
                        });
                    } else {
                        value_indices[column_index] = Some(i);
                    }
                    Ok(Arc::clone(table_schema.field(column_index)))
                })
                .collect::<Result<Vec<_>>>()?;
            (Fields::from(fields), value_indices)
        };

        // Infer types for `$n` placeholders in a VALUES source from the
        // target column each placeholder feeds; BTreeMap keeps them ordered
        // by placeholder index.
        let mut prepare_param_data_types = BTreeMap::new();
        if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
            for row in rows.iter() {
                for (idx, val) in row.iter().enumerate() {
                    if let SQLExpr::Value(ValueWithSpan {
                        value: Value::Placeholder(name),
                        span: _,
                    }) = val
                    {
                        // `$1` is 1-based; convert to a 0-based index.
                        let name =
                            name.replace('$', "").parse::<usize>().map_err(|_| {
                                plan_datafusion_err!("Can't parse placeholder: {name}")
                            })? - 1;
                        let field = fields.get(idx).ok_or_else(|| {
                            plan_datafusion_err!(
                                "Placeholder ${} refers to a non existent column",
                                idx + 1
                            )
                        })?;
                        let _ = prepare_param_data_types.insert(name, Arc::clone(field));
                    }
                }
            }
        }
        let prepare_param_data_types = prepare_param_data_types.into_values().collect();

        // Plan the source query with the inferred parameter types and the
        // expected target schema available to the planner.
        let mut planner_context =
            PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
        planner_context.set_table_schema(Some(DFSchemaRef::new(
            DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
        )));
        let source = self.query_to_plan(*source, &mut planner_context)?;
        if fields.len() != source.schema().fields().len() {
            plan_err!("Column count doesn't match insert query!")?;
        }

        // For every table column, take the matching source column (cast to
        // the target type) or fall back to the column default / NULL.
        let exprs = value_indices
            .into_iter()
            .enumerate()
            .map(|(i, value_index)| {
                let target_field = table_schema.field(i);
                let expr = match value_index {
                    Some(v) => {
                        Expr::Column(Column::from(source.schema().qualified_field(v)))
                            .cast_to(target_field.data_type(), source.schema())?
                    }
                    None => table_source
                        .get_column_default(target_field.name())
                        .cloned()
                        .unwrap_or_else(|| {
                            Expr::Literal(ScalarValue::Null, None)
                        })
                        .cast_to(target_field.data_type(), &DFSchema::empty())?,
                };
                Ok(expr.alias(target_field.name()))
            })
            .collect::<Result<Vec<Expr>>>()?;
        let source = project(source, exprs)?;

        // OVERWRITE and REPLACE INTO are mutually exclusive.
        let insert_op = match (overwrite, replace_into) {
            (false, false) => InsertOp::Append,
            (true, false) => InsertOp::Overwrite,
            (false, true) => InsertOp::Replace,
            (true, true) => plan_err!(
                "Conflicting insert operations: `overwrite` and `replace_into` cannot both be true"
            )?,
        };

        let plan = LogicalPlan::Dml(DmlStatement::new(
            table_name,
            Arc::clone(&table_source),
            WriteOp::Insert(insert_op),
            Arc::new(source),
        ));
        Ok(plan)
    }
2398
2399 fn show_columns_to_plan(
2400 &self,
2401 extended: bool,
2402 full: bool,
2403 sql_table_name: ObjectName,
2404 ) -> Result<LogicalPlan> {
2405 let where_clause = object_name_to_qualifier(
2407 &sql_table_name,
2408 self.options.enable_ident_normalization,
2409 )?;
2410
2411 if !self.has_table("information_schema", "columns") {
2412 return plan_err!(
2413 "SHOW COLUMNS is not supported unless information_schema is enabled"
2414 );
2415 }
2416
2417 let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2419 let _ = self.context_provider.get_table_source(table_ref)?;
2420
2421 let select_list = if full || extended {
2423 "*"
2424 } else {
2425 "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
2426 };
2427
2428 let query = format!(
2429 "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
2430 );
2431
2432 let mut rewrite = DFParser::parse_sql(&query)?;
2433 assert_eq!(rewrite.len(), 1);
2434 self.statement_to_plan(rewrite.pop_front().unwrap()) }
2436
2437 fn show_functions_to_plan(
2448 &self,
2449 filter: Option<ShowStatementFilter>,
2450 ) -> Result<LogicalPlan> {
2451 let where_clause = if let Some(filter) = filter {
2452 match filter {
2453 ShowStatementFilter::Like(like) => {
2454 format!("WHERE p.function_name like '{like}'")
2455 }
2456 _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
2457 }
2458 } else {
2459 "".to_string()
2460 };
2461
2462 let query = format!(
2463 r#"
2464SELECT DISTINCT
2465 p.*,
2466 r.function_type function_type,
2467 r.description description,
2468 r.syntax_example syntax_example
2469FROM
2470 (
2471 SELECT
2472 i.specific_name function_name,
2473 o.data_type return_type,
2474 array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
2475 array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
2476 FROM (
2477 SELECT
2478 specific_catalog,
2479 specific_schema,
2480 specific_name,
2481 ordinal_position,
2482 parameter_name,
2483 data_type,
2484 rid
2485 FROM
2486 information_schema.parameters
2487 WHERE
2488 parameter_mode = 'IN'
2489 ) i
2490 JOIN
2491 (
2492 SELECT
2493 specific_catalog,
2494 specific_schema,
2495 specific_name,
2496 ordinal_position,
2497 parameter_name,
2498 data_type,
2499 rid
2500 FROM
2501 information_schema.parameters
2502 WHERE
2503 parameter_mode = 'OUT'
2504 ) o
2505 ON i.specific_catalog = o.specific_catalog
2506 AND i.specific_schema = o.specific_schema
2507 AND i.specific_name = o.specific_name
2508 AND i.rid = o.rid
2509 GROUP BY 1, 2, i.rid
2510 ) as p
2511JOIN information_schema.routines r
2512ON p.function_name = r.routine_name
2513{where_clause}
2514 "#
2515 );
2516 let mut rewrite = DFParser::parse_sql(&query)?;
2517 assert_eq!(rewrite.len(), 1);
2518 self.statement_to_plan(rewrite.pop_front().unwrap()) }
2520
2521 fn show_create_table_to_plan(
2522 &self,
2523 sql_table_name: ObjectName,
2524 ) -> Result<LogicalPlan> {
2525 if !self.has_table("information_schema", "tables") {
2526 return plan_err!(
2527 "SHOW CREATE TABLE is not supported unless information_schema is enabled"
2528 );
2529 }
2530 let where_clause = object_name_to_qualifier(
2532 &sql_table_name,
2533 self.options.enable_ident_normalization,
2534 )?;
2535
2536 let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2538 let _ = self.context_provider.get_table_source(table_ref)?;
2539
2540 let query = format!(
2541 "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
2542 );
2543
2544 let mut rewrite = DFParser::parse_sql(&query)?;
2545 assert_eq!(rewrite.len(), 1);
2546 self.statement_to_plan(rewrite.pop_front().unwrap()) }
2548
2549 fn has_table(&self, schema: &str, table: &str) -> bool {
2551 let tables_reference = TableReference::Partial {
2552 schema: schema.into(),
2553 table: table.into(),
2554 };
2555 self.context_provider
2556 .get_table_source(tables_reference)
2557 .is_ok()
2558 }
2559
2560 fn validate_transaction_kind(
2561 &self,
2562 kind: Option<&BeginTransactionKind>,
2563 ) -> Result<()> {
2564 match kind {
2565 None => Ok(()),
2567 Some(BeginTransactionKind::Transaction) => Ok(()),
2569 Some(BeginTransactionKind::Work) => {
2570 not_impl_err!("Transaction kind not supported: {kind:?}")
2571 }
2572 }
2573 }
2574}