datafusion_sql/
statement.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{BTreeMap, HashMap, HashSet};
19use std::path::Path;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use crate::parser::{
24    CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
25    LexOrdering, Statement as DFStatement,
26};
27use crate::planner::{
28    object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
29};
30use crate::utils::normalize_ident;
31
32use arrow::datatypes::{DataType, Fields};
33use datafusion_common::error::_plan_err;
34use datafusion_common::parsers::CompressionTypeVariant;
35use datafusion_common::{
36    exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
37    unqualified_field_not_found, Column, Constraint, Constraints, DFSchema, DFSchemaRef,
38    DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, TableReference,
39    ToDFSchema,
40};
41use datafusion_expr::dml::{CopyTo, InsertOp};
42use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
43use datafusion_expr::logical_plan::builder::project;
44use datafusion_expr::logical_plan::DdlStatement;
45use datafusion_expr::utils::expr_to_columns;
46use datafusion_expr::{
47    cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
48    CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
49    CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
50    DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
51    EmptyRelation, Execute, Explain, ExplainFormat, Expr, ExprSchemable, Filter,
52    LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, SetVariable,
53    SortExpr, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
54    TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
55    Volatility, WriteOp,
56};
57use sqlparser::ast::{
58    self, BeginTransactionKind, NullsDistinctOption, ShowStatementIn,
59    ShowStatementOptions, SqliteOnConflict, TableObject, UpdateTableFromKind,
60    ValueWithSpan,
61};
62use sqlparser::ast::{
63    Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
64    CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
65    ObjectName, ObjectType, OneOrManyWithParens, Query, SchemaName, SetExpr,
66    ShowCreateObject, ShowStatementFilter, Statement, TableConstraint, TableFactor,
67    TableWithJoins, TransactionMode, UnaryOperator, Value,
68};
69use sqlparser::parser::ParserError::ParserError;
70
71fn ident_to_string(ident: &Ident) -> String {
72    normalize_ident(ident.to_owned())
73}
74
75fn object_name_to_string(object_name: &ObjectName) -> String {
76    object_name
77        .0
78        .iter()
79        .map(|object_name_part| {
80            object_name_part
81                .as_ident()
82                // TODO: It might be better to return an error
83                // than to silently use a default value.
84                .map_or_else(String::new, ident_to_string)
85        })
86        .collect::<Vec<String>>()
87        .join(".")
88}
89
90fn get_schema_name(schema_name: &SchemaName) -> String {
91    match schema_name {
92        SchemaName::Simple(schema_name) => object_name_to_string(schema_name),
93        SchemaName::UnnamedAuthorization(auth) => ident_to_string(auth),
94        SchemaName::NamedAuthorization(schema_name, auth) => format!(
95            "{}.{}",
96            object_name_to_string(schema_name),
97            ident_to_string(auth)
98        ),
99    }
100}
101
102/// Construct `TableConstraint`(s) for the given columns by iterating over
103/// `columns` and extracting individual inline constraint definitions.
104fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
105    let mut constraints = vec![];
106    for column in columns {
107        for ast::ColumnOptionDef { name, option } in &column.options {
108            match option {
109                ast::ColumnOption::Unique {
110                    is_primary: false,
111                    characteristics,
112                } => constraints.push(TableConstraint::Unique {
113                    name: name.clone(),
114                    columns: vec![column.name.clone()],
115                    characteristics: *characteristics,
116                    index_name: None,
117                    index_type_display: ast::KeyOrIndexDisplay::None,
118                    index_type: None,
119                    index_options: vec![],
120                    nulls_distinct: NullsDistinctOption::None,
121                }),
122                ast::ColumnOption::Unique {
123                    is_primary: true,
124                    characteristics,
125                } => constraints.push(TableConstraint::PrimaryKey {
126                    name: name.clone(),
127                    columns: vec![column.name.clone()],
128                    characteristics: *characteristics,
129                    index_name: None,
130                    index_type: None,
131                    index_options: vec![],
132                }),
133                ast::ColumnOption::ForeignKey {
134                    foreign_table,
135                    referred_columns,
136                    on_delete,
137                    on_update,
138                    characteristics,
139                } => constraints.push(TableConstraint::ForeignKey {
140                    name: name.clone(),
141                    columns: vec![],
142                    foreign_table: foreign_table.clone(),
143                    referred_columns: referred_columns.to_vec(),
144                    on_delete: *on_delete,
145                    on_update: *on_update,
146                    characteristics: *characteristics,
147                }),
148                ast::ColumnOption::Check(expr) => {
149                    constraints.push(TableConstraint::Check {
150                        name: name.clone(),
151                        expr: Box::new(expr.clone()),
152                    })
153                }
154                // Other options are not constraint related.
155                ast::ColumnOption::Default(_)
156                | ast::ColumnOption::Null
157                | ast::ColumnOption::NotNull
158                | ast::ColumnOption::DialectSpecific(_)
159                | ast::ColumnOption::CharacterSet(_)
160                | ast::ColumnOption::Generated { .. }
161                | ast::ColumnOption::Comment(_)
162                | ast::ColumnOption::Options(_)
163                | ast::ColumnOption::OnUpdate(_)
164                | ast::ColumnOption::Materialized(_)
165                | ast::ColumnOption::Ephemeral(_)
166                | ast::ColumnOption::Identity(_)
167                | ast::ColumnOption::OnConflict(_)
168                | ast::ColumnOption::Policy(_)
169                | ast::ColumnOption::Tags(_)
170                | ast::ColumnOption::Alias(_)
171                | ast::ColumnOption::Collation(_) => {}
172            }
173        }
174    }
175    constraints
176}
177
178impl<S: ContextProvider> SqlToRel<'_, S> {
179    /// Generate a logical plan from an DataFusion SQL statement
180    pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
181        match statement {
182            DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
183            DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
184            DFStatement::CopyTo(s) => self.copy_to_plan(s),
185            DFStatement::Explain(ExplainStatement {
186                verbose,
187                analyze,
188                format,
189                statement,
190            }) => self.explain_to_plan(verbose, analyze, format, *statement),
191        }
192    }
193
194    /// Generate a logical plan from an SQL statement
195    pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
196        self.sql_statement_to_plan_with_context_impl(
197            statement,
198            &mut PlannerContext::new(),
199        )
200    }
201
202    /// Generate a logical plan from an SQL statement
203    pub fn sql_statement_to_plan_with_context(
204        &self,
205        statement: Statement,
206        planner_context: &mut PlannerContext,
207    ) -> Result<LogicalPlan> {
208        self.sql_statement_to_plan_with_context_impl(statement, planner_context)
209    }
210
211    fn sql_statement_to_plan_with_context_impl(
212        &self,
213        statement: Statement,
214        planner_context: &mut PlannerContext,
215    ) -> Result<LogicalPlan> {
216        match statement {
217            Statement::ExplainTable {
218                describe_alias: DescribeAlias::Describe, // only parse 'DESCRIBE table_name' and not 'EXPLAIN table_name'
219                table_name,
220                ..
221            } => self.describe_table_to_plan(table_name),
222            Statement::Explain {
223                verbose,
224                statement,
225                analyze,
226                format,
227                describe_alias: _,
228                ..
229            } => {
230                let format = format.map(|format| format.to_string());
231                let statement = DFStatement::Statement(statement);
232                self.explain_to_plan(verbose, analyze, format, statement)
233            }
234            Statement::Query(query) => self.query_to_plan(*query, planner_context),
235            Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
236            Statement::SetVariable {
237                local,
238                hivevar,
239                variables,
240                value,
241            } => self.set_variable_to_plan(local, hivevar, &variables, value),
242
243            Statement::CreateTable(CreateTable {
244                temporary,
245                external,
246                global,
247                transient,
248                volatile,
249                hive_distribution,
250                hive_formats,
251                file_format,
252                location,
253                query,
254                name,
255                columns,
256                constraints,
257                table_properties,
258                with_options,
259                if_not_exists,
260                or_replace,
261                without_rowid,
262                like,
263                clone,
264                engine,
265                comment,
266                auto_increment_offset,
267                default_charset,
268                collation,
269                on_commit,
270                on_cluster,
271                primary_key,
272                order_by,
273                partition_by,
274                cluster_by,
275                clustered_by,
276                options,
277                strict,
278                copy_grants,
279                enable_schema_evolution,
280                change_tracking,
281                data_retention_time_in_days,
282                max_data_extension_time_in_days,
283                default_ddl_collation,
284                with_aggregation_policy,
285                with_row_access_policy,
286                with_tags,
287                iceberg,
288                external_volume,
289                base_location,
290                catalog,
291                catalog_sync,
292                storage_serialization_policy,
293            }) if table_properties.is_empty() && with_options.is_empty() => {
294                if temporary {
295                    return not_impl_err!("Temporary tables not supported")?;
296                }
297                if external {
298                    return not_impl_err!("External tables not supported")?;
299                }
300                if global.is_some() {
301                    return not_impl_err!("Global tables not supported")?;
302                }
303                if transient {
304                    return not_impl_err!("Transient tables not supported")?;
305                }
306                if volatile {
307                    return not_impl_err!("Volatile tables not supported")?;
308                }
309                if hive_distribution != ast::HiveDistributionStyle::NONE {
310                    return not_impl_err!(
311                        "Hive distribution not supported: {hive_distribution:?}"
312                    )?;
313                }
314                if !matches!(
315                    hive_formats,
316                    Some(ast::HiveFormat {
317                        row_format: None,
318                        serde_properties: None,
319                        storage: None,
320                        location: None,
321                    })
322                ) {
323                    return not_impl_err!(
324                        "Hive formats not supported: {hive_formats:?}"
325                    )?;
326                }
327                if file_format.is_some() {
328                    return not_impl_err!("File format not supported")?;
329                }
330                if location.is_some() {
331                    return not_impl_err!("Location not supported")?;
332                }
333                if without_rowid {
334                    return not_impl_err!("Without rowid not supported")?;
335                }
336                if like.is_some() {
337                    return not_impl_err!("Like not supported")?;
338                }
339                if clone.is_some() {
340                    return not_impl_err!("Clone not supported")?;
341                }
342                if engine.is_some() {
343                    return not_impl_err!("Engine not supported")?;
344                }
345                if comment.is_some() {
346                    return not_impl_err!("Comment not supported")?;
347                }
348                if auto_increment_offset.is_some() {
349                    return not_impl_err!("Auto increment offset not supported")?;
350                }
351                if default_charset.is_some() {
352                    return not_impl_err!("Default charset not supported")?;
353                }
354                if collation.is_some() {
355                    return not_impl_err!("Collation not supported")?;
356                }
357                if on_commit.is_some() {
358                    return not_impl_err!("On commit not supported")?;
359                }
360                if on_cluster.is_some() {
361                    return not_impl_err!("On cluster not supported")?;
362                }
363                if primary_key.is_some() {
364                    return not_impl_err!("Primary key not supported")?;
365                }
366                if order_by.is_some() {
367                    return not_impl_err!("Order by not supported")?;
368                }
369                if partition_by.is_some() {
370                    return not_impl_err!("Partition by not supported")?;
371                }
372                if cluster_by.is_some() {
373                    return not_impl_err!("Cluster by not supported")?;
374                }
375                if clustered_by.is_some() {
376                    return not_impl_err!("Clustered by not supported")?;
377                }
378                if options.is_some() {
379                    return not_impl_err!("Options not supported")?;
380                }
381                if strict {
382                    return not_impl_err!("Strict not supported")?;
383                }
384                if copy_grants {
385                    return not_impl_err!("Copy grants not supported")?;
386                }
387                if enable_schema_evolution.is_some() {
388                    return not_impl_err!("Enable schema evolution not supported")?;
389                }
390                if change_tracking.is_some() {
391                    return not_impl_err!("Change tracking not supported")?;
392                }
393                if data_retention_time_in_days.is_some() {
394                    return not_impl_err!("Data retention time in days not supported")?;
395                }
396                if max_data_extension_time_in_days.is_some() {
397                    return not_impl_err!(
398                        "Max data extension time in days not supported"
399                    )?;
400                }
401                if default_ddl_collation.is_some() {
402                    return not_impl_err!("Default DDL collation not supported")?;
403                }
404                if with_aggregation_policy.is_some() {
405                    return not_impl_err!("With aggregation policy not supported")?;
406                }
407                if with_row_access_policy.is_some() {
408                    return not_impl_err!("With row access policy not supported")?;
409                }
410                if with_tags.is_some() {
411                    return not_impl_err!("With tags not supported")?;
412                }
413                if iceberg {
414                    return not_impl_err!("Iceberg not supported")?;
415                }
416                if external_volume.is_some() {
417                    return not_impl_err!("External volume not supported")?;
418                }
419                if base_location.is_some() {
420                    return not_impl_err!("Base location not supported")?;
421                }
422                if catalog.is_some() {
423                    return not_impl_err!("Catalog not supported")?;
424                }
425                if catalog_sync.is_some() {
426                    return not_impl_err!("Catalog sync not supported")?;
427                }
428                if storage_serialization_policy.is_some() {
429                    return not_impl_err!("Storage serialization policy not supported")?;
430                }
431
432                // Merge inline constraints and existing constraints
433                let mut all_constraints = constraints;
434                let inline_constraints = calc_inline_constraints_from_columns(&columns);
435                all_constraints.extend(inline_constraints);
436                // Build column default values
437                let column_defaults =
438                    self.build_column_defaults(&columns, planner_context)?;
439
440                let has_columns = !columns.is_empty();
441                let schema = self.build_schema(columns)?.to_dfschema_ref()?;
442                if has_columns {
443                    planner_context.set_table_schema(Some(Arc::clone(&schema)));
444                }
445
446                match query {
447                    Some(query) => {
448                        let plan = self.query_to_plan(*query, planner_context)?;
449                        let input_schema = plan.schema();
450
451                        let plan = if has_columns {
452                            if schema.fields().len() != input_schema.fields().len() {
453                                return plan_err!(
454                            "Mismatch: {} columns specified, but result has {} columns",
455                            schema.fields().len(),
456                            input_schema.fields().len()
457                        );
458                            }
459                            let input_fields = input_schema.fields();
460                            let project_exprs = schema
461                                .fields()
462                                .iter()
463                                .zip(input_fields)
464                                .map(|(field, input_field)| {
465                                    cast(
466                                        col(input_field.name()),
467                                        field.data_type().clone(),
468                                    )
469                                    .alias(field.name())
470                                })
471                                .collect::<Vec<_>>();
472
473                            LogicalPlanBuilder::from(plan.clone())
474                                .project(project_exprs)?
475                                .build()?
476                        } else {
477                            plan
478                        };
479
480                        let constraints = self.new_constraint_from_table_constraints(
481                            &all_constraints,
482                            plan.schema(),
483                        )?;
484
485                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
486                            CreateMemoryTable {
487                                name: self.object_name_to_table_reference(name)?,
488                                constraints,
489                                input: Arc::new(plan),
490                                if_not_exists,
491                                or_replace,
492                                column_defaults,
493                                temporary,
494                            },
495                        )))
496                    }
497
498                    None => {
499                        let plan = EmptyRelation {
500                            produce_one_row: false,
501                            schema,
502                        };
503                        let plan = LogicalPlan::EmptyRelation(plan);
504                        let constraints = self.new_constraint_from_table_constraints(
505                            &all_constraints,
506                            plan.schema(),
507                        )?;
508                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
509                            CreateMemoryTable {
510                                name: self.object_name_to_table_reference(name)?,
511                                constraints,
512                                input: Arc::new(plan),
513                                if_not_exists,
514                                or_replace,
515                                column_defaults,
516                                temporary,
517                            },
518                        )))
519                    }
520                }
521            }
522
523            Statement::CreateView {
524                or_replace,
525                materialized,
526                name,
527                columns,
528                query,
529                options: CreateTableOptions::None,
530                cluster_by,
531                comment,
532                with_no_schema_binding,
533                if_not_exists,
534                temporary,
535                to,
536                params,
537            } => {
538                if materialized {
539                    return not_impl_err!("Materialized views not supported")?;
540                }
541                if !cluster_by.is_empty() {
542                    return not_impl_err!("Cluster by not supported")?;
543                }
544                if comment.is_some() {
545                    return not_impl_err!("Comment not supported")?;
546                }
547                if with_no_schema_binding {
548                    return not_impl_err!("With no schema binding not supported")?;
549                }
550                if if_not_exists {
551                    return not_impl_err!("If not exists not supported")?;
552                }
553                if to.is_some() {
554                    return not_impl_err!("To not supported")?;
555                }
556
557                // put the statement back together temporarily to get the SQL
558                // string representation
559                let stmt = Statement::CreateView {
560                    or_replace,
561                    materialized,
562                    name,
563                    columns,
564                    query,
565                    options: CreateTableOptions::None,
566                    cluster_by,
567                    comment,
568                    with_no_schema_binding,
569                    if_not_exists,
570                    temporary,
571                    to,
572                    params,
573                };
574                let sql = stmt.to_string();
575                let Statement::CreateView {
576                    name,
577                    columns,
578                    query,
579                    or_replace,
580                    temporary,
581                    ..
582                } = stmt
583                else {
584                    return internal_err!("Unreachable code in create view");
585                };
586
587                let columns = columns
588                    .into_iter()
589                    .map(|view_column_def| {
590                        if let Some(options) = view_column_def.options {
591                            plan_err!(
592                                "Options not supported for view columns: {options:?}"
593                            )
594                        } else {
595                            Ok(view_column_def.name)
596                        }
597                    })
598                    .collect::<Result<Vec<_>>>()?;
599
600                let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
601                plan = self.apply_expr_alias(plan, columns)?;
602
603                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
604                    name: self.object_name_to_table_reference(name)?,
605                    input: Arc::new(plan),
606                    or_replace,
607                    definition: Some(sql),
608                    temporary,
609                })))
610            }
611            Statement::ShowCreate { obj_type, obj_name } => match obj_type {
612                ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
613                _ => {
614                    not_impl_err!("Only `SHOW CREATE TABLE  ...` statement is supported")
615                }
616            },
617            Statement::CreateSchema {
618                schema_name,
619                if_not_exists,
620            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
621                CreateCatalogSchema {
622                    schema_name: get_schema_name(&schema_name),
623                    if_not_exists,
624                    schema: Arc::new(DFSchema::empty()),
625                },
626            ))),
627            Statement::CreateDatabase {
628                db_name,
629                if_not_exists,
630                ..
631            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
632                CreateCatalog {
633                    catalog_name: object_name_to_string(&db_name),
634                    if_not_exists,
635                    schema: Arc::new(DFSchema::empty()),
636                },
637            ))),
638            Statement::Drop {
639                object_type,
640                if_exists,
641                mut names,
642                cascade,
643                restrict: _,
644                purge: _,
645                temporary: _,
646            } => {
647                // We don't support cascade and purge for now.
648                // nor do we support multiple object names
649                let name = match names.len() {
650                    0 => Err(ParserError("Missing table name.".to_string()).into()),
651                    1 => self.object_name_to_table_reference(names.pop().unwrap()),
652                    _ => {
653                        Err(ParserError("Multiple objects not supported".to_string())
654                            .into())
655                    }
656                }?;
657
658                match object_type {
659                    ObjectType::Table => {
660                        Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
661                            name,
662                            if_exists,
663                            schema: DFSchemaRef::new(DFSchema::empty()),
664                        })))
665                    }
666                    ObjectType::View => {
667                        Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
668                            name,
669                            if_exists,
670                            schema: DFSchemaRef::new(DFSchema::empty()),
671                        })))
672                    }
673                    ObjectType::Schema => {
674                        let name = match name {
675                            TableReference::Bare { table } => Ok(SchemaReference::Bare { schema: table }),
676                            TableReference::Partial { schema, table } => Ok(SchemaReference::Full { schema: table, catalog: schema }),
677                            TableReference::Full { catalog: _, schema: _, table: _ } => {
678                                Err(ParserError("Invalid schema specifier (has 3 parts)".to_string()))
679                            }
680                        }?;
681                        Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(DropCatalogSchema {
682                            name,
683                            if_exists,
684                            cascade,
685                            schema: DFSchemaRef::new(DFSchema::empty()),
686                        })))
687                    }
688                    _ => not_impl_err!(
689                        "Only `DROP TABLE/VIEW/SCHEMA  ...` statement is supported currently"
690                    ),
691                }
692            }
693            Statement::Prepare {
694                name,
695                data_types,
696                statement,
697            } => {
698                // Convert parser data types to DataFusion data types
699                let data_types: Vec<DataType> = data_types
700                    .into_iter()
701                    .map(|t| self.convert_data_type(&t))
702                    .collect::<Result<_>>()?;
703
704                // Create planner context with parameters
705                let mut planner_context = PlannerContext::new()
706                    .with_prepare_param_data_types(data_types.clone());
707
708                // Build logical plan for inner statement of the prepare statement
709                let plan = self.sql_statement_to_plan_with_context_impl(
710                    *statement,
711                    &mut planner_context,
712                )?;
713                Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
714                    name: ident_to_string(&name),
715                    data_types,
716                    input: Arc::new(plan),
717                })))
718            }
719            Statement::Execute {
720                name,
721                parameters,
722                using,
723                // has_parentheses specifies the syntax, but the plan is the
724                // same no matter the syntax used, so ignore it
725                has_parentheses: _,
726                immediate,
727                into,
728            } => {
729                // `USING` is a MySQL-specific syntax and currently not supported.
730                if !using.is_empty() {
731                    return not_impl_err!(
732                        "Execute statement with USING is not supported"
733                    );
734                }
735                if immediate {
736                    return not_impl_err!(
737                        "Execute statement with IMMEDIATE is not supported"
738                    );
739                }
740                if !into.is_empty() {
741                    return not_impl_err!("Execute statement with INTO is not supported");
742                }
743                let empty_schema = DFSchema::empty();
744                let parameters = parameters
745                    .into_iter()
746                    .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
747                    .collect::<Result<Vec<Expr>>>()?;
748
749                Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
750                    name: object_name_to_string(&name.unwrap()),
751                    parameters,
752                })))
753            }
754            Statement::Deallocate {
755                name,
756                // Similar to PostgreSQL, the PREPARE keyword is ignored
757                prepare: _,
758            } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
759                Deallocate {
760                    name: ident_to_string(&name),
761                },
762            ))),
763
764            Statement::ShowTables {
765                extended,
766                full,
767                terse,
768                history,
769                external,
770                show_options,
771            } => {
772                // We only support the basic "SHOW TABLES"
773                // https://github.com/apache/datafusion/issues/3188
774                if extended {
775                    return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
776                }
777                if full {
778                    return not_impl_err!("SHOW FULL TABLES not supported")?;
779                }
780                if terse {
781                    return not_impl_err!("SHOW TERSE TABLES not supported")?;
782                }
783                if history {
784                    return not_impl_err!("SHOW TABLES HISTORY not supported")?;
785                }
786                if external {
787                    return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
788                }
789                let ShowStatementOptions {
790                    show_in,
791                    starts_with,
792                    limit,
793                    limit_from,
794                    filter_position,
795                } = show_options;
796                if show_in.is_some() {
797                    return not_impl_err!("SHOW TABLES IN not supported")?;
798                }
799                if starts_with.is_some() {
800                    return not_impl_err!("SHOW TABLES LIKE not supported")?;
801                }
802                if limit.is_some() {
803                    return not_impl_err!("SHOW TABLES LIMIT not supported")?;
804                }
805                if limit_from.is_some() {
806                    return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
807                }
808                if filter_position.is_some() {
809                    return not_impl_err!("SHOW TABLES FILTER not supported")?;
810                }
811                self.show_tables_to_plan()
812            }
813
814            Statement::ShowColumns {
815                extended,
816                full,
817                show_options,
818            } => {
819                let ShowStatementOptions {
820                    show_in,
821                    starts_with,
822                    limit,
823                    limit_from,
824                    filter_position,
825                } = show_options;
826                if starts_with.is_some() {
827                    return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
828                }
829                if limit.is_some() {
830                    return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
831                }
832                if limit_from.is_some() {
833                    return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
834                }
835                if filter_position.is_some() {
836                    return not_impl_err!(
837                        "SHOW COLUMNS with WHERE or LIKE is not supported"
838                    )?;
839                }
840                let Some(ShowStatementIn {
841                    // specifies if the syntax was `SHOW COLUMNS IN` or `SHOW
842                    // COLUMNS FROM` which is not different in DataFusion
843                    clause: _,
844                    parent_type,
845                    parent_name,
846                }) = show_in
847                else {
848                    return plan_err!("SHOW COLUMNS requires a table name");
849                };
850
851                if let Some(parent_type) = parent_type {
852                    return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
853                }
854                let Some(table_name) = parent_name else {
855                    return plan_err!("SHOW COLUMNS requires a table name");
856                };
857
858                self.show_columns_to_plan(extended, full, table_name)
859            }
860
861            Statement::ShowFunctions { filter, .. } => {
862                self.show_functions_to_plan(filter)
863            }
864
865            Statement::Insert(Insert {
866                or,
867                into,
868                columns,
869                overwrite,
870                source,
871                partitioned,
872                after_columns,
873                table,
874                on,
875                returning,
876                ignore,
877                table_alias,
878                mut replace_into,
879                priority,
880                insert_alias,
881                assignments,
882                has_table_keyword,
883                settings,
884                format_clause,
885            }) => {
886                let table_name = match table {
887                    TableObject::TableName(table_name) => table_name,
888                    TableObject::TableFunction(_) => {
889                        return not_impl_err!("INSERT INTO Table functions not supported")
890                    }
891                };
892                if let Some(or) = or {
893                    match or {
894                        SqliteOnConflict::Replace => replace_into = true,
895                        _ => plan_err!("Inserts with {or} clause is not supported")?,
896                    }
897                }
898                if partitioned.is_some() {
899                    plan_err!("Partitioned inserts not yet supported")?;
900                }
901                if !after_columns.is_empty() {
902                    plan_err!("After-columns clause not supported")?;
903                }
904                if on.is_some() {
905                    plan_err!("Insert-on clause not supported")?;
906                }
907                if returning.is_some() {
908                    plan_err!("Insert-returning clause not supported")?;
909                }
910                if ignore {
911                    plan_err!("Insert-ignore clause not supported")?;
912                }
913                let Some(source) = source else {
914                    plan_err!("Inserts without a source not supported")?
915                };
916                if let Some(table_alias) = table_alias {
917                    plan_err!(
918                        "Inserts with a table alias not supported: {table_alias:?}"
919                    )?
920                };
921                if let Some(priority) = priority {
922                    plan_err!(
923                        "Inserts with a `PRIORITY` clause not supported: {priority:?}"
924                    )?
925                };
926                if insert_alias.is_some() {
927                    plan_err!("Inserts with an alias not supported")?;
928                }
929                if !assignments.is_empty() {
930                    plan_err!("Inserts with assignments not supported")?;
931                }
932                if settings.is_some() {
933                    plan_err!("Inserts with settings not supported")?;
934                }
935                if format_clause.is_some() {
936                    plan_err!("Inserts with format clause not supported")?;
937                }
938                // optional keywords don't change behavior
939                let _ = into;
940                let _ = has_table_keyword;
941                self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
942            }
943            Statement::Update {
944                table,
945                assignments,
946                from,
947                selection,
948                returning,
949                or,
950            } => {
951                let froms =
952                    from.map(|update_table_from_kind| match update_table_from_kind {
953                        UpdateTableFromKind::BeforeSet(froms) => froms,
954                        UpdateTableFromKind::AfterSet(froms) => froms,
955                    });
956                // TODO: support multiple tables in UPDATE SET FROM
957                if froms.as_ref().is_some_and(|f| f.len() > 1) {
958                    plan_err!("Multiple tables in UPDATE SET FROM not yet supported")?;
959                }
960                let update_from = froms.and_then(|mut f| f.pop());
961                if returning.is_some() {
962                    plan_err!("Update-returning clause not yet supported")?;
963                }
964                if or.is_some() {
965                    plan_err!("ON conflict not supported")?;
966                }
967                self.update_to_plan(table, assignments, update_from, selection)
968            }
969
970            Statement::Delete(Delete {
971                tables,
972                using,
973                selection,
974                returning,
975                from,
976                order_by,
977                limit,
978            }) => {
979                if !tables.is_empty() {
980                    plan_err!("DELETE <TABLE> not supported")?;
981                }
982
983                if using.is_some() {
984                    plan_err!("Using clause not supported")?;
985                }
986
987                if returning.is_some() {
988                    plan_err!("Delete-returning clause not yet supported")?;
989                }
990
991                if !order_by.is_empty() {
992                    plan_err!("Delete-order-by clause not yet supported")?;
993                }
994
995                if limit.is_some() {
996                    plan_err!("Delete-limit clause not yet supported")?;
997                }
998
999                let table_name = self.get_delete_target(from)?;
1000                self.delete_to_plan(table_name, selection)
1001            }
1002
1003            Statement::StartTransaction {
1004                modes,
1005                begin: false,
1006                modifier,
1007                transaction,
1008                statements,
1009                exception_statements,
1010                has_end_keyword,
1011            } => {
1012                if let Some(modifier) = modifier {
1013                    return not_impl_err!(
1014                        "Transaction modifier not supported: {modifier}"
1015                    );
1016                }
1017                if !statements.is_empty() {
1018                    return not_impl_err!(
1019                        "Transaction with multiple statements not supported"
1020                    );
1021                }
1022                if exception_statements.is_some() {
1023                    return not_impl_err!(
1024                        "Transaction with exception statements not supported"
1025                    );
1026                }
1027                if has_end_keyword {
1028                    return not_impl_err!("Transaction with END keyword not supported");
1029                }
1030                self.validate_transaction_kind(transaction)?;
1031                let isolation_level: ast::TransactionIsolationLevel = modes
1032                    .iter()
1033                    .filter_map(|m: &TransactionMode| match m {
1034                        TransactionMode::AccessMode(_) => None,
1035                        TransactionMode::IsolationLevel(level) => Some(level),
1036                    })
1037                    .next_back()
1038                    .copied()
1039                    .unwrap_or(ast::TransactionIsolationLevel::Serializable);
1040                let access_mode: ast::TransactionAccessMode = modes
1041                    .iter()
1042                    .filter_map(|m: &TransactionMode| match m {
1043                        TransactionMode::AccessMode(mode) => Some(mode),
1044                        TransactionMode::IsolationLevel(_) => None,
1045                    })
1046                    .next_back()
1047                    .copied()
1048                    .unwrap_or(ast::TransactionAccessMode::ReadWrite);
1049                let isolation_level = match isolation_level {
1050                    ast::TransactionIsolationLevel::ReadUncommitted => {
1051                        TransactionIsolationLevel::ReadUncommitted
1052                    }
1053                    ast::TransactionIsolationLevel::ReadCommitted => {
1054                        TransactionIsolationLevel::ReadCommitted
1055                    }
1056                    ast::TransactionIsolationLevel::RepeatableRead => {
1057                        TransactionIsolationLevel::RepeatableRead
1058                    }
1059                    ast::TransactionIsolationLevel::Serializable => {
1060                        TransactionIsolationLevel::Serializable
1061                    }
1062                    ast::TransactionIsolationLevel::Snapshot => {
1063                        TransactionIsolationLevel::Snapshot
1064                    }
1065                };
1066                let access_mode = match access_mode {
1067                    ast::TransactionAccessMode::ReadOnly => {
1068                        TransactionAccessMode::ReadOnly
1069                    }
1070                    ast::TransactionAccessMode::ReadWrite => {
1071                        TransactionAccessMode::ReadWrite
1072                    }
1073                };
1074                let statement = PlanStatement::TransactionStart(TransactionStart {
1075                    access_mode,
1076                    isolation_level,
1077                });
1078                Ok(LogicalPlan::Statement(statement))
1079            }
1080            Statement::Commit {
1081                chain,
1082                end,
1083                modifier,
1084            } => {
1085                if end {
1086                    return not_impl_err!("COMMIT AND END not supported");
1087                };
1088                if let Some(modifier) = modifier {
1089                    return not_impl_err!("COMMIT {modifier} not supported");
1090                };
1091                let statement = PlanStatement::TransactionEnd(TransactionEnd {
1092                    conclusion: TransactionConclusion::Commit,
1093                    chain,
1094                });
1095                Ok(LogicalPlan::Statement(statement))
1096            }
1097            Statement::Rollback { chain, savepoint } => {
1098                if savepoint.is_some() {
1099                    plan_err!("Savepoints not supported")?;
1100                }
1101                let statement = PlanStatement::TransactionEnd(TransactionEnd {
1102                    conclusion: TransactionConclusion::Rollback,
1103                    chain,
1104                });
1105                Ok(LogicalPlan::Statement(statement))
1106            }
1107            Statement::CreateFunction(ast::CreateFunction {
1108                or_replace,
1109                temporary,
1110                name,
1111                args,
1112                return_type,
1113                function_body,
1114                behavior,
1115                language,
1116                ..
1117            }) => {
1118                let return_type = match return_type {
1119                    Some(t) => Some(self.convert_data_type(&t)?),
1120                    None => None,
1121                };
1122                let mut planner_context = PlannerContext::new();
1123                let empty_schema = &DFSchema::empty();
1124
1125                let args = match args {
1126                    Some(function_args) => {
1127                        let function_args = function_args
1128                            .into_iter()
1129                            .map(|arg| {
1130                                let data_type = self.convert_data_type(&arg.data_type)?;
1131
1132                                let default_expr = match arg.default_expr {
1133                                    Some(expr) => Some(self.sql_to_expr(
1134                                        expr,
1135                                        empty_schema,
1136                                        &mut planner_context,
1137                                    )?),
1138                                    None => None,
1139                                };
1140                                Ok(OperateFunctionArg {
1141                                    name: arg.name,
1142                                    default_expr,
1143                                    data_type,
1144                                })
1145                            })
1146                            .collect::<Result<Vec<OperateFunctionArg>>>();
1147                        Some(function_args?)
1148                    }
1149                    None => None,
1150                };
1151                // At the moment functions can't be qualified `schema.name`
1152                let name = match &name.0[..] {
1153                    [] => exec_err!("Function should have name")?,
1154                    [n] => n.as_ident().unwrap().value.clone(),
1155                    [..] => not_impl_err!("Qualified functions are not supported")?,
1156                };
1157                //
1158                // Convert resulting expression to data fusion expression
1159                //
1160                let arg_types = args.as_ref().map(|arg| {
1161                    arg.iter().map(|t| t.data_type.clone()).collect::<Vec<_>>()
1162                });
1163                let mut planner_context = PlannerContext::new()
1164                    .with_prepare_param_data_types(arg_types.unwrap_or_default());
1165
1166                let function_body = match function_body {
1167                    Some(r) => Some(self.sql_to_expr(
1168                        match r {
1169                            ast::CreateFunctionBody::AsBeforeOptions(expr) => expr,
1170                            ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
1171                            ast::CreateFunctionBody::Return(expr) => expr,
1172                        },
1173                        &DFSchema::empty(),
1174                        &mut planner_context,
1175                    )?),
1176                    None => None,
1177                };
1178
1179                let params = CreateFunctionBody {
1180                    language,
1181                    behavior: behavior.map(|b| match b {
1182                        ast::FunctionBehavior::Immutable => Volatility::Immutable,
1183                        ast::FunctionBehavior::Stable => Volatility::Stable,
1184                        ast::FunctionBehavior::Volatile => Volatility::Volatile,
1185                    }),
1186                    function_body,
1187                };
1188
1189                let statement = DdlStatement::CreateFunction(CreateFunction {
1190                    or_replace,
1191                    temporary,
1192                    name,
1193                    return_type,
1194                    args,
1195                    params,
1196                    schema: DFSchemaRef::new(DFSchema::empty()),
1197                });
1198
1199                Ok(LogicalPlan::Ddl(statement))
1200            }
1201            Statement::DropFunction {
1202                if_exists,
1203                func_desc,
1204                ..
1205            } => {
1206                // According to postgresql documentation it can be only one function
1207                // specified in drop statement
1208                if let Some(desc) = func_desc.first() {
1209                    // At the moment functions can't be qualified `schema.name`
1210                    let name = match &desc.name.0[..] {
1211                        [] => exec_err!("Function should have name")?,
1212                        [n] => n.as_ident().unwrap().value.clone(),
1213                        [..] => not_impl_err!("Qualified functions are not supported")?,
1214                    };
1215                    let statement = DdlStatement::DropFunction(DropFunction {
1216                        if_exists,
1217                        name,
1218                        schema: DFSchemaRef::new(DFSchema::empty()),
1219                    });
1220                    Ok(LogicalPlan::Ddl(statement))
1221                } else {
1222                    exec_err!("Function name not provided")
1223                }
1224            }
1225            Statement::CreateIndex(CreateIndex {
1226                name,
1227                table_name,
1228                using,
1229                columns,
1230                unique,
1231                if_not_exists,
1232                ..
1233            }) => {
1234                let name: Option<String> = name.as_ref().map(object_name_to_string);
1235                let table = self.object_name_to_table_reference(table_name)?;
1236                let table_schema = self
1237                    .context_provider
1238                    .get_table_source(table.clone())?
1239                    .schema()
1240                    .to_dfschema_ref()?;
1241                let using: Option<String> = using.as_ref().map(ident_to_string);
1242                let columns = self.order_by_to_sort_expr(
1243                    columns,
1244                    &table_schema,
1245                    planner_context,
1246                    false,
1247                    None,
1248                )?;
1249                Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
1250                    PlanCreateIndex {
1251                        name,
1252                        table,
1253                        using,
1254                        columns,
1255                        unique,
1256                        if_not_exists,
1257                        schema: DFSchemaRef::new(DFSchema::empty()),
1258                    },
1259                )))
1260            }
1261            stmt => {
1262                not_impl_err!("Unsupported SQL statement: {stmt}")
1263            }
1264        }
1265    }
1266
1267    fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
1268        let mut from = match from {
1269            FromTable::WithFromKeyword(v) => v,
1270            FromTable::WithoutKeyword(v) => v,
1271        };
1272
1273        if from.len() != 1 {
1274            return not_impl_err!(
1275                "DELETE FROM only supports single table, got {}: {from:?}",
1276                from.len()
1277            );
1278        }
1279        let table_factor = from.pop().unwrap();
1280        if !table_factor.joins.is_empty() {
1281            return not_impl_err!("DELETE FROM only supports single table, got: joins");
1282        }
1283        let TableFactor::Table { name, .. } = table_factor.relation else {
1284            return not_impl_err!(
1285                "DELETE FROM only supports single table, got: {table_factor:?}"
1286            );
1287        };
1288
1289        Ok(name)
1290    }
1291
1292    /// Generate a logical plan from a "SHOW TABLES" query
1293    fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
1294        if self.has_table("information_schema", "tables") {
1295            let query = "SELECT * FROM information_schema.tables;";
1296            let mut rewrite = DFParser::parse_sql(query)?;
1297            assert_eq!(rewrite.len(), 1);
1298            self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
1299        } else {
1300            plan_err!("SHOW TABLES is not supported unless information_schema is enabled")
1301        }
1302    }
1303
1304    fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
1305        let table_ref = self.object_name_to_table_reference(table_name)?;
1306
1307        let table_source = self.context_provider.get_table_source(table_ref)?;
1308
1309        let schema = table_source.schema();
1310
1311        let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1312
1313        Ok(LogicalPlan::DescribeTable(DescribeTable {
1314            schema,
1315            output_schema: Arc::new(output_schema),
1316        }))
1317    }
1318
1319    fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
1320        // Determine if source is table or query and handle accordingly
1321        let copy_source = statement.source;
1322        let (input, input_schema, table_ref) = match copy_source {
1323            CopyToSource::Relation(object_name) => {
1324                let table_name = object_name_to_string(&object_name);
1325                let table_ref = self.object_name_to_table_reference(object_name)?;
1326                let table_source =
1327                    self.context_provider.get_table_source(table_ref.clone())?;
1328                let plan =
1329                    LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?;
1330                let input_schema = Arc::clone(plan.schema());
1331                (plan, input_schema, Some(table_ref))
1332            }
1333            CopyToSource::Query(query) => {
1334                let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
1335                let input_schema = Arc::clone(plan.schema());
1336                (plan, input_schema, None)
1337            }
1338        };
1339
1340        let options_map = self.parse_options_map(statement.options, true)?;
1341
1342        let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
1343            self.context_provider.get_file_type(stored_as).ok()
1344        } else {
1345            None
1346        };
1347
1348        let file_type = match maybe_file_type {
1349            Some(ft) => ft,
1350            None => {
1351                let e = || {
1352                    DataFusionError::Configuration(
1353                        "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
1354                            .to_string(),
1355                    )
1356                };
1357                // Try to infer file format from file extension
1358                let extension: &str = &Path::new(&statement.target)
1359                    .extension()
1360                    .ok_or_else(e)?
1361                    .to_str()
1362                    .ok_or_else(e)?
1363                    .to_lowercase();
1364
1365                self.context_provider.get_file_type(extension)?
1366            }
1367        };
1368
1369        let partition_by = statement
1370            .partitioned_by
1371            .iter()
1372            .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
1373            .collect::<Result<Vec<_>>>()?
1374            .into_iter()
1375            .map(|f| f.name().to_owned())
1376            .collect();
1377
1378        Ok(LogicalPlan::Copy(CopyTo {
1379            input: Arc::new(input),
1380            output_url: statement.target,
1381            file_type,
1382            partition_by,
1383            options: options_map,
1384        }))
1385    }
1386
1387    fn build_order_by(
1388        &self,
1389        order_exprs: Vec<LexOrdering>,
1390        schema: &DFSchemaRef,
1391        planner_context: &mut PlannerContext,
1392    ) -> Result<Vec<Vec<SortExpr>>> {
1393        if !order_exprs.is_empty() && schema.fields().is_empty() {
1394            let results = order_exprs
1395                .iter()
1396                .map(|lex_order| {
1397                    let result = lex_order
1398                        .iter()
1399                        .map(|order_by_expr| {
1400                            let ordered_expr = &order_by_expr.expr;
1401                            let ordered_expr = ordered_expr.to_owned();
1402                            let ordered_expr = self
1403                                .sql_expr_to_logical_expr(
1404                                    ordered_expr,
1405                                    schema,
1406                                    planner_context,
1407                                )
1408                                .unwrap();
1409                            let asc = order_by_expr.options.asc.unwrap_or(true);
1410                            let nulls_first =
1411                                order_by_expr.options.nulls_first.unwrap_or(!asc);
1412
1413                            SortExpr::new(ordered_expr, asc, nulls_first)
1414                        })
1415                        .collect::<Vec<SortExpr>>();
1416                    result
1417                })
1418                .collect::<Vec<Vec<SortExpr>>>();
1419
1420            return Ok(results);
1421        }
1422
1423        let mut all_results = vec![];
1424        for expr in order_exprs {
1425            // Convert each OrderByExpr to a SortExpr:
1426            let expr_vec =
1427                self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?;
1428            // Verify that columns of all SortExprs exist in the schema:
1429            for sort in expr_vec.iter() {
1430                for column in sort.expr.column_refs().iter() {
1431                    if !schema.has_column(column) {
1432                        // Return an error if any column is not in the schema:
1433                        return plan_err!("Column {column} is not in schema");
1434                    }
1435                }
1436            }
1437            // If all SortExprs are valid, return them as an expression vector
1438            all_results.push(expr_vec)
1439        }
1440        Ok(all_results)
1441    }
1442
1443    /// Generate a logical plan from a CREATE EXTERNAL TABLE statement
1444    fn external_table_to_plan(
1445        &self,
1446        statement: CreateExternalTable,
1447    ) -> Result<LogicalPlan> {
1448        let definition = Some(statement.to_string());
1449        let CreateExternalTable {
1450            name,
1451            columns,
1452            file_type,
1453            location,
1454            table_partition_cols,
1455            if_not_exists,
1456            temporary,
1457            order_exprs,
1458            unbounded,
1459            options,
1460            constraints,
1461        } = statement;
1462
1463        // Merge inline constraints and existing constraints
1464        let mut all_constraints = constraints;
1465        let inline_constraints = calc_inline_constraints_from_columns(&columns);
1466        all_constraints.extend(inline_constraints);
1467
1468        let options_map = self.parse_options_map(options, false)?;
1469
1470        let compression = options_map
1471            .get("format.compression")
1472            .map(|c| CompressionTypeVariant::from_str(c))
1473            .transpose()?;
1474        if (file_type == "PARQUET" || file_type == "AVRO" || file_type == "ARROW")
1475            && compression
1476                .map(|c| c != CompressionTypeVariant::UNCOMPRESSED)
1477                .unwrap_or(false)
1478        {
1479            plan_err!(
1480                "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
1481            )?;
1482        }
1483
1484        let mut planner_context = PlannerContext::new();
1485
1486        let column_defaults = self
1487            .build_column_defaults(&columns, &mut planner_context)?
1488            .into_iter()
1489            .collect();
1490
1491        let schema = self.build_schema(columns)?;
1492        let df_schema = schema.to_dfschema_ref()?;
1493        df_schema.check_names()?;
1494
1495        let ordered_exprs =
1496            self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;
1497
1498        let name = self.object_name_to_table_reference(name)?;
1499        let constraints =
1500            self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
1501        Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1502            PlanCreateExternalTable {
1503                schema: df_schema,
1504                name,
1505                location,
1506                file_type,
1507                table_partition_cols,
1508                if_not_exists,
1509                temporary,
1510                definition,
1511                order_exprs: ordered_exprs,
1512                unbounded,
1513                options: options_map,
1514                constraints,
1515                column_defaults,
1516            },
1517        )))
1518    }
1519
1520    /// Get the indices of the constraint columns in the schema.
1521    /// If any column is not found, return an error.
1522    fn get_constraint_column_indices(
1523        &self,
1524        df_schema: &DFSchemaRef,
1525        columns: &[Ident],
1526        constraint_name: &str,
1527    ) -> Result<Vec<usize>> {
1528        let field_names = df_schema.field_names();
1529        columns
1530            .iter()
1531            .map(|ident| {
1532                let column = self.ident_normalizer.normalize(ident.clone());
1533                field_names
1534                    .iter()
1535                    .position(|item| *item == column)
1536                    .ok_or_else(|| {
1537                        plan_datafusion_err!(
1538                            "Column for {constraint_name} not found in schema: {column}"
1539                        )
1540                    })
1541            })
1542            .collect::<Result<Vec<_>>>()
1543    }
1544
1545    /// Convert each [TableConstraint] to corresponding [Constraint]
1546    pub fn new_constraint_from_table_constraints(
1547        &self,
1548        constraints: &[TableConstraint],
1549        df_schema: &DFSchemaRef,
1550    ) -> Result<Constraints> {
1551        let constraints = constraints
1552            .iter()
1553            .map(|c: &TableConstraint| match c {
1554                TableConstraint::Unique { name, columns, .. } => {
1555                    let constraint_name = match name {
1556                        Some(name) => &format!("unique constraint with name '{name}'"),
1557                        None => "unique constraint",
1558                    };
1559                    // Get unique constraint indices in the schema
1560                    let indices = self.get_constraint_column_indices(
1561                        df_schema,
1562                        columns,
1563                        constraint_name,
1564                    )?;
1565                    Ok(Constraint::Unique(indices))
1566                }
1567                TableConstraint::PrimaryKey { columns, .. } => {
1568                    // Get primary key indices in the schema
1569                    let indices = self.get_constraint_column_indices(
1570                        df_schema,
1571                        columns,
1572                        "primary key",
1573                    )?;
1574                    Ok(Constraint::PrimaryKey(indices))
1575                }
1576                TableConstraint::ForeignKey { .. } => {
1577                    _plan_err!("Foreign key constraints are not currently supported")
1578                }
1579                TableConstraint::Check { .. } => {
1580                    _plan_err!("Check constraints are not currently supported")
1581                }
1582                TableConstraint::Index { .. } => {
1583                    _plan_err!("Indexes are not currently supported")
1584                }
1585                TableConstraint::FulltextOrSpatial { .. } => {
1586                    _plan_err!("Indexes are not currently supported")
1587                }
1588            })
1589            .collect::<Result<Vec<_>>>()?;
1590        Ok(Constraints::new_unverified(constraints))
1591    }
1592
1593    fn parse_options_map(
1594        &self,
1595        options: Vec<(String, Value)>,
1596        allow_duplicates: bool,
1597    ) -> Result<HashMap<String, String>> {
1598        let mut options_map = HashMap::new();
1599        for (key, value) in options {
1600            if !allow_duplicates && options_map.contains_key(&key) {
1601                return plan_err!("Option {key} is specified multiple times");
1602            }
1603
1604            let Some(value_string) = crate::utils::value_to_string(&value) else {
1605                return plan_err!("Unsupported Value {}", value);
1606            };
1607
1608            if !(&key.contains('.')) {
1609                // If config does not belong to any namespace, assume it is
1610                // a format option and apply the format prefix for backwards
1611                // compatibility.
1612                let renamed_key = format!("format.{}", key);
1613                options_map.insert(renamed_key.to_lowercase(), value_string);
1614            } else {
1615                options_map.insert(key.to_lowercase(), value_string);
1616            }
1617        }
1618
1619        Ok(options_map)
1620    }
1621
1622    /// Generate a plan for EXPLAIN ... that will print out a plan
1623    ///
1624    /// Note this is the sqlparser explain statement, not the
1625    /// datafusion `EXPLAIN` statement.
1626    fn explain_to_plan(
1627        &self,
1628        verbose: bool,
1629        analyze: bool,
1630        format: Option<String>,
1631        statement: DFStatement,
1632    ) -> Result<LogicalPlan> {
1633        let plan = self.statement_to_plan(statement)?;
1634        if matches!(plan, LogicalPlan::Explain(_)) {
1635            return plan_err!("Nested EXPLAINs are not supported");
1636        }
1637
1638        let plan = Arc::new(plan);
1639        let schema = LogicalPlan::explain_schema();
1640        let schema = schema.to_dfschema_ref()?;
1641
1642        if verbose && format.is_some() {
1643            return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
1644        }
1645
1646        if analyze {
1647            if format.is_some() {
1648                return plan_err!("EXPLAIN ANALYZE with FORMAT is not supported");
1649            }
1650            Ok(LogicalPlan::Analyze(Analyze {
1651                verbose,
1652                input: plan,
1653                schema,
1654            }))
1655        } else {
1656            let stringified_plans =
1657                vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
1658
1659            // default to configuration value
1660            let options = self.context_provider.options();
1661            let format = format.as_ref().unwrap_or(&options.explain.format);
1662
1663            let format: ExplainFormat = format.parse()?;
1664
1665            Ok(LogicalPlan::Explain(Explain {
1666                verbose,
1667                explain_format: format,
1668                plan,
1669                stringified_plans,
1670                schema,
1671                logical_optimization_succeeded: false,
1672            }))
1673        }
1674    }
1675
1676    fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
1677        if !self.has_table("information_schema", "df_settings") {
1678            return plan_err!(
1679                "SHOW [VARIABLE] is not supported unless information_schema is enabled"
1680            );
1681        }
1682
1683        let verbose = variable
1684            .last()
1685            .map(|s| ident_to_string(s) == "verbose")
1686            .unwrap_or(false);
1687        let mut variable_vec = variable.to_vec();
1688        let mut columns: String = "name, value".to_owned();
1689
1690        if verbose {
1691            columns = format!("{columns}, description");
1692            variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
1693        }
1694
1695        let variable = object_name_to_string(&ObjectName::from(variable_vec));
1696        let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
1697        let query = if variable == "all" {
1698            // Add an ORDER BY so the output comes out in a consistent order
1699            format!("{base_query} ORDER BY name")
1700        } else if variable == "timezone" || variable == "time.zone" {
1701            // we could introduce alias in OptionDefinition if this string matching thing grows
1702            format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
1703        } else {
1704            // These values are what are used to make the information_schema table, so we just
1705            // check here, before actually planning or executing the query, if it would produce no
1706            // results, and error preemptively if it would (for a better UX)
1707            let is_valid_variable = self
1708                .context_provider
1709                .options()
1710                .entries()
1711                .iter()
1712                .any(|opt| opt.key == variable);
1713
1714            if !is_valid_variable {
1715                return plan_err!(
1716                    "'{variable}' is not a variable which can be viewed with 'SHOW'"
1717                );
1718            }
1719
1720            format!("{base_query} WHERE name = '{variable}'")
1721        };
1722
1723        let mut rewrite = DFParser::parse_sql(&query)?;
1724        assert_eq!(rewrite.len(), 1);
1725
1726        self.statement_to_plan(rewrite.pop_front().unwrap())
1727    }
1728
1729    fn set_variable_to_plan(
1730        &self,
1731        local: bool,
1732        hivevar: bool,
1733        variables: &OneOrManyWithParens<ObjectName>,
1734        value: Vec<SQLExpr>,
1735    ) -> Result<LogicalPlan> {
1736        if local {
1737            return not_impl_err!("LOCAL is not supported");
1738        }
1739
1740        if hivevar {
1741            return not_impl_err!("HIVEVAR is not supported");
1742        }
1743
1744        let variable = match variables {
1745            OneOrManyWithParens::One(v) => object_name_to_string(v),
1746            OneOrManyWithParens::Many(vs) => {
1747                return not_impl_err!(
1748                    "SET only supports single variable assignment: {vs:?}"
1749                );
1750            }
1751        };
1752        let mut variable_lower = variable.to_lowercase();
1753
1754        if variable_lower == "timezone" || variable_lower == "time.zone" {
1755            // We could introduce alias in OptionDefinition if this string matching thing grows
1756            variable_lower = "datafusion.execution.time_zone".to_string();
1757        }
1758
1759        // Parse value string from Expr
1760        let value_string = match &value[0] {
1761            SQLExpr::Identifier(i) => ident_to_string(i),
1762            SQLExpr::Value(v) => match crate::utils::value_to_string(&v.value) {
1763                None => {
1764                    return plan_err!("Unsupported Value {}", value[0]);
1765                }
1766                Some(v) => v,
1767            },
1768            // For capture signed number e.g. +8, -8
1769            SQLExpr::UnaryOp { op, expr } => match op {
1770                UnaryOperator::Plus => format!("+{expr}"),
1771                UnaryOperator::Minus => format!("-{expr}"),
1772                _ => {
1773                    return plan_err!("Unsupported Value {}", value[0]);
1774                }
1775            },
1776            _ => {
1777                return plan_err!("Unsupported Value {}", value[0]);
1778            }
1779        };
1780
1781        let statement = PlanStatement::SetVariable(SetVariable {
1782            variable: variable_lower,
1783            value: value_string,
1784        });
1785
1786        Ok(LogicalPlan::Statement(statement))
1787    }
1788
1789    fn delete_to_plan(
1790        &self,
1791        table_name: ObjectName,
1792        predicate_expr: Option<SQLExpr>,
1793    ) -> Result<LogicalPlan> {
1794        // Do a table lookup to verify the table exists
1795        let table_ref = self.object_name_to_table_reference(table_name.clone())?;
1796        let table_source = self.context_provider.get_table_source(table_ref.clone())?;
1797        let schema = table_source.schema().to_dfschema_ref()?;
1798        let scan =
1799            LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
1800                .build()?;
1801        let mut planner_context = PlannerContext::new();
1802
1803        let source = match predicate_expr {
1804            None => scan,
1805            Some(predicate_expr) => {
1806                let filter_expr =
1807                    self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
1808                let schema = Arc::new(schema);
1809                let mut using_columns = HashSet::new();
1810                expr_to_columns(&filter_expr, &mut using_columns)?;
1811                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
1812                    filter_expr,
1813                    &[&[&schema]],
1814                    &[using_columns],
1815                )?;
1816                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
1817            }
1818        };
1819
1820        let plan = LogicalPlan::Dml(DmlStatement::new(
1821            table_ref,
1822            table_source,
1823            WriteOp::Delete,
1824            Arc::new(source),
1825        ));
1826        Ok(plan)
1827    }
1828
1829    fn update_to_plan(
1830        &self,
1831        table: TableWithJoins,
1832        assignments: Vec<Assignment>,
1833        from: Option<TableWithJoins>,
1834        predicate_expr: Option<SQLExpr>,
1835    ) -> Result<LogicalPlan> {
1836        let (table_name, table_alias) = match &table.relation {
1837            TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
1838            _ => plan_err!("Cannot update non-table relation!")?,
1839        };
1840
1841        // Do a table lookup to verify the table exists
1842        let table_name = self.object_name_to_table_reference(table_name)?;
1843        let table_source = self.context_provider.get_table_source(table_name.clone())?;
1844        let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
1845            table_name.clone(),
1846            &table_source.schema(),
1847        )?);
1848
1849        // Overwrite with assignment expressions
1850        let mut planner_context = PlannerContext::new();
1851        let mut assign_map = assignments
1852            .iter()
1853            .map(|assign| {
1854                let cols = match &assign.target {
1855                    AssignmentTarget::ColumnName(cols) => cols,
1856                    _ => plan_err!("Tuples are not supported")?,
1857                };
1858                let col_name: &Ident = cols
1859                    .0
1860                    .iter()
1861                    .last()
1862                    .ok_or_else(|| plan_datafusion_err!("Empty column id"))?
1863                    .as_ident()
1864                    .unwrap();
1865                // Validate that the assignment target column exists
1866                table_schema.field_with_unqualified_name(&col_name.value)?;
1867                Ok((col_name.value.clone(), assign.value.clone()))
1868            })
1869            .collect::<Result<HashMap<String, SQLExpr>>>()?;
1870
1871        // Build scan, join with from table if it exists.
1872        let mut input_tables = vec![table];
1873        input_tables.extend(from);
1874        let scan = self.plan_from_tables(input_tables, &mut planner_context)?;
1875
1876        // Filter
1877        let source = match predicate_expr {
1878            None => scan,
1879            Some(predicate_expr) => {
1880                let filter_expr = self.sql_to_expr(
1881                    predicate_expr,
1882                    scan.schema(),
1883                    &mut planner_context,
1884                )?;
1885                let mut using_columns = HashSet::new();
1886                expr_to_columns(&filter_expr, &mut using_columns)?;
1887                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
1888                    filter_expr,
1889                    &[&[scan.schema()]],
1890                    &[using_columns],
1891                )?;
1892                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
1893            }
1894        };
1895
1896        // Build updated values for each column, using the previous value if not modified
1897        let exprs = table_schema
1898            .iter()
1899            .map(|(qualifier, field)| {
1900                let expr = match assign_map.remove(field.name()) {
1901                    Some(new_value) => {
1902                        let mut expr = self.sql_to_expr(
1903                            new_value,
1904                            source.schema(),
1905                            &mut planner_context,
1906                        )?;
1907                        // Update placeholder's datatype to the type of the target column
1908                        if let Expr::Placeholder(placeholder) = &mut expr {
1909                            placeholder.data_type = placeholder
1910                                .data_type
1911                                .take()
1912                                .or_else(|| Some(field.data_type().clone()));
1913                        }
1914                        // Cast to target column type, if necessary
1915                        expr.cast_to(field.data_type(), source.schema())?
1916                    }
1917                    None => {
1918                        // If the target table has an alias, use it to qualify the column name
1919                        if let Some(alias) = &table_alias {
1920                            Expr::Column(Column::new(
1921                                Some(self.ident_normalizer.normalize(alias.name.clone())),
1922                                field.name(),
1923                            ))
1924                        } else {
1925                            Expr::Column(Column::from((qualifier, field)))
1926                        }
1927                    }
1928                };
1929                Ok(expr.alias(field.name()))
1930            })
1931            .collect::<Result<Vec<_>>>()?;
1932
1933        let source = project(source, exprs)?;
1934
1935        let plan = LogicalPlan::Dml(DmlStatement::new(
1936            table_name,
1937            table_source,
1938            WriteOp::Update,
1939            Arc::new(source),
1940        ));
1941        Ok(plan)
1942    }
1943
1944    fn insert_to_plan(
1945        &self,
1946        table_name: ObjectName,
1947        columns: Vec<Ident>,
1948        source: Box<Query>,
1949        overwrite: bool,
1950        replace_into: bool,
1951    ) -> Result<LogicalPlan> {
1952        // Do a table lookup to verify the table exists
1953        let table_name = self.object_name_to_table_reference(table_name)?;
1954        let table_source = self.context_provider.get_table_source(table_name.clone())?;
1955        let arrow_schema = (*table_source.schema()).clone();
1956        let table_schema = DFSchema::try_from(arrow_schema)?;
1957
1958        // Get insert fields and target table's value indices
1959        //
1960        // If value_indices[i] = Some(j), it means that the value of the i-th target table's column is
1961        // derived from the j-th output of the source.
1962        //
1963        // If value_indices[i] = None, it means that the value of the i-th target table's column is
1964        // not provided, and should be filled with a default value later.
1965        let (fields, value_indices) = if columns.is_empty() {
1966            // Empty means we're inserting into all columns of the table
1967            (
1968                table_schema.fields().clone(),
1969                (0..table_schema.fields().len())
1970                    .map(Some)
1971                    .collect::<Vec<_>>(),
1972            )
1973        } else {
1974            let mut value_indices = vec![None; table_schema.fields().len()];
1975            let fields = columns
1976                .into_iter()
1977                .map(|c| self.ident_normalizer.normalize(c))
1978                .enumerate()
1979                .map(|(i, c)| {
1980                    let column_index = table_schema
1981                        .index_of_column_by_name(None, &c)
1982                        .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
1983
1984                    if value_indices[column_index].is_some() {
1985                        return schema_err!(SchemaError::DuplicateUnqualifiedField {
1986                            name: c,
1987                        });
1988                    } else {
1989                        value_indices[column_index] = Some(i);
1990                    }
1991                    Ok(table_schema.field(column_index).clone())
1992                })
1993                .collect::<Result<Vec<_>>>()?;
1994            (Fields::from(fields), value_indices)
1995        };
1996
1997        // infer types for Values clause... other types should be resolvable the regular way
1998        let mut prepare_param_data_types = BTreeMap::new();
1999        if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
2000            for row in rows.iter() {
2001                for (idx, val) in row.iter().enumerate() {
2002                    if let SQLExpr::Value(ValueWithSpan {
2003                        value: Value::Placeholder(name),
2004                        span: _,
2005                    }) = val
2006                    {
2007                        let name =
2008                            name.replace('$', "").parse::<usize>().map_err(|_| {
2009                                plan_datafusion_err!("Can't parse placeholder: {name}")
2010                            })? - 1;
2011                        let field = fields.get(idx).ok_or_else(|| {
2012                            plan_datafusion_err!(
2013                                "Placeholder ${} refers to a non existent column",
2014                                idx + 1
2015                            )
2016                        })?;
2017                        let dt = field.data_type().clone();
2018                        let _ = prepare_param_data_types.insert(name, dt);
2019                    }
2020                }
2021            }
2022        }
2023        let prepare_param_data_types = prepare_param_data_types.into_values().collect();
2024
2025        // Projection
2026        let mut planner_context =
2027            PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
2028        planner_context.set_table_schema(Some(DFSchemaRef::new(
2029            DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
2030        )));
2031        let source = self.query_to_plan(*source, &mut planner_context)?;
2032        if fields.len() != source.schema().fields().len() {
2033            plan_err!("Column count doesn't match insert query!")?;
2034        }
2035
2036        let exprs = value_indices
2037            .into_iter()
2038            .enumerate()
2039            .map(|(i, value_index)| {
2040                let target_field = table_schema.field(i);
2041                let expr = match value_index {
2042                    Some(v) => {
2043                        Expr::Column(Column::from(source.schema().qualified_field(v)))
2044                            .cast_to(target_field.data_type(), source.schema())?
2045                    }
2046                    // The value is not specified. Fill in the default value for the column.
2047                    None => table_source
2048                        .get_column_default(target_field.name())
2049                        .cloned()
2050                        .unwrap_or_else(|| {
2051                            // If there is no default for the column, then the default is NULL
2052                            Expr::Literal(ScalarValue::Null)
2053                        })
2054                        .cast_to(target_field.data_type(), &DFSchema::empty())?,
2055                };
2056                Ok(expr.alias(target_field.name()))
2057            })
2058            .collect::<Result<Vec<Expr>>>()?;
2059        let source = project(source, exprs)?;
2060
2061        let insert_op = match (overwrite, replace_into) {
2062            (false, false) => InsertOp::Append,
2063            (true, false) => InsertOp::Overwrite,
2064            (false, true) => InsertOp::Replace,
2065            (true, true) => plan_err!("Conflicting insert operations: `overwrite` and `replace_into` cannot both be true")?,
2066        };
2067
2068        let plan = LogicalPlan::Dml(DmlStatement::new(
2069            table_name,
2070            Arc::clone(&table_source),
2071            WriteOp::Insert(insert_op),
2072            Arc::new(source),
2073        ));
2074        Ok(plan)
2075    }
2076
2077    fn show_columns_to_plan(
2078        &self,
2079        extended: bool,
2080        full: bool,
2081        sql_table_name: ObjectName,
2082    ) -> Result<LogicalPlan> {
2083        // Figure out the where clause
2084        let where_clause = object_name_to_qualifier(
2085            &sql_table_name,
2086            self.options.enable_ident_normalization,
2087        )?;
2088
2089        if !self.has_table("information_schema", "columns") {
2090            return plan_err!(
2091                "SHOW COLUMNS is not supported unless information_schema is enabled"
2092            );
2093        }
2094
2095        // Do a table lookup to verify the table exists
2096        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2097        let _ = self.context_provider.get_table_source(table_ref)?;
2098
2099        // Treat both FULL and EXTENDED as the same
2100        let select_list = if full || extended {
2101            "*"
2102        } else {
2103            "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
2104        };
2105
2106        let query = format!(
2107            "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
2108        );
2109
2110        let mut rewrite = DFParser::parse_sql(&query)?;
2111        assert_eq!(rewrite.len(), 1);
2112        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2113    }
2114
2115    /// Rewrite `SHOW FUNCTIONS` to another SQL query
2116    /// The query is based on the `information_schema.routines` and `information_schema.parameters` tables
2117    ///
2118    /// The output columns:
2119    /// - function_name: The name of function
2120    /// - return_type: The return type of the function
2121    /// - parameters: The name of parameters (ordered by the ordinal position)
2122    /// - parameter_types: The type of parameters (ordered by the ordinal position)
2123    /// - description: The description of the function (the description defined in the document)
2124    /// - syntax_example: The syntax_example of the function (the syntax_example defined in the document)
2125    fn show_functions_to_plan(
2126        &self,
2127        filter: Option<ShowStatementFilter>,
2128    ) -> Result<LogicalPlan> {
2129        let where_clause = if let Some(filter) = filter {
2130            match filter {
2131                ShowStatementFilter::Like(like) => {
2132                    format!("WHERE p.function_name like '{like}'")
2133                }
2134                _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
2135            }
2136        } else {
2137            "".to_string()
2138        };
2139
2140        let query = format!(
2141            r#"
2142SELECT DISTINCT
2143    p.*,
2144    r.function_type function_type,
2145    r.description description,
2146    r.syntax_example syntax_example
2147FROM
2148    (
2149        SELECT
2150            i.specific_name function_name,
2151            o.data_type return_type,
2152            array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
2153            array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
2154        FROM (
2155                 SELECT
2156                     specific_catalog,
2157                     specific_schema,
2158                     specific_name,
2159                     ordinal_position,
2160                     parameter_name,
2161                     data_type,
2162                     rid
2163                 FROM
2164                     information_schema.parameters
2165                 WHERE
2166                     parameter_mode = 'IN'
2167             ) i
2168                 JOIN
2169             (
2170                 SELECT
2171                     specific_catalog,
2172                     specific_schema,
2173                     specific_name,
2174                     ordinal_position,
2175                     parameter_name,
2176                     data_type,
2177                     rid
2178                 FROM
2179                     information_schema.parameters
2180                 WHERE
2181                     parameter_mode = 'OUT'
2182             ) o
2183             ON i.specific_catalog = o.specific_catalog
2184                 AND i.specific_schema = o.specific_schema
2185                 AND i.specific_name = o.specific_name
2186                 AND i.rid = o.rid
2187        GROUP BY 1, 2, i.rid
2188    ) as p
2189JOIN information_schema.routines r
2190ON p.function_name = r.routine_name
2191{where_clause}
2192            "#
2193        );
2194        let mut rewrite = DFParser::parse_sql(&query)?;
2195        assert_eq!(rewrite.len(), 1);
2196        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2197    }
2198
2199    fn show_create_table_to_plan(
2200        &self,
2201        sql_table_name: ObjectName,
2202    ) -> Result<LogicalPlan> {
2203        if !self.has_table("information_schema", "tables") {
2204            return plan_err!(
2205                "SHOW CREATE TABLE is not supported unless information_schema is enabled"
2206            );
2207        }
2208        // Figure out the where clause
2209        let where_clause = object_name_to_qualifier(
2210            &sql_table_name,
2211            self.options.enable_ident_normalization,
2212        )?;
2213
2214        // Do a table lookup to verify the table exists
2215        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2216        let _ = self.context_provider.get_table_source(table_ref)?;
2217
2218        let query = format!(
2219            "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
2220        );
2221
2222        let mut rewrite = DFParser::parse_sql(&query)?;
2223        assert_eq!(rewrite.len(), 1);
2224        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2225    }
2226
2227    /// Return true if there is a table provider available for "schema.table"
2228    fn has_table(&self, schema: &str, table: &str) -> bool {
2229        let tables_reference = TableReference::Partial {
2230            schema: schema.into(),
2231            table: table.into(),
2232        };
2233        self.context_provider
2234            .get_table_source(tables_reference)
2235            .is_ok()
2236    }
2237
2238    fn validate_transaction_kind(
2239        &self,
2240        kind: Option<BeginTransactionKind>,
2241    ) -> Result<()> {
2242        match kind {
2243            // BEGIN
2244            None => Ok(()),
2245            // BEGIN TRANSACTION
2246            Some(BeginTransactionKind::Transaction) => Ok(()),
2247            Some(BeginTransactionKind::Work) => {
2248                not_impl_err!("Transaction kind not supported: {kind:?}")
2249            }
2250        }
2251    }
2252}