use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;

use crate::parser::{
    CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
    LexOrdering, ResetStatement, Statement as DFStatement,
};
use crate::planner::{
    ContextProvider, PlannerContext, SqlToRel, object_name_to_qualifier,
};
use crate::utils::normalize_ident;

use arrow::datatypes::{Field, FieldRef, Fields};
use datafusion_common::error::_plan_err;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
    Column, Constraint, Constraints, DFSchema, DFSchemaRef, DataFusionError, Result,
    ScalarValue, SchemaError, SchemaReference, TableReference, ToDFSchema, exec_err,
    internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
    unqualified_field_not_found,
};
use datafusion_expr::dml::{CopyTo, InsertOp};
use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
use datafusion_expr::logical_plan::DdlStatement;
use datafusion_expr::logical_plan::builder::project;
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{
    Analyze, CreateCatalog, CreateCatalogSchema,
    CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
    CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
    DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
    EmptyRelation, Execute, Explain, ExplainFormat, Expr, ExprSchemable, Filter,
    LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare,
    ResetVariable, SetVariable, SortExpr, Statement as PlanStatement, ToStringifiedPlan,
    TransactionAccessMode, TransactionConclusion, TransactionEnd,
    TransactionIsolationLevel, TransactionStart, Volatility, WriteOp, cast, col,
};
use sqlparser::ast::{
    self, BeginTransactionKind, IndexColumn, IndexType, NullsDistinctOption, OrderByExpr,
    OrderByOptions, Set, ShowStatementIn, ShowStatementOptions, SqliteOnConflict,
    TableObject, UpdateTableFromKind, ValueWithSpan,
};
use sqlparser::ast::{
    Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
    CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
    ObjectName, ObjectType, Query, SchemaName, SetExpr, ShowCreateObject,
    ShowStatementFilter, Statement, TableConstraint, TableFactor, TableWithJoins,
    TransactionMode, UnaryOperator, Value,
};
use sqlparser::parser::ParserError::ParserError;

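/// Normalizes a single SQL identifier into its string form.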
fn ident_to_string(ident: &Ident) -> String {
    normalize_ident(ident.to_owned())
}

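/// Converts an `ObjectName` into a dot-separated string of normalized identifier parts.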
fn object_name_to_string(object_name: &ObjectName) -> String {
    object_name
        .0
        .iter()
        .map(|object_name_part| {
            object_name_part
                .as_ident()
                .map_or_else(String::new, ident_to_string)
        })
        .collect::<Vec<String>>()
        .join(".")
}

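/// Renders a `SchemaName`, with or without an authorization clause, as a string.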
fn get_schema_name(schema_name: &SchemaName) -> String {
    match schema_name {
        SchemaName::Simple(schema_name) => object_name_to_string(schema_name),
        SchemaName::UnnamedAuthorization(auth) => ident_to_string(auth),
        SchemaName::NamedAuthorization(schema_name, auth) => format!(
            "{}.{}",
            object_name_to_string(schema_name),
            ident_to_string(auth)
        ),
    }
}

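/// Collects the table constraints declared inline on column definitions
/// (UNIQUE, PRIMARY KEY, FOREIGN KEY, and CHECK column options).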
fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
    let mut constraints = vec![];
    for column in columns {
        for ast::ColumnOptionDef { name, option } in &column.options {
            match option {
                ast::ColumnOption::Unique {
                    is_primary: false,
                    characteristics,
                } => constraints.push(TableConstraint::Unique {
                    name: name.clone(),
                    columns: vec![IndexColumn {
                        column: OrderByExpr {
                            expr: SQLExpr::Identifier(column.name.clone()),
                            options: OrderByOptions {
                                asc: None,
                                nulls_first: None,
                            },
                            with_fill: None,
                        },
                        operator_class: None,
                    }],
                    characteristics: *characteristics,
                    index_name: None,
                    index_type_display: ast::KeyOrIndexDisplay::None,
                    index_type: None,
                    index_options: vec![],
                    nulls_distinct: NullsDistinctOption::None,
                }),
                ast::ColumnOption::Unique {
                    is_primary: true,
                    characteristics,
                } => constraints.push(TableConstraint::PrimaryKey {
                    name: name.clone(),
                    columns: vec![IndexColumn {
                        column: OrderByExpr {
                            expr: SQLExpr::Identifier(column.name.clone()),
                            options: OrderByOptions {
                                asc: None,
                                nulls_first: None,
                            },
                            with_fill: None,
                        },
                        operator_class: None,
                    }],
                    characteristics: *characteristics,
                    index_name: None,
                    index_type: None,
                    index_options: vec![],
                }),
                ast::ColumnOption::ForeignKey {
                    foreign_table,
                    referred_columns,
                    on_delete,
                    on_update,
                    characteristics,
                } => constraints.push(TableConstraint::ForeignKey {
                    name: name.clone(),
                    columns: vec![],
                    foreign_table: foreign_table.clone(),
                    referred_columns: referred_columns.to_vec(),
                    on_delete: *on_delete,
                    on_update: *on_update,
                    characteristics: *characteristics,
                    index_name: None,
                }),
                ast::ColumnOption::Check(expr) => {
                    constraints.push(TableConstraint::Check {
                        name: name.clone(),
                        expr: Box::new(expr.clone()),
                        enforced: None,
                    })
                }
                ast::ColumnOption::Default(_)
                | ast::ColumnOption::Null
                | ast::ColumnOption::NotNull
                | ast::ColumnOption::DialectSpecific(_)
                | ast::ColumnOption::CharacterSet(_)
                | ast::ColumnOption::Generated { .. }
                | ast::ColumnOption::Comment(_)
                | ast::ColumnOption::Options(_)
                | ast::ColumnOption::OnUpdate(_)
                | ast::ColumnOption::Materialized(_)
                | ast::ColumnOption::Ephemeral(_)
                | ast::ColumnOption::Identity(_)
                | ast::ColumnOption::OnConflict(_)
                | ast::ColumnOption::Policy(_)
                | ast::ColumnOption::Tags(_)
                | ast::ColumnOption::Alias(_)
                | ast::ColumnOption::Srid(_)
                | ast::ColumnOption::Collation(_) => {}
            }
        }
    }
    constraints
}

impl<S: ContextProvider> SqlToRel<'_, S> {
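    /// Generates a logical plan from a parsed DataFusion statement.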
    pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
        match statement {
            DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
            DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
            DFStatement::CopyTo(s) => self.copy_to_plan(s),
            DFStatement::Explain(ExplainStatement {
                verbose,
                analyze,
                format,
                statement,
            }) => self.explain_to_plan(verbose, analyze, format, *statement),
            DFStatement::Reset(statement) => self.reset_statement_to_plan(statement),
        }
    }

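    /// Generates a logical plan from an SQL statement using a fresh `PlannerContext`.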
    pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
        self.sql_statement_to_plan_with_context_impl(
            statement,
            &mut PlannerContext::new(),
        )
    }

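    /// Generates a logical plan from an SQL statement, reusing the provided `PlannerContext`.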
    pub fn sql_statement_to_plan_with_context(
        &self,
        statement: Statement,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        self.sql_statement_to_plan_with_context_impl(statement, planner_context)
    }

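    // Shared implementation behind the public `sql_statement_to_plan*` entry points.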
    fn sql_statement_to_plan_with_context_impl(
        &self,
        statement: Statement,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        match statement {
            Statement::ExplainTable {
                describe_alias: DescribeAlias::Describe | DescribeAlias::Desc,
                table_name,
                ..
            } => self.describe_table_to_plan(table_name),
            Statement::Explain {
                describe_alias: DescribeAlias::Describe | DescribeAlias::Desc,
                statement,
                ..
            } => match *statement {
                Statement::Query(query) => self.describe_query_to_plan(*query),
                _ => {
                    not_impl_err!("Describing statements other than SELECT not supported")
                }
            },
            Statement::Explain {
                verbose,
                statement,
                analyze,
                format,
                describe_alias: _,
                ..
            } => {
                let format = format.map(|format| format.to_string());
                let statement = DFStatement::Statement(statement);
                self.explain_to_plan(verbose, analyze, format, statement)
            }
            Statement::Query(query) => self.query_to_plan(*query, planner_context),
            Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
            Statement::Set(statement) => self.set_statement_to_plan(statement),
            Statement::CreateTable(CreateTable {
                temporary,
                external,
                global,
                transient,
                volatile,
                hive_distribution,
                hive_formats,
                file_format,
                location,
                query,
                name,
                columns,
                constraints,
                if_not_exists,
                or_replace,
                without_rowid,
                like,
                clone,
                comment,
                on_commit,
                on_cluster,
                primary_key,
                order_by,
                partition_by,
                cluster_by,
                clustered_by,
                strict,
                copy_grants,
                enable_schema_evolution,
                change_tracking,
                data_retention_time_in_days,
                max_data_extension_time_in_days,
                default_ddl_collation,
                with_aggregation_policy,
                with_row_access_policy,
                with_tags,
                iceberg,
                external_volume,
                base_location,
                catalog,
                catalog_sync,
                storage_serialization_policy,
                inherits,
                table_options: CreateTableOptions::None,
                dynamic,
                version,
                target_lag,
                warehouse,
                refresh_mode,
                initialize,
                require_user,
            }) => {
                if temporary {
                    return not_impl_err!("Temporary tables not supported")?;
                }
                if external {
                    return not_impl_err!("External tables not supported")?;
                }
                if global.is_some() {
                    return not_impl_err!("Global tables not supported")?;
                }
                if transient {
                    return not_impl_err!("Transient tables not supported")?;
                }
                if volatile {
                    return not_impl_err!("Volatile tables not supported")?;
                }
                if hive_distribution != ast::HiveDistributionStyle::NONE {
                    return not_impl_err!(
                        "Hive distribution not supported: {hive_distribution:?}"
                    )?;
                }
                if !matches!(
                    hive_formats,
                    Some(ast::HiveFormat {
                        row_format: None,
                        serde_properties: None,
                        storage: None,
                        location: None,
                    })
                ) {
                    return not_impl_err!(
                        "Hive formats not supported: {hive_formats:?}"
                    )?;
                }
                if file_format.is_some() {
                    return not_impl_err!("File format not supported")?;
                }
                if location.is_some() {
                    return not_impl_err!("Location not supported")?;
                }
                if without_rowid {
                    return not_impl_err!("Without rowid not supported")?;
                }
                if like.is_some() {
                    return not_impl_err!("Like not supported")?;
                }
                if clone.is_some() {
                    return not_impl_err!("Clone not supported")?;
                }
                if comment.is_some() {
                    return not_impl_err!("Comment not supported")?;
                }
                if on_commit.is_some() {
                    return not_impl_err!("On commit not supported")?;
                }
                if on_cluster.is_some() {
                    return not_impl_err!("On cluster not supported")?;
                }
                if primary_key.is_some() {
                    return not_impl_err!("Primary key not supported")?;
                }
                if order_by.is_some() {
                    return not_impl_err!("Order by not supported")?;
                }
                if partition_by.is_some() {
                    return not_impl_err!("Partition by not supported")?;
                }
                if cluster_by.is_some() {
                    return not_impl_err!("Cluster by not supported")?;
                }
                if clustered_by.is_some() {
                    return not_impl_err!("Clustered by not supported")?;
                }
                if strict {
                    return not_impl_err!("Strict not supported")?;
                }
                if copy_grants {
                    return not_impl_err!("Copy grants not supported")?;
                }
                if enable_schema_evolution.is_some() {
                    return not_impl_err!("Enable schema evolution not supported")?;
                }
                if change_tracking.is_some() {
                    return not_impl_err!("Change tracking not supported")?;
                }
                if data_retention_time_in_days.is_some() {
                    return not_impl_err!("Data retention time in days not supported")?;
                }
                if max_data_extension_time_in_days.is_some() {
                    return not_impl_err!(
                        "Max data extension time in days not supported"
                    )?;
                }
                if default_ddl_collation.is_some() {
                    return not_impl_err!("Default DDL collation not supported")?;
                }
                if with_aggregation_policy.is_some() {
                    return not_impl_err!("With aggregation policy not supported")?;
                }
                if with_row_access_policy.is_some() {
                    return not_impl_err!("With row access policy not supported")?;
                }
                if with_tags.is_some() {
                    return not_impl_err!("With tags not supported")?;
                }
                if iceberg {
                    return not_impl_err!("Iceberg not supported")?;
                }
                if external_volume.is_some() {
                    return not_impl_err!("External volume not supported")?;
                }
                if base_location.is_some() {
                    return not_impl_err!("Base location not supported")?;
                }
                if catalog.is_some() {
                    return not_impl_err!("Catalog not supported")?;
                }
                if catalog_sync.is_some() {
                    return not_impl_err!("Catalog sync not supported")?;
                }
                if storage_serialization_policy.is_some() {
                    return not_impl_err!("Storage serialization policy not supported")?;
                }
                if inherits.is_some() {
                    return not_impl_err!("Table inheritance not supported")?;
                }
                if dynamic {
                    return not_impl_err!("Dynamic tables not supported")?;
                }
                if version.is_some() {
                    return not_impl_err!("Version not supported")?;
                }
                if target_lag.is_some() {
                    return not_impl_err!("Target lag not supported")?;
                }
                if warehouse.is_some() {
                    return not_impl_err!("Warehouse not supported")?;
                }
                if refresh_mode.is_some() {
                    return not_impl_err!("Refresh mode not supported")?;
                }
                if initialize.is_some() {
                    return not_impl_err!("Initialize not supported")?;
                }
                if require_user {
                    return not_impl_err!("Require user not supported")?;
                }
                let mut all_constraints = constraints;
                let inline_constraints = calc_inline_constraints_from_columns(&columns);
                all_constraints.extend(inline_constraints);
                let column_defaults =
                    self.build_column_defaults(&columns, planner_context)?;

                let has_columns = !columns.is_empty();
                let schema = self.build_schema(columns)?.to_dfschema_ref()?;
                if has_columns {
                    planner_context.set_table_schema(Some(Arc::clone(&schema)));
                }

                match query {
                    Some(query) => {
                        let plan = self.query_to_plan(*query, planner_context)?;
                        let input_schema = plan.schema();

                        let plan = if has_columns {
                            if schema.fields().len() != input_schema.fields().len() {
                                return plan_err!(
                                    "Mismatch: {} columns specified, but result has {} columns",
                                    schema.fields().len(),
                                    input_schema.fields().len()
                                );
                            }
                            let input_fields = input_schema.fields();
                            let project_exprs = schema
                                .fields()
                                .iter()
                                .zip(input_fields)
                                .map(|(field, input_field)| {
                                    cast(
                                        col(input_field.name()),
                                        field.data_type().clone(),
                                    )
                                    .alias(field.name())
                                })
                                .collect::<Vec<_>>();

                            LogicalPlanBuilder::from(plan.clone())
                                .project(project_exprs)?
                                .build()?
                        } else {
                            plan
                        };

                        let constraints = self.new_constraint_from_table_constraints(
                            &all_constraints,
                            plan.schema(),
                        )?;

                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                            CreateMemoryTable {
                                name: self.object_name_to_table_reference(name)?,
                                constraints,
                                input: Arc::new(plan),
                                if_not_exists,
                                or_replace,
                                column_defaults,
                                temporary,
                            },
                        )))
                    }

                    None => {
                        let plan = EmptyRelation {
                            produce_one_row: false,
                            schema,
                        };
                        let plan = LogicalPlan::EmptyRelation(plan);
                        let constraints = self.new_constraint_from_table_constraints(
                            &all_constraints,
                            plan.schema(),
                        )?;
                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                            CreateMemoryTable {
                                name: self.object_name_to_table_reference(name)?,
                                constraints,
                                input: Arc::new(plan),
                                if_not_exists,
                                or_replace,
                                column_defaults,
                                temporary,
                            },
                        )))
                    }
                }
            }
            Statement::CreateView {
                or_replace,
                materialized,
                name,
                columns,
                query,
                options: CreateTableOptions::None,
                cluster_by,
                comment,
                with_no_schema_binding,
                if_not_exists,
                temporary,
                to,
                params,
                or_alter,
                secure,
                name_before_not_exists,
            } => {
                if materialized {
                    return not_impl_err!("Materialized views not supported")?;
                }
                if !cluster_by.is_empty() {
                    return not_impl_err!("Cluster by not supported")?;
                }
                if comment.is_some() {
                    return not_impl_err!("Comment not supported")?;
                }
                if with_no_schema_binding {
                    return not_impl_err!("With no schema binding not supported")?;
                }
                if if_not_exists {
                    return not_impl_err!("If not exists not supported")?;
                }
                if to.is_some() {
                    return not_impl_err!("To not supported")?;
                }

                let stmt = Statement::CreateView {
                    or_replace,
                    materialized,
                    name,
                    columns,
                    query,
                    options: CreateTableOptions::None,
                    cluster_by,
                    comment,
                    with_no_schema_binding,
                    if_not_exists,
                    temporary,
                    to,
                    params,
                    or_alter,
                    secure,
                    name_before_not_exists,
                };
                let sql = stmt.to_string();
                let Statement::CreateView {
                    name,
                    columns,
                    query,
                    or_replace,
                    temporary,
                    ..
                } = stmt
                else {
                    return internal_err!("Unreachable code in create view");
                };

                let columns = columns
                    .into_iter()
                    .map(|view_column_def| {
                        if let Some(options) = view_column_def.options {
                            plan_err!(
                                "Options not supported for view columns: {options:?}"
                            )
                        } else {
                            Ok(view_column_def.name)
                        }
                    })
                    .collect::<Result<Vec<_>>>()?;

                let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
                plan = self.apply_expr_alias(plan, columns)?;

                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    name: self.object_name_to_table_reference(name)?,
                    input: Arc::new(plan),
                    or_replace,
                    definition: Some(sql),
                    temporary,
                })))
            }
            Statement::ShowCreate { obj_type, obj_name } => match obj_type {
                ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
                _ => {
                    not_impl_err!("Only `SHOW CREATE TABLE ...` statement is supported")
                }
            },
            Statement::CreateSchema {
                schema_name,
                if_not_exists,
                ..
            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
                CreateCatalogSchema {
                    schema_name: get_schema_name(&schema_name),
                    if_not_exists,
                    schema: Arc::new(DFSchema::empty()),
                },
            ))),
            Statement::CreateDatabase {
                db_name,
                if_not_exists,
                ..
            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
                CreateCatalog {
                    catalog_name: object_name_to_string(&db_name),
                    if_not_exists,
                    schema: Arc::new(DFSchema::empty()),
                },
            ))),
            Statement::Drop {
                object_type,
                if_exists,
                mut names,
                cascade,
                restrict: _,
                purge: _,
                temporary: _,
                table: _,
            } => {
                let name = match names.len() {
                    0 => Err(ParserError("Missing table name.".to_string()).into()),
                    1 => self.object_name_to_table_reference(names.pop().unwrap()),
                    _ => {
                        Err(ParserError("Multiple objects not supported".to_string())
                            .into())
                    }
                }?;

                match object_type {
                    ObjectType::Table => {
                        Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
                            name,
                            if_exists,
                            schema: DFSchemaRef::new(DFSchema::empty()),
                        })))
                    }
                    ObjectType::View => {
                        Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
                            name,
                            if_exists,
                            schema: DFSchemaRef::new(DFSchema::empty()),
                        })))
                    }
                    ObjectType::Schema => {
                        let name = match name {
                            TableReference::Bare { table } => {
                                Ok(SchemaReference::Bare { schema: table })
                            }
                            TableReference::Partial { schema, table } => {
                                Ok(SchemaReference::Full {
                                    schema: table,
                                    catalog: schema,
                                })
                            }
                            TableReference::Full {
                                catalog: _,
                                schema: _,
                                table: _,
                            } => Err(ParserError(
                                "Invalid schema specifier (has 3 parts)".to_string(),
                            )),
                        }?;
                        Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(
                            DropCatalogSchema {
                                name,
                                if_exists,
                                cascade,
                                schema: DFSchemaRef::new(DFSchema::empty()),
                            },
                        )))
                    }
                    _ => not_impl_err!(
                        "Only `DROP TABLE/VIEW/SCHEMA ...` statement is supported currently"
                    ),
                }
            }
            Statement::Prepare {
                name,
                data_types,
                statement,
            } => {
                let mut fields: Vec<FieldRef> = data_types
                    .into_iter()
                    .map(|t| self.convert_data_type_to_field(&t))
                    .collect::<Result<_>>()?;

                let mut planner_context =
                    PlannerContext::new().with_prepare_param_data_types(fields.clone());

                let plan = self.sql_statement_to_plan_with_context_impl(
                    *statement,
                    &mut planner_context,
                )?;

                if fields.is_empty() {
                    let map_types = plan.get_parameter_fields()?;
                    let param_types: Vec<_> = (1..=map_types.len())
                        .filter_map(|i| {
                            let key = format!("${i}");
                            map_types.get(&key).and_then(|opt| opt.clone())
                        })
                        .collect();
                    fields.extend(param_types.iter().cloned());
                    planner_context.with_prepare_param_data_types(param_types);
                }

                Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
                    name: ident_to_string(&name),
                    fields,
                    input: Arc::new(plan),
                })))
            }
            Statement::Execute {
                name,
                parameters,
                using,
                has_parentheses: _,
                immediate,
                into,
                output,
                default,
            } => {
                if !using.is_empty() {
                    return not_impl_err!(
                        "Execute statement with USING is not supported"
                    );
                }
                if immediate {
                    return not_impl_err!(
                        "Execute statement with IMMEDIATE is not supported"
                    );
                }
                if !into.is_empty() {
                    return not_impl_err!("Execute statement with INTO is not supported");
                }
                if output {
                    return not_impl_err!(
                        "Execute statement with OUTPUT is not supported"
                    );
                }
                if default {
                    return not_impl_err!(
                        "Execute statement with DEFAULT is not supported"
                    );
                }
                let empty_schema = DFSchema::empty();
                let parameters = parameters
                    .into_iter()
                    .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
                    .collect::<Result<Vec<Expr>>>()?;

                Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
                    name: object_name_to_string(&name.unwrap()),
                    parameters,
                })))
            }
            Statement::Deallocate {
                name,
                prepare: _,
            } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
                Deallocate {
                    name: ident_to_string(&name),
                },
            ))),

            Statement::ShowTables {
                extended,
                full,
                terse,
                history,
                external,
                show_options,
            } => {
                if extended {
                    return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
                }
                if full {
                    return not_impl_err!("SHOW FULL TABLES not supported")?;
                }
                if terse {
                    return not_impl_err!("SHOW TERSE TABLES not supported")?;
                }
                if history {
                    return not_impl_err!("SHOW TABLES HISTORY not supported")?;
                }
                if external {
                    return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
                }
                let ShowStatementOptions {
                    show_in,
                    starts_with,
                    limit,
                    limit_from,
                    filter_position,
                } = show_options;
                if show_in.is_some() {
                    return not_impl_err!("SHOW TABLES IN not supported")?;
                }
                if starts_with.is_some() {
                    return not_impl_err!("SHOW TABLES LIKE not supported")?;
                }
                if limit.is_some() {
                    return not_impl_err!("SHOW TABLES LIMIT not supported")?;
                }
                if limit_from.is_some() {
                    return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
                }
                if filter_position.is_some() {
                    return not_impl_err!("SHOW TABLES FILTER not supported")?;
                }
                self.show_tables_to_plan()
            }

            Statement::ShowColumns {
                extended,
                full,
                show_options,
            } => {
                let ShowStatementOptions {
                    show_in,
                    starts_with,
                    limit,
                    limit_from,
                    filter_position,
                } = show_options;
                if starts_with.is_some() {
                    return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
                }
                if limit.is_some() {
                    return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
                }
                if limit_from.is_some() {
                    return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
                }
                if filter_position.is_some() {
                    return not_impl_err!(
                        "SHOW COLUMNS with WHERE or LIKE is not supported"
                    )?;
                }
                let Some(ShowStatementIn {
                    clause: _,
                    parent_type,
                    parent_name,
                }) = show_in
                else {
                    return plan_err!("SHOW COLUMNS requires a table name");
                };

                if let Some(parent_type) = parent_type {
                    return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
                }
                let Some(table_name) = parent_name else {
                    return plan_err!("SHOW COLUMNS requires a table name");
                };

                self.show_columns_to_plan(extended, full, table_name)
            }

            Statement::ShowFunctions { filter, .. } => {
                self.show_functions_to_plan(filter)
            }

            Statement::Insert(Insert {
                or,
                into,
                columns,
                overwrite,
                source,
                partitioned,
                after_columns,
                table,
                on,
                returning,
                ignore,
                table_alias,
                mut replace_into,
                priority,
                insert_alias,
                assignments,
                has_table_keyword,
                settings,
                format_clause,
            }) => {
                let table_name = match table {
                    TableObject::TableName(table_name) => table_name,
                    TableObject::TableFunction(_) => {
                        return not_impl_err!(
                            "INSERT INTO Table functions not supported"
                        );
                    }
                };
                if let Some(or) = or {
                    match or {
                        SqliteOnConflict::Replace => replace_into = true,
                        _ => plan_err!("Inserts with {or} clause is not supported")?,
                    }
                }
                if partitioned.is_some() {
                    plan_err!("Partitioned inserts not yet supported")?;
                }
                if !after_columns.is_empty() {
                    plan_err!("After-columns clause not supported")?;
                }
                if on.is_some() {
                    plan_err!("Insert-on clause not supported")?;
                }
                if returning.is_some() {
                    plan_err!("Insert-returning clause not supported")?;
                }
                if ignore {
                    plan_err!("Insert-ignore clause not supported")?;
                }
                let Some(source) = source else {
                    plan_err!("Inserts without a source not supported")?
                };
                if let Some(table_alias) = table_alias {
                    plan_err!(
                        "Inserts with a table alias not supported: {table_alias:?}"
                    )?
                };
                if let Some(priority) = priority {
                    plan_err!(
                        "Inserts with a `PRIORITY` clause not supported: {priority:?}"
                    )?
                };
                if insert_alias.is_some() {
                    plan_err!("Inserts with an alias not supported")?;
                }
                if !assignments.is_empty() {
                    plan_err!("Inserts with assignments not supported")?;
                }
                if settings.is_some() {
                    plan_err!("Inserts with settings not supported")?;
                }
                if format_clause.is_some() {
                    plan_err!("Inserts with format clause not supported")?;
                }
                let _ = into;
                let _ = has_table_keyword;
                self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
            }
            Statement::Update {
                table,
                assignments,
                from,
                selection,
                returning,
                or,
                limit,
            } => {
                let from_clauses =
                    from.map(|update_table_from_kind| match update_table_from_kind {
                        UpdateTableFromKind::BeforeSet(from_clauses) => from_clauses,
                        UpdateTableFromKind::AfterSet(from_clauses) => from_clauses,
                    });
                if from_clauses.as_ref().is_some_and(|f| f.len() > 1) {
                    plan_err!("Multiple tables in UPDATE SET FROM not yet supported")?;
                }
                let update_from = from_clauses.and_then(|mut f| f.pop());
                if returning.is_some() {
                    plan_err!("Update-returning clause not yet supported")?;
                }
                if or.is_some() {
                    plan_err!("ON conflict not supported")?;
                }
                if limit.is_some() {
                    return not_impl_err!("Update-limit clause not supported")?;
                }
                self.update_to_plan(table, &assignments, update_from, selection)
            }

            Statement::Delete(Delete {
                tables,
                using,
                selection,
                returning,
                from,
                order_by,
                limit,
            }) => {
                if !tables.is_empty() {
                    plan_err!("DELETE <TABLE> not supported")?;
                }

                if using.is_some() {
                    plan_err!("Using clause not supported")?;
                }

                if returning.is_some() {
                    plan_err!("Delete-returning clause not yet supported")?;
                }

                if !order_by.is_empty() {
                    plan_err!("Delete-order-by clause not yet supported")?;
                }

                if limit.is_some() {
                    plan_err!("Delete-limit clause not yet supported")?;
                }

                let table_name = self.get_delete_target(from)?;
                self.delete_to_plan(&table_name, selection)
            }

            Statement::StartTransaction {
                modes,
                begin: false,
                modifier,
                transaction,
                statements,
                has_end_keyword,
                exception,
            } => {
                if let Some(modifier) = modifier {
                    return not_impl_err!(
                        "Transaction modifier not supported: {modifier}"
                    );
                }
                if !statements.is_empty() {
                    return not_impl_err!(
                        "Transaction with multiple statements not supported"
                    );
                }
                if exception.is_some() {
                    return not_impl_err!(
                        "Transaction with exception statements not supported"
                    );
                }
                if has_end_keyword {
                    return not_impl_err!("Transaction with END keyword not supported");
                }
                self.validate_transaction_kind(transaction.as_ref())?;
                let isolation_level: ast::TransactionIsolationLevel = modes
                    .iter()
                    .filter_map(|m: &TransactionMode| match m {
                        TransactionMode::AccessMode(_) => None,
                        TransactionMode::IsolationLevel(level) => Some(level),
                    })
                    .next_back()
                    .copied()
                    .unwrap_or(ast::TransactionIsolationLevel::Serializable);
                let access_mode: ast::TransactionAccessMode = modes
                    .iter()
                    .filter_map(|m: &TransactionMode| match m {
                        TransactionMode::AccessMode(mode) => Some(mode),
                        TransactionMode::IsolationLevel(_) => None,
                    })
                    .next_back()
                    .copied()
                    .unwrap_or(ast::TransactionAccessMode::ReadWrite);
                let isolation_level = match isolation_level {
                    ast::TransactionIsolationLevel::ReadUncommitted => {
                        TransactionIsolationLevel::ReadUncommitted
                    }
                    ast::TransactionIsolationLevel::ReadCommitted => {
                        TransactionIsolationLevel::ReadCommitted
                    }
                    ast::TransactionIsolationLevel::RepeatableRead => {
                        TransactionIsolationLevel::RepeatableRead
                    }
                    ast::TransactionIsolationLevel::Serializable => {
                        TransactionIsolationLevel::Serializable
                    }
                    ast::TransactionIsolationLevel::Snapshot => {
                        TransactionIsolationLevel::Snapshot
                    }
                };
                let access_mode = match access_mode {
                    ast::TransactionAccessMode::ReadOnly => {
                        TransactionAccessMode::ReadOnly
                    }
                    ast::TransactionAccessMode::ReadWrite => {
                        TransactionAccessMode::ReadWrite
                    }
                };
                let statement = PlanStatement::TransactionStart(TransactionStart {
                    access_mode,
                    isolation_level,
                });
                Ok(LogicalPlan::Statement(statement))
            }
            Statement::Commit {
                chain,
                end,
                modifier,
            } => {
                if end {
                    return not_impl_err!("COMMIT AND END not supported");
                };
                if let Some(modifier) = modifier {
                    return not_impl_err!("COMMIT {modifier} not supported");
                };
                let statement = PlanStatement::TransactionEnd(TransactionEnd {
                    conclusion: TransactionConclusion::Commit,
                    chain,
                });
                Ok(LogicalPlan::Statement(statement))
            }
            Statement::Rollback { chain, savepoint } => {
                if savepoint.is_some() {
                    plan_err!("Savepoints not supported")?;
                }
                let statement = PlanStatement::TransactionEnd(TransactionEnd {
                    conclusion: TransactionConclusion::Rollback,
                    chain,
                });
                Ok(LogicalPlan::Statement(statement))
            }
            Statement::CreateFunction(ast::CreateFunction {
                or_replace,
                temporary,
                name,
                args,
                return_type,
                function_body,
                behavior,
                language,
                ..
            }) => {
                let return_type = match return_type {
                    Some(t) => Some(self.convert_data_type_to_field(&t)?),
                    None => None,
                };
                let mut planner_context = PlannerContext::new();
                let empty_schema = &DFSchema::empty();

                let args = match args {
                    Some(function_args) => {
                        let function_args = function_args
                            .into_iter()
                            .map(|arg| {
                                let data_type =
                                    self.convert_data_type_to_field(&arg.data_type)?;

                                let default_expr = match arg.default_expr {
                                    Some(expr) => Some(self.sql_to_expr(
                                        expr,
                                        empty_schema,
                                        &mut planner_context,
                                    )?),
                                    None => None,
                                };
                                Ok(OperateFunctionArg {
                                    name: arg.name,
                                    default_expr,
                                    data_type: data_type.data_type().clone(),
                                })
                            })
                            .collect::<Result<Vec<OperateFunctionArg>>>();
                        Some(function_args?)
                    }
                    None => None,
                };
                let first_default = match args.as_ref() {
                    Some(arg) => arg.iter().position(|t| t.default_expr.is_some()),
                    None => None,
                };
                let last_non_default = match args.as_ref() {
                    Some(arg) => arg
                        .iter()
                        .rev()
                        .position(|t| t.default_expr.is_none())
                        .map(|reverse_pos| arg.len() - reverse_pos - 1),
                    None => None,
                };
                if let (Some(pos_default), Some(pos_non_default)) =
                    (first_default, last_non_default)
                    && pos_non_default > pos_default
                {
                    return plan_err!(
                        "Non-default arguments cannot follow default arguments."
                    );
                }
                let name = match &name.0[..] {
                    [] => exec_err!("Function should have name")?,
                    [n] => n.as_ident().unwrap().value.clone(),
                    [..] => not_impl_err!("Qualified functions are not supported")?,
                };
                let arg_types = args.as_ref().map(|arg| {
                    arg.iter()
                        .map(|t| {
                            let name = match t.name.clone() {
                                Some(name) => name.value,
                                None => "".to_string(),
                            };
                            Arc::new(Field::new(name, t.data_type.clone(), true))
                        })
                        .collect::<Vec<_>>()
                });
                if let Some(ref fields) = arg_types {
                    let count_positional =
                        fields.iter().filter(|f| f.name() == "").count();
                    if !(count_positional == 0 || count_positional == fields.len()) {
                        return plan_err!(
                            "All function arguments must use either named or positional style."
                        );
                    }
                }
                let mut planner_context = PlannerContext::new()
                    .with_prepare_param_data_types(arg_types.unwrap_or_default());

                let function_body = match function_body {
                    Some(r) => Some(self.sql_to_expr(
                        match r {
                            ast::CreateFunctionBody::AsBeforeOptions(expr) => expr,
                            ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
                            ast::CreateFunctionBody::Return(expr) => expr,
                            ast::CreateFunctionBody::AsBeginEnd(_) => {
                                return not_impl_err!(
                                    "BEGIN/END enclosed function body syntax is not supported"
                                )?;
                            }
                            ast::CreateFunctionBody::AsReturnExpr(_)
                            | ast::CreateFunctionBody::AsReturnSelect(_) => {
                                return not_impl_err!(
                                    "AS RETURN function syntax is not supported"
                                )?
                            }
                        },
                        &DFSchema::empty(),
                        &mut planner_context,
                    )?),
                    None => None,
                };

                let params = CreateFunctionBody {
                    language,
                    behavior: behavior.map(|b| match b {
                        ast::FunctionBehavior::Immutable => Volatility::Immutable,
                        ast::FunctionBehavior::Stable => Volatility::Stable,
                        ast::FunctionBehavior::Volatile => Volatility::Volatile,
                    }),
                    function_body,
                };

                let statement = DdlStatement::CreateFunction(CreateFunction {
                    or_replace,
                    temporary,
                    name,
                    return_type: return_type.map(|f| f.data_type().clone()),
                    args,
                    params,
                    schema: DFSchemaRef::new(DFSchema::empty()),
                });

                Ok(LogicalPlan::Ddl(statement))
            }
            Statement::DropFunction {
                if_exists,
                func_desc,
                ..
            } => {
                if let Some(desc) = func_desc.first() {
                    let name = match &desc.name.0[..] {
                        [] => exec_err!("Function should have name")?,
                        [n] => n.as_ident().unwrap().value.clone(),
                        [..] => not_impl_err!("Qualified functions are not supported")?,
                    };
                    let statement = DdlStatement::DropFunction(DropFunction {
                        if_exists,
                        name,
                        schema: DFSchemaRef::new(DFSchema::empty()),
                    });
                    Ok(LogicalPlan::Ddl(statement))
                } else {
                    exec_err!("Function name not provided")
                }
            }
            Statement::CreateIndex(CreateIndex {
                name,
                table_name,
                using,
                columns,
                unique,
                if_not_exists,
                ..
            }) => {
                let name: Option<String> = name.as_ref().map(object_name_to_string);
                let table = self.object_name_to_table_reference(table_name)?;
                let table_schema = self
                    .context_provider
                    .get_table_source(table.clone())?
                    .schema()
                    .to_dfschema_ref()?;
                let using: Option<String> =
                    using.as_ref().map(|index_type| match index_type {
                        IndexType::Custom(ident) => ident_to_string(ident),
                        _ => index_type.to_string().to_ascii_lowercase(),
                    });
                let order_by_exprs: Vec<OrderByExpr> =
                    columns.into_iter().map(|col| col.column).collect();
                let columns = self.order_by_to_sort_expr(
                    order_by_exprs,
                    &table_schema,
                    planner_context,
                    false,
                    None,
                )?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
                    PlanCreateIndex {
                        name,
                        table,
                        using,
                        columns,
                        unique,
                        if_not_exists,
                        schema: DFSchemaRef::new(DFSchema::empty()),
                    },
                )))
            }
            stmt => {
                not_impl_err!("Unsupported SQL statement: {stmt}")
            }
        }
    }

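    /// Extracts the single target table name from a DELETE statement's FROM clause.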
    fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
        let mut from = match from {
            FromTable::WithFromKeyword(v) => v,
            FromTable::WithoutKeyword(v) => v,
        };

        if from.len() != 1 {
            return not_impl_err!(
                "DELETE FROM only supports single table, got {}: {from:?}",
                from.len()
            );
        }
        let table_factor = from.pop().unwrap();
        if !table_factor.joins.is_empty() {
            return not_impl_err!("DELETE FROM only supports single table, got: joins");
        }
        let TableFactor::Table { name, .. } = table_factor.relation else {
            return not_impl_err!(
                "DELETE FROM only supports single table, got: {table_factor:?}"
            );
        };

        Ok(name)
    }

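    /// Generates a plan for `SHOW TABLES` by rewriting it as a query over `information_schema.tables`.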
    fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
        if self.has_table("information_schema", "tables") {
            let query = "SELECT * FROM information_schema.tables;";
            let mut rewrite = DFParser::parse_sql(query)?;
            assert_eq!(rewrite.len(), 1);
            self.statement_to_plan(rewrite.pop_front().unwrap())
        } else {
            plan_err!("SHOW TABLES is not supported unless information_schema is enabled")
        }
    }

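    /// Generates a `DESCRIBE <table>` plan from the table's schema.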
    fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
        let table_ref = self.object_name_to_table_reference(table_name)?;

        let table_source = self.context_provider.get_table_source(table_ref)?;

        let schema = table_source.schema();

        let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();

        Ok(LogicalPlan::DescribeTable(DescribeTable {
            schema,
            output_schema: Arc::new(output_schema),
        }))
    }

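    /// Generates a `DESCRIBE <query>` plan from the query's output schema.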
    fn describe_query_to_plan(&self, query: Query) -> Result<LogicalPlan> {
        let plan = self.query_to_plan(query, &mut PlannerContext::new())?;

        let schema = Arc::new(plan.schema().as_arrow().clone());

        let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();

        Ok(LogicalPlan::DescribeTable(DescribeTable {
            schema,
            output_schema: Arc::new(output_schema),
        }))
    }

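    /// Generates a plan for `COPY TO`, inferring the file type from `STORED AS`
    /// or, failing that, from the target file extension.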
    fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
        let copy_source = statement.source;
        let (input, input_schema, table_ref) = match copy_source {
            CopyToSource::Relation(object_name) => {
                let table_name = object_name_to_string(&object_name);
                let table_ref = self.object_name_to_table_reference(object_name)?;
                let table_source =
                    self.context_provider.get_table_source(table_ref.clone())?;
                let plan =
                    LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?;
                let input_schema = Arc::clone(plan.schema());
                (plan, input_schema, Some(table_ref))
            }
            CopyToSource::Query(query) => {
                let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
                let input_schema = Arc::clone(plan.schema());
                (plan, input_schema, None)
            }
        };

        let options_map = self.parse_options_map(statement.options, true)?;

        let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
            self.context_provider.get_file_type(stored_as).ok()
        } else {
            None
        };

        let file_type = match maybe_file_type {
            Some(ft) => ft,
            None => {
                let e = || {
                    DataFusionError::Configuration(
                        "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
                            .to_string(),
                    )
                };
                let extension: &str = &Path::new(&statement.target)
                    .extension()
                    .ok_or_else(e)?
                    .to_str()
                    .ok_or_else(e)?
                    .to_lowercase();

                self.context_provider.get_file_type(extension)?
            }
        };

        let partition_by = statement
            .partitioned_by
            .iter()
            .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .map(|f| f.name().to_owned())
            .collect();

        Ok(LogicalPlan::Copy(CopyTo::new(
            Arc::new(input),
            statement.target,
            partition_by,
            file_type,
            options_map,
        )))
    }

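    /// Converts `WITH ORDER` expressions into sort expressions, validating column
    /// references against the schema when the schema is non-empty.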
    fn build_order_by(
        &self,
        order_exprs: Vec<LexOrdering>,
        schema: &DFSchemaRef,
        planner_context: &mut PlannerContext,
    ) -> Result<Vec<Vec<SortExpr>>> {
        if !order_exprs.is_empty() && schema.fields().is_empty() {
            let results = order_exprs
                .iter()
                .map(|lex_order| {
                    let result = lex_order
                        .iter()
                        .map(|order_by_expr| {
                            let ordered_expr = &order_by_expr.expr;
                            let ordered_expr = ordered_expr.to_owned();
                            let ordered_expr = self.sql_expr_to_logical_expr(
                                ordered_expr,
                                schema,
                                planner_context,
                            )?;
                            let asc = order_by_expr.options.asc.unwrap_or(true);
                            let nulls_first =
                                order_by_expr.options.nulls_first.unwrap_or_else(|| {
                                    self.options.default_null_ordering.nulls_first(asc)
                                });

                            Ok(SortExpr::new(ordered_expr, asc, nulls_first))
                        })
                        .collect::<Result<Vec<SortExpr>>>()?;
                    Ok(result)
                })
                .collect::<Result<Vec<Vec<SortExpr>>>>()?;

            return Ok(results);
        }

        let mut all_results = vec![];
        for expr in order_exprs {
            let expr_vec =
                self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?;
            for sort in expr_vec.iter() {
                for column in sort.expr.column_refs().iter() {
                    if !schema.has_column(column) {
                        return plan_err!("Column {column} is not in schema");
                    }
                }
            }
            all_results.push(expr_vec)
        }
        Ok(all_results)
    }

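    /// Generates a `CreateExternalTable` DDL plan from a parsed `CREATE EXTERNAL TABLE` statement.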
    fn external_table_to_plan(
        &self,
        statement: CreateExternalTable,
    ) -> Result<LogicalPlan> {
        let definition = Some(statement.to_string());
        let CreateExternalTable {
            name,
            columns,
            file_type,
            location,
            table_partition_cols,
            if_not_exists,
            temporary,
            order_exprs,
            unbounded,
            options,
            constraints,
            or_replace,
        } = statement;

        let mut all_constraints = constraints;
        let inline_constraints = calc_inline_constraints_from_columns(&columns);
        all_constraints.extend(inline_constraints);

        let options_map = self.parse_options_map(options, false)?;

        let compression = options_map
            .get("format.compression")
            .map(|c| CompressionTypeVariant::from_str(c))
            .transpose()?;
        if (file_type == "PARQUET" || file_type == "AVRO" || file_type == "ARROW")
            && compression
                .map(|c| c != CompressionTypeVariant::UNCOMPRESSED)
                .unwrap_or(false)
        {
            plan_err!(
                "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
            )?;
        }

        let mut planner_context = PlannerContext::new();

        let column_defaults = self
            .build_column_defaults(&columns, &mut planner_context)?
            .into_iter()
            .collect();

        let schema = self.build_schema(columns)?;
        let df_schema = schema.to_dfschema_ref()?;
        df_schema.check_names()?;

        let ordered_exprs =
            self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;

        let name = self.object_name_to_table_reference(name)?;
        let constraints =
            self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
        Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
            PlanCreateExternalTable::builder(name, location, file_type, df_schema)
                .with_partition_cols(table_partition_cols)
                .with_if_not_exists(if_not_exists)
                .with_or_replace(or_replace)
                .with_temporary(temporary)
                .with_definition(definition)
                .with_order_exprs(ordered_exprs)
                .with_unbounded(unbounded)
                .with_options(options_map)
                .with_constraints(constraints)
                .with_column_defaults(column_defaults)
                .build(),
        )))
    }

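    /// Resolves constraint columns to their field indices in the schema.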
    fn get_constraint_column_indices(
        &self,
        df_schema: &DFSchemaRef,
        columns: &[IndexColumn],
        constraint_name: &str,
    ) -> Result<Vec<usize>> {
        let field_names = df_schema.field_names();
        columns
            .iter()
            .map(|index_column| {
                let expr = &index_column.column.expr;
                let ident = if let SQLExpr::Identifier(ident) = expr {
                    ident
                } else {
                    return Err(plan_datafusion_err!(
                        "Column name for {constraint_name} must be an identifier: {expr}"
                    ));
                };
                let column = self.ident_normalizer.normalize(ident.clone());
                field_names
                    .iter()
                    .position(|item| *item == column)
                    .ok_or_else(|| {
                        plan_datafusion_err!(
                            "Column for {constraint_name} not found in schema: {column}"
                        )
                    })
            })
            .collect::<Result<Vec<_>>>()
    }

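    /// Converts parser `TableConstraint`s into DataFusion `Constraints`, rejecting unsupported kinds.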
    pub fn new_constraint_from_table_constraints(
        &self,
        constraints: &[TableConstraint],
        df_schema: &DFSchemaRef,
    ) -> Result<Constraints> {
        let constraints = constraints
            .iter()
            .map(|c: &TableConstraint| match c {
                TableConstraint::Unique { name, columns, .. } => {
                    let constraint_name = match name {
                        Some(name) => &format!("unique constraint with name '{name}'"),
                        None => "unique constraint",
                    };
                    let indices = self.get_constraint_column_indices(
                        df_schema,
                        columns,
                        constraint_name,
                    )?;
                    Ok(Constraint::Unique(indices))
                }
                TableConstraint::PrimaryKey { columns, .. } => {
                    let indices = self.get_constraint_column_indices(
                        df_schema,
                        columns,
                        "primary key",
                    )?;
                    Ok(Constraint::PrimaryKey(indices))
                }
                TableConstraint::ForeignKey { .. } => {
                    _plan_err!("Foreign key constraints are not currently supported")
                }
                TableConstraint::Check { .. } => {
                    _plan_err!("Check constraints are not currently supported")
                }
                TableConstraint::Index { .. } => {
                    _plan_err!("Indexes are not currently supported")
                }
                TableConstraint::FulltextOrSpatial { .. } => {
                    _plan_err!("Indexes are not currently supported")
                }
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Constraints::new_unverified(constraints))
    }

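    /// Normalizes an options list into a map, prefixing un-namespaced keys with `format.`.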
    fn parse_options_map(
        &self,
        options: Vec<(String, Value)>,
        allow_duplicates: bool,
    ) -> Result<HashMap<String, String>> {
        let mut options_map = HashMap::new();
        for (key, value) in options {
            if !allow_duplicates && options_map.contains_key(&key) {
                return plan_err!("Option {key} is specified multiple times");
            }

            let Some(value_string) = crate::utils::value_to_string(&value) else {
                return plan_err!("Unsupported Value {}", value);
            };

            if !(&key.contains('.')) {
                let renamed_key = format!("format.{key}");
                options_map.insert(renamed_key.to_lowercase(), value_string);
            } else {
                options_map.insert(key.to_lowercase(), value_string);
            }
        }

        Ok(options_map)
    }

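    /// Generates an `Explain` (or `Analyze`) plan wrapping the plan of the explained statement.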
    fn explain_to_plan(
        &self,
        verbose: bool,
        analyze: bool,
        format: Option<String>,
        statement: DFStatement,
    ) -> Result<LogicalPlan> {
        let plan = self.statement_to_plan(statement)?;
        if matches!(plan, LogicalPlan::Explain(_)) {
            return plan_err!("Nested EXPLAINs are not supported");
        }

        let plan = Arc::new(plan);
        let schema = LogicalPlan::explain_schema();
        let schema = schema.to_dfschema_ref()?;

        if verbose && format.is_some() {
            return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
        }

        if analyze {
            if format.is_some() {
                return plan_err!("EXPLAIN ANALYZE with FORMAT is not supported");
            }
            Ok(LogicalPlan::Analyze(Analyze {
                verbose,
                input: plan,
                schema,
            }))
        } else {
            let stringified_plans =
                vec![plan.to_stringified(PlanType::InitialLogicalPlan)];

            let options = self.context_provider.options();
            let format = if verbose {
                ExplainFormat::Indent
            } else if let Some(format) = format {
                ExplainFormat::from_str(&format)?
            } else {
                options.explain.format.clone()
            };

            Ok(LogicalPlan::Explain(Explain {
                verbose,
                explain_format: format,
                plan,
                stringified_plans,
                schema,
                logical_optimization_succeeded: false,
            }))
        }
    }

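    /// Rewrites `SHOW <variable>` as a query against `information_schema.df_settings`.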
    fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
        if !self.has_table("information_schema", "df_settings") {
            return plan_err!(
                "SHOW [VARIABLE] is not supported unless information_schema is enabled"
            );
        }

        let verbose = variable
            .last()
            .map(|s| ident_to_string(s) == "verbose")
            .unwrap_or(false);
        let mut variable_vec = variable.to_vec();
        let mut columns: String = "name, value".to_owned();

        if verbose {
            columns = format!("{columns}, description");
            variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
        }

        let variable = object_name_to_string(&ObjectName::from(variable_vec));
        let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
        let query = if variable == "all" {
            format!("{base_query} ORDER BY name")
        } else if variable == "timezone" || variable == "time.zone" {
            format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
        } else {
            let is_valid_variable = self
                .context_provider
                .options()
                .entries()
                .iter()
                .any(|opt| opt.key == variable);

            let is_runtime_variable = variable.starts_with("datafusion.runtime.");

            if !is_valid_variable && !is_runtime_variable {
                return plan_err!(
                    "'{variable}' is not a variable which can be viewed with 'SHOW'"
                );
            }

            format!("{base_query} WHERE name = '{variable}'")
        };

        let mut rewrite = DFParser::parse_sql(&query)?;
        assert_eq!(rewrite.len(), 1);

        self.statement_to_plan(rewrite.pop_front().unwrap())
    }

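    /// Generates a `SetVariable` statement plan from a `SET` statement.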
    fn set_statement_to_plan(&self, statement: Set) -> Result<LogicalPlan> {
        match statement {
            Set::SingleAssignment {
                scope,
                hivevar,
                variable,
                values,
            } => {
                if scope.is_some() {
                    return not_impl_err!("SET with scope modifiers is not supported");
                }

                if hivevar {
                    return not_impl_err!("SET HIVEVAR is not supported");
                }

                let variable = object_name_to_string(&variable);
                let mut variable_lower = variable.to_lowercase();

                if variable_lower == "timezone" || variable_lower == "time.zone" {
                    variable_lower = "datafusion.execution.time_zone".to_string();
                }

                if values.len() != 1 {
                    return plan_err!("SET only supports single value assignment");
                }

                let value_string = match &values[0] {
                    SQLExpr::Identifier(i) => ident_to_string(i),
                    SQLExpr::Value(v) => match crate::utils::value_to_string(&v.value) {
                        None => {
                            return plan_err!("Unsupported value {:?}", v.value);
                        }
                        Some(s) => s,
                    },
                    SQLExpr::UnaryOp { op, expr } => match op {
                        UnaryOperator::Plus => format!("+{expr}"),
                        UnaryOperator::Minus => format!("-{expr}"),
                        _ => return plan_err!("Unsupported unary op {:?}", op),
                    },
                    _ => return plan_err!("Unsupported expr {:?}", values[0]),
                };

                Ok(LogicalPlan::Statement(PlanStatement::SetVariable(
                    SetVariable {
                        variable: variable_lower,
                        value: value_string,
                    },
                )))
            }
            other => not_impl_err!("SET variant not implemented yet: {other:?}"),
        }
    }

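    /// Generates a `ResetVariable` statement plan from a `RESET` statement.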
    fn reset_statement_to_plan(&self, statement: ResetStatement) -> Result<LogicalPlan> {
        match statement {
            ResetStatement::Variable(variable) => {
                let variable = object_name_to_string(&variable);
                let mut variable_lower = variable.to_lowercase();

                if variable_lower == "timezone" || variable_lower == "time.zone" {
                    variable_lower = "datafusion.execution.time_zone".to_string();
                }

                Ok(LogicalPlan::Statement(PlanStatement::ResetVariable(
                    ResetVariable {
                        variable: variable_lower,
                    },
                )))
            }
        }
    }

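    /// Generates a DML `Delete` plan: a table scan, an optional filter, then the delete.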
    fn delete_to_plan(
        &self,
        table_name: &ObjectName,
        predicate_expr: Option<SQLExpr>,
    ) -> Result<LogicalPlan> {
        let table_ref = self.object_name_to_table_reference(table_name.clone())?;
        let table_source = self.context_provider.get_table_source(table_ref.clone())?;
        let schema = DFSchema::try_from_qualified_schema(
            table_ref.clone(),
            &table_source.schema(),
        )?;
        let scan =
            LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
                .build()?;
        let mut planner_context = PlannerContext::new();

        let source = match predicate_expr {
            None => scan,
            Some(predicate_expr) => {
                let filter_expr =
                    self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
                let schema = Arc::new(schema);
                let mut using_columns = HashSet::new();
                expr_to_columns(&filter_expr, &mut using_columns)?;
                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
                    filter_expr,
                    &[&[&schema]],
                    &[using_columns],
                )?;
                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
            }
        };

        let plan = LogicalPlan::Dml(DmlStatement::new(
            table_ref,
            table_source,
            WriteOp::Delete,
            Arc::new(source),
        ));
        Ok(plan)
    }

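    /// Generates a DML `Update` plan: scans the target (and optional FROM) tables,
    /// applies the optional filter, and projects the updated column values.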
    fn update_to_plan(
        &self,
        table: TableWithJoins,
        assignments: &[Assignment],
        from: Option<TableWithJoins>,
        predicate_expr: Option<SQLExpr>,
    ) -> Result<LogicalPlan> {
        let (table_name, table_alias) = match &table.relation {
            TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
            _ => plan_err!("Cannot update non-table relation!")?,
        };

        // Resolve the table reference and verify that the table exists
        let table_name = self.object_name_to_table_reference(table_name)?;
        let table_source = self.context_provider.get_table_source(table_name.clone())?;
        let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
            table_name.clone(),
            &table_source.schema(),
        )?);

        // Collect the SET assignments, validating that each target column exists
        let mut planner_context = PlannerContext::new();
        let mut assign_map = assignments
            .iter()
            .map(|assign| {
                let cols = match &assign.target {
                    AssignmentTarget::ColumnName(cols) => cols,
                    _ => plan_err!("Tuples are not supported")?,
                };
                let col_name: &Ident = cols
                    .0
                    .iter()
                    .last()
                    .ok_or_else(|| plan_datafusion_err!("Empty column id"))?
                    .as_ident()
                    .unwrap();
                table_schema.field_with_unqualified_name(&col_name.value)?;
                Ok((col_name.value.clone(), assign.value.clone()))
            })
            .collect::<Result<HashMap<String, SQLExpr>>>()?;

        // Build the scan over the target table plus any FROM tables
        let mut input_tables = vec![table];
        input_tables.extend(from);
        let scan = self.plan_from_tables(input_tables, &mut planner_context)?;

        // Apply the optional WHERE clause as a filter
        let source = match predicate_expr {
            None => scan,
            Some(predicate_expr) => {
                let filter_expr = self.sql_to_expr(
                    predicate_expr,
                    scan.schema(),
                    &mut planner_context,
                )?;
                let mut using_columns = HashSet::new();
                expr_to_columns(&filter_expr, &mut using_columns)?;
                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
                    filter_expr,
                    &[&[scan.schema()]],
                    &[using_columns],
                )?;
                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
            }
        };

        // Project the full output row: assigned columns take their new value,
        // all other columns pass through unchanged
        let exprs = table_schema
            .iter()
            .map(|(qualifier, field)| {
                let expr = match assign_map.remove(field.name()) {
                    Some(new_value) => {
                        let mut expr = self.sql_to_expr(
                            new_value,
                            source.schema(),
                            &mut planner_context,
                        )?;
                        // Infer a placeholder's type from the target column if
                        // it was not otherwise specified
                        if let Expr::Placeholder(placeholder) = &mut expr {
                            placeholder.field = placeholder
                                .field
                                .take()
                                .or_else(|| Some(Arc::clone(field)));
                        }
                        expr.cast_to(field.data_type(), source.schema())?
                    }
                    None => {
                        if let Some(alias) = &table_alias {
                            Expr::Column(Column::new(
                                Some(self.ident_normalizer.normalize(alias.name.clone())),
                                field.name(),
                            ))
                        } else {
                            Expr::Column(Column::from((qualifier, field)))
                        }
                    }
                };
                Ok(expr.alias(field.name()))
            })
            .collect::<Result<Vec<_>>>()?;

        let source = project(source, exprs)?;

        let plan = LogicalPlan::Dml(DmlStatement::new(
            table_name,
            table_source,
            WriteOp::Update,
            Arc::new(source),
        ));
        Ok(plan)
    }

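    /// Plan an `INSERT INTO <table> [(columns)] <query>` statement (including
    /// the overwrite and replace variants), projecting the source query onto
    /// the target table's schema and filling unspecified columns with their
    /// declared defaults, or NULL if none.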
    fn insert_to_plan(
        &self,
        table_name: ObjectName,
        columns: Vec<Ident>,
        source: Box<Query>,
        overwrite: bool,
        replace_into: bool,
    ) -> Result<LogicalPlan> {
        // Resolve the table reference and verify that the table exists
        let table_name = self.object_name_to_table_reference(table_name)?;
        let table_source = self.context_provider.get_table_source(table_name.clone())?;
        let table_schema = DFSchema::try_from(table_source.schema())?;

        // Determine the target fields and, for each column of the target table,
        // which source column supplies its value:
        // `value_indices[i] == Some(j)` means the i-th target column is taken
        // from the j-th output column of the source query; `None` means no
        // value was supplied and a default (or NULL) is used instead.
        let (fields, value_indices) = if columns.is_empty() {
            // No explicit column list: insert into all columns in table order
            (
                table_schema.fields().clone(),
                (0..table_schema.fields().len())
                    .map(Some)
                    .collect::<Vec<_>>(),
            )
        } else {
            let mut value_indices = vec![None; table_schema.fields().len()];
            let fields = columns
                .into_iter()
                .enumerate()
                .map(|(i, c)| {
                    let c = self.ident_normalizer.normalize(c);
                    let column_index = table_schema
                        .index_of_column_by_name(None, &c)
                        .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;

                    if value_indices[column_index].is_some() {
                        return schema_err!(SchemaError::DuplicateUnqualifiedField {
                            name: c,
                        });
                    } else {
                        value_indices[column_index] = Some(i);
                    }
                    Ok(Arc::clone(table_schema.field(column_index)))
                })
                .collect::<Result<Vec<_>>>()?;
            (Fields::from(fields), value_indices)
        };

        // Infer the types of any `$n` placeholders in a VALUES clause from the
        // corresponding target columns
        let mut prepare_param_data_types = BTreeMap::new();
        if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
            for row in rows.iter() {
                for (idx, val) in row.iter().enumerate() {
                    if let SQLExpr::Value(ValueWithSpan {
                        value: Value::Placeholder(name),
                        span: _,
                    }) = val
                    {
                        let name =
                            name.replace('$', "").parse::<usize>().map_err(|_| {
                                plan_datafusion_err!("Can't parse placeholder: {name}")
                            })? - 1;
                        let field = fields.get(idx).ok_or_else(|| {
                            plan_datafusion_err!(
                                "Placeholder ${} refers to a non existent column",
                                idx + 1
                            )
                        })?;
                        let _ = prepare_param_data_types.insert(name, Arc::clone(field));
                    }
                }
            }
        }
        let prepare_param_data_types = prepare_param_data_types.into_values().collect();

        // Plan the source query against the target schema
        let mut planner_context =
            PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
        planner_context.set_table_schema(Some(DFSchemaRef::new(
            DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
        )));
        let source = self.query_to_plan(*source, &mut planner_context)?;
        if fields.len() != source.schema().fields().len() {
            plan_err!("Column count doesn't match insert query!")?;
        }

        // Project the source so that every target column receives either the
        // matching source column or its default value, cast to the target type
        let exprs = value_indices
            .into_iter()
            .enumerate()
            .map(|(i, value_index)| {
                let target_field = table_schema.field(i);
                let expr = match value_index {
                    Some(v) => {
                        Expr::Column(Column::from(source.schema().qualified_field(v)))
                            .cast_to(target_field.data_type(), source.schema())?
                    }
                    None => table_source
                        .get_column_default(target_field.name())
                        .cloned()
                        .unwrap_or_else(|| {
                            // Columns without a declared default get NULL
                            Expr::Literal(ScalarValue::Null, None)
                        })
                        .cast_to(target_field.data_type(), &DFSchema::empty())?,
                };
                Ok(expr.alias(target_field.name()))
            })
            .collect::<Result<Vec<Expr>>>()?;
        let source = project(source, exprs)?;

        let insert_op = match (overwrite, replace_into) {
            (false, false) => InsertOp::Append,
            (true, false) => InsertOp::Overwrite,
            (false, true) => InsertOp::Replace,
            (true, true) => plan_err!(
                "Conflicting insert operations: `overwrite` and `replace_into` cannot both be true"
            )?,
        };

        let plan = LogicalPlan::Dml(DmlStatement::new(
            table_name,
            Arc::clone(&table_source),
            WriteOp::Insert(insert_op),
            Arc::new(source),
        ));
        Ok(plan)
    }

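    /// Rewrite `SHOW COLUMNS` into an equivalent query against
    /// `information_schema.columns` and plan that query.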
    fn show_columns_to_plan(
        &self,
        extended: bool,
        full: bool,
        sql_table_name: ObjectName,
    ) -> Result<LogicalPlan> {
        let where_clause = object_name_to_qualifier(
            &sql_table_name,
            self.options.enable_ident_normalization,
        )?;

        if !self.has_table("information_schema", "columns") {
            return plan_err!(
                "SHOW COLUMNS is not supported unless information_schema is enabled"
            );
        }

        // Do a table lookup to verify the table exists
        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
        let _ = self.context_provider.get_table_source(table_ref)?;

        // FULL / EXTENDED return every information_schema column
        let select_list = if full || extended {
            "*"
        } else {
            "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
        };

        let query = format!(
            "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
        );

        let mut rewrite = DFParser::parse_sql(&query)?;
        assert_eq!(rewrite.len(), 1);
        self.statement_to_plan(rewrite.pop_front().unwrap())
    }

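    /// Rewrite `SHOW FUNCTIONS` into a query that joins
    /// `information_schema.parameters` with `information_schema.routines`,
    /// aggregating IN parameters and matching them with the OUT (return) type.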
    fn show_functions_to_plan(
        &self,
        filter: Option<ShowStatementFilter>,
    ) -> Result<LogicalPlan> {
        let where_clause = if let Some(filter) = filter {
            match filter {
                ShowStatementFilter::Like(like) => {
                    format!("WHERE p.function_name like '{like}'")
                }
                _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
            }
        } else {
            "".to_string()
        };

        let query = format!(
            r#"
SELECT DISTINCT
    p.*,
    r.function_type function_type,
    r.description description,
    r.syntax_example syntax_example
FROM
    (
        SELECT
            i.specific_name function_name,
            o.data_type return_type,
            array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
            array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
        FROM (
            SELECT
                specific_catalog,
                specific_schema,
                specific_name,
                ordinal_position,
                parameter_name,
                data_type,
                rid
            FROM
                information_schema.parameters
            WHERE
                parameter_mode = 'IN'
        ) i
        JOIN
        (
            SELECT
                specific_catalog,
                specific_schema,
                specific_name,
                ordinal_position,
                parameter_name,
                data_type,
                rid
            FROM
                information_schema.parameters
            WHERE
                parameter_mode = 'OUT'
        ) o
        ON i.specific_catalog = o.specific_catalog
            AND i.specific_schema = o.specific_schema
            AND i.specific_name = o.specific_name
            AND i.rid = o.rid
        GROUP BY 1, 2, i.rid
    ) as p
JOIN information_schema.routines r
ON p.function_name = r.routine_name
{where_clause}
            "#
        );
        let mut rewrite = DFParser::parse_sql(&query)?;
        assert_eq!(rewrite.len(), 1);
        self.statement_to_plan(rewrite.pop_front().unwrap())
    }

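    /// Rewrite `SHOW CREATE TABLE` into a query against
    /// `information_schema.views`, returning the stored definition.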
    fn show_create_table_to_plan(
        &self,
        sql_table_name: ObjectName,
    ) -> Result<LogicalPlan> {
        if !self.has_table("information_schema", "tables") {
            return plan_err!(
                "SHOW CREATE TABLE is not supported unless information_schema is enabled"
            );
        }
        let where_clause = object_name_to_qualifier(
            &sql_table_name,
            self.options.enable_ident_normalization,
        )?;

        // Do a table lookup to verify the table exists
        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
        let _ = self.context_provider.get_table_source(table_ref)?;

        let query = format!(
            "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
        );

        let mut rewrite = DFParser::parse_sql(&query)?;
        assert_eq!(rewrite.len(), 1);
        self.statement_to_plan(rewrite.pop_front().unwrap())
    }

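    /// Return true if `schema.table` resolves to a table source in the current
    /// context (used above to check whether information_schema is enabled).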
    fn has_table(&self, schema: &str, table: &str) -> bool {
        let tables_reference = TableReference::Partial {
            schema: schema.into(),
            table: table.into(),
        };
        self.context_provider
            .get_table_source(tables_reference)
            .is_ok()
    }

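    /// Validate the transaction kind of a `BEGIN` statement; only plain
    /// `BEGIN` and `BEGIN TRANSACTION` are supported, `BEGIN WORK` is not.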
    fn validate_transaction_kind(
        &self,
        kind: Option<&BeginTransactionKind>,
    ) -> Result<()> {
        match kind {
            // BEGIN
            None => Ok(()),
            // BEGIN TRANSACTION
            Some(BeginTransactionKind::Transaction) => Ok(()),
            Some(BeginTransactionKind::Work) => {
                not_impl_err!("Transaction kind not supported: {kind:?}")
            }
        }
    }
}