datafusion_sql/
statement.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{BTreeMap, HashMap, HashSet};
19use std::path::Path;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use crate::parser::{
24    CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
25    LexOrdering, Statement as DFStatement,
26};
27use crate::planner::{
28    object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
29};
30use crate::utils::normalize_ident;
31
32use arrow::datatypes::{DataType, Fields};
33use datafusion_common::error::_plan_err;
34use datafusion_common::parsers::CompressionTypeVariant;
35use datafusion_common::{
36    exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
37    unqualified_field_not_found, Column, Constraint, Constraints, DFSchema, DFSchemaRef,
38    DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, TableReference,
39    ToDFSchema,
40};
41use datafusion_expr::dml::{CopyTo, InsertOp};
42use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
43use datafusion_expr::logical_plan::builder::project;
44use datafusion_expr::logical_plan::DdlStatement;
45use datafusion_expr::utils::expr_to_columns;
46use datafusion_expr::{
47    cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
48    CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
49    CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
50    DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
51    EmptyRelation, Execute, Explain, ExplainFormat, Expr, ExprSchemable, Filter,
52    LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, SetVariable,
53    SortExpr, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
54    TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
55    Volatility, WriteOp,
56};
57use sqlparser::ast::{
58    self, BeginTransactionKind, NullsDistinctOption, ShowStatementIn,
59    ShowStatementOptions, SqliteOnConflict, TableObject, UpdateTableFromKind,
60    ValueWithSpan,
61};
62use sqlparser::ast::{
63    Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
64    CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
65    ObjectName, ObjectType, OneOrManyWithParens, Query, SchemaName, SetExpr,
66    ShowCreateObject, ShowStatementFilter, Statement, TableConstraint, TableFactor,
67    TableWithJoins, TransactionMode, UnaryOperator, Value,
68};
69use sqlparser::parser::ParserError::ParserError;
70
71fn ident_to_string(ident: &Ident) -> String {
72    normalize_ident(ident.to_owned())
73}
74
75fn object_name_to_string(object_name: &ObjectName) -> String {
76    object_name
77        .0
78        .iter()
79        .map(|object_name_part| {
80            object_name_part
81                .as_ident()
82                // TODO: It might be better to return an error
83                // than to silently use a default value.
84                .map_or_else(String::new, ident_to_string)
85        })
86        .collect::<Vec<String>>()
87        .join(".")
88}
89
90fn get_schema_name(schema_name: &SchemaName) -> String {
91    match schema_name {
92        SchemaName::Simple(schema_name) => object_name_to_string(schema_name),
93        SchemaName::UnnamedAuthorization(auth) => ident_to_string(auth),
94        SchemaName::NamedAuthorization(schema_name, auth) => format!(
95            "{}.{}",
96            object_name_to_string(schema_name),
97            ident_to_string(auth)
98        ),
99    }
100}
101
102/// Construct `TableConstraint`(s) for the given columns by iterating over
103/// `columns` and extracting individual inline constraint definitions.
104fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
105    let mut constraints = vec![];
106    for column in columns {
107        for ast::ColumnOptionDef { name, option } in &column.options {
108            match option {
109                ast::ColumnOption::Unique {
110                    is_primary: false,
111                    characteristics,
112                } => constraints.push(TableConstraint::Unique {
113                    name: name.clone(),
114                    columns: vec![column.name.clone()],
115                    characteristics: *characteristics,
116                    index_name: None,
117                    index_type_display: ast::KeyOrIndexDisplay::None,
118                    index_type: None,
119                    index_options: vec![],
120                    nulls_distinct: NullsDistinctOption::None,
121                }),
122                ast::ColumnOption::Unique {
123                    is_primary: true,
124                    characteristics,
125                } => constraints.push(TableConstraint::PrimaryKey {
126                    name: name.clone(),
127                    columns: vec![column.name.clone()],
128                    characteristics: *characteristics,
129                    index_name: None,
130                    index_type: None,
131                    index_options: vec![],
132                }),
133                ast::ColumnOption::ForeignKey {
134                    foreign_table,
135                    referred_columns,
136                    on_delete,
137                    on_update,
138                    characteristics,
139                } => constraints.push(TableConstraint::ForeignKey {
140                    name: name.clone(),
141                    columns: vec![],
142                    foreign_table: foreign_table.clone(),
143                    referred_columns: referred_columns.to_vec(),
144                    on_delete: *on_delete,
145                    on_update: *on_update,
146                    characteristics: *characteristics,
147                }),
148                ast::ColumnOption::Check(expr) => {
149                    constraints.push(TableConstraint::Check {
150                        name: name.clone(),
151                        expr: Box::new(expr.clone()),
152                    })
153                }
154                // Other options are not constraint related.
155                ast::ColumnOption::Default(_)
156                | ast::ColumnOption::Null
157                | ast::ColumnOption::NotNull
158                | ast::ColumnOption::DialectSpecific(_)
159                | ast::ColumnOption::CharacterSet(_)
160                | ast::ColumnOption::Generated { .. }
161                | ast::ColumnOption::Comment(_)
162                | ast::ColumnOption::Options(_)
163                | ast::ColumnOption::OnUpdate(_)
164                | ast::ColumnOption::Materialized(_)
165                | ast::ColumnOption::Ephemeral(_)
166                | ast::ColumnOption::Identity(_)
167                | ast::ColumnOption::OnConflict(_)
168                | ast::ColumnOption::Policy(_)
169                | ast::ColumnOption::Tags(_)
170                | ast::ColumnOption::Alias(_)
171                | ast::ColumnOption::Collation(_) => {}
172            }
173        }
174    }
175    constraints
176}
177
178impl<S: ContextProvider> SqlToRel<'_, S> {
179    /// Generate a logical plan from an DataFusion SQL statement
180    pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
181        match statement {
182            DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
183            DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
184            DFStatement::CopyTo(s) => self.copy_to_plan(s),
185            DFStatement::Explain(ExplainStatement {
186                verbose,
187                analyze,
188                format,
189                statement,
190            }) => self.explain_to_plan(verbose, analyze, format, *statement),
191        }
192    }
193
194    /// Generate a logical plan from an SQL statement
195    pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
196        self.sql_statement_to_plan_with_context_impl(
197            statement,
198            &mut PlannerContext::new(),
199        )
200    }
201
202    /// Generate a logical plan from an SQL statement
203    pub fn sql_statement_to_plan_with_context(
204        &self,
205        statement: Statement,
206        planner_context: &mut PlannerContext,
207    ) -> Result<LogicalPlan> {
208        self.sql_statement_to_plan_with_context_impl(statement, planner_context)
209    }
210
211    fn sql_statement_to_plan_with_context_impl(
212        &self,
213        statement: Statement,
214        planner_context: &mut PlannerContext,
215    ) -> Result<LogicalPlan> {
216        match statement {
217            Statement::ExplainTable {
218                describe_alias: DescribeAlias::Describe, // only parse 'DESCRIBE table_name' and not 'EXPLAIN table_name'
219                table_name,
220                ..
221            } => self.describe_table_to_plan(table_name),
222            Statement::Explain {
223                verbose,
224                statement,
225                analyze,
226                format,
227                describe_alias: _,
228                ..
229            } => {
230                let format = format.map(|format| format.to_string());
231                let statement = DFStatement::Statement(statement);
232                self.explain_to_plan(verbose, analyze, format, statement)
233            }
234            Statement::Query(query) => self.query_to_plan(*query, planner_context),
235            Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
236            Statement::SetVariable {
237                local,
238                hivevar,
239                variables,
240                value,
241            } => self.set_variable_to_plan(local, hivevar, &variables, value),
242
243            Statement::CreateTable(CreateTable {
244                temporary,
245                external,
246                global,
247                transient,
248                volatile,
249                hive_distribution,
250                hive_formats,
251                file_format,
252                location,
253                query,
254                name,
255                columns,
256                constraints,
257                table_properties,
258                with_options,
259                if_not_exists,
260                or_replace,
261                without_rowid,
262                like,
263                clone,
264                engine,
265                comment,
266                auto_increment_offset,
267                default_charset,
268                collation,
269                on_commit,
270                on_cluster,
271                primary_key,
272                order_by,
273                partition_by,
274                cluster_by,
275                clustered_by,
276                options,
277                strict,
278                copy_grants,
279                enable_schema_evolution,
280                change_tracking,
281                data_retention_time_in_days,
282                max_data_extension_time_in_days,
283                default_ddl_collation,
284                with_aggregation_policy,
285                with_row_access_policy,
286                with_tags,
287                iceberg,
288                external_volume,
289                base_location,
290                catalog,
291                catalog_sync,
292                storage_serialization_policy,
293            }) if table_properties.is_empty() && with_options.is_empty() => {
294                if temporary {
295                    return not_impl_err!("Temporary tables not supported")?;
296                }
297                if external {
298                    return not_impl_err!("External tables not supported")?;
299                }
300                if global.is_some() {
301                    return not_impl_err!("Global tables not supported")?;
302                }
303                if transient {
304                    return not_impl_err!("Transient tables not supported")?;
305                }
306                if volatile {
307                    return not_impl_err!("Volatile tables not supported")?;
308                }
309                if hive_distribution != ast::HiveDistributionStyle::NONE {
310                    return not_impl_err!(
311                        "Hive distribution not supported: {hive_distribution:?}"
312                    )?;
313                }
314                if !matches!(
315                    hive_formats,
316                    Some(ast::HiveFormat {
317                        row_format: None,
318                        serde_properties: None,
319                        storage: None,
320                        location: None,
321                    })
322                ) {
323                    return not_impl_err!(
324                        "Hive formats not supported: {hive_formats:?}"
325                    )?;
326                }
327                if file_format.is_some() {
328                    return not_impl_err!("File format not supported")?;
329                }
330                if location.is_some() {
331                    return not_impl_err!("Location not supported")?;
332                }
333                if without_rowid {
334                    return not_impl_err!("Without rowid not supported")?;
335                }
336                if like.is_some() {
337                    return not_impl_err!("Like not supported")?;
338                }
339                if clone.is_some() {
340                    return not_impl_err!("Clone not supported")?;
341                }
342                if engine.is_some() {
343                    return not_impl_err!("Engine not supported")?;
344                }
345                if comment.is_some() {
346                    return not_impl_err!("Comment not supported")?;
347                }
348                if auto_increment_offset.is_some() {
349                    return not_impl_err!("Auto increment offset not supported")?;
350                }
351                if default_charset.is_some() {
352                    return not_impl_err!("Default charset not supported")?;
353                }
354                if collation.is_some() {
355                    return not_impl_err!("Collation not supported")?;
356                }
357                if on_commit.is_some() {
358                    return not_impl_err!("On commit not supported")?;
359                }
360                if on_cluster.is_some() {
361                    return not_impl_err!("On cluster not supported")?;
362                }
363                if primary_key.is_some() {
364                    return not_impl_err!("Primary key not supported")?;
365                }
366                if order_by.is_some() {
367                    return not_impl_err!("Order by not supported")?;
368                }
369                if partition_by.is_some() {
370                    return not_impl_err!("Partition by not supported")?;
371                }
372                if cluster_by.is_some() {
373                    return not_impl_err!("Cluster by not supported")?;
374                }
375                if clustered_by.is_some() {
376                    return not_impl_err!("Clustered by not supported")?;
377                }
378                if options.is_some() {
379                    return not_impl_err!("Options not supported")?;
380                }
381                if strict {
382                    return not_impl_err!("Strict not supported")?;
383                }
384                if copy_grants {
385                    return not_impl_err!("Copy grants not supported")?;
386                }
387                if enable_schema_evolution.is_some() {
388                    return not_impl_err!("Enable schema evolution not supported")?;
389                }
390                if change_tracking.is_some() {
391                    return not_impl_err!("Change tracking not supported")?;
392                }
393                if data_retention_time_in_days.is_some() {
394                    return not_impl_err!("Data retention time in days not supported")?;
395                }
396                if max_data_extension_time_in_days.is_some() {
397                    return not_impl_err!(
398                        "Max data extension time in days not supported"
399                    )?;
400                }
401                if default_ddl_collation.is_some() {
402                    return not_impl_err!("Default DDL collation not supported")?;
403                }
404                if with_aggregation_policy.is_some() {
405                    return not_impl_err!("With aggregation policy not supported")?;
406                }
407                if with_row_access_policy.is_some() {
408                    return not_impl_err!("With row access policy not supported")?;
409                }
410                if with_tags.is_some() {
411                    return not_impl_err!("With tags not supported")?;
412                }
413                if iceberg {
414                    return not_impl_err!("Iceberg not supported")?;
415                }
416                if external_volume.is_some() {
417                    return not_impl_err!("External volume not supported")?;
418                }
419                if base_location.is_some() {
420                    return not_impl_err!("Base location not supported")?;
421                }
422                if catalog.is_some() {
423                    return not_impl_err!("Catalog not supported")?;
424                }
425                if catalog_sync.is_some() {
426                    return not_impl_err!("Catalog sync not supported")?;
427                }
428                if storage_serialization_policy.is_some() {
429                    return not_impl_err!("Storage serialization policy not supported")?;
430                }
431
432                // Merge inline constraints and existing constraints
433                let mut all_constraints = constraints;
434                let inline_constraints = calc_inline_constraints_from_columns(&columns);
435                all_constraints.extend(inline_constraints);
436                // Build column default values
437                let column_defaults =
438                    self.build_column_defaults(&columns, planner_context)?;
439
440                let has_columns = !columns.is_empty();
441                let schema = self.build_schema(columns)?.to_dfschema_ref()?;
442                if has_columns {
443                    planner_context.set_table_schema(Some(Arc::clone(&schema)));
444                }
445
446                match query {
447                    Some(query) => {
448                        let plan = self.query_to_plan(*query, planner_context)?;
449                        let input_schema = plan.schema();
450
451                        let plan = if has_columns {
452                            if schema.fields().len() != input_schema.fields().len() {
453                                return plan_err!(
454                            "Mismatch: {} columns specified, but result has {} columns",
455                            schema.fields().len(),
456                            input_schema.fields().len()
457                        );
458                            }
459                            let input_fields = input_schema.fields();
460                            let project_exprs = schema
461                                .fields()
462                                .iter()
463                                .zip(input_fields)
464                                .map(|(field, input_field)| {
465                                    cast(
466                                        col(input_field.name()),
467                                        field.data_type().clone(),
468                                    )
469                                    .alias(field.name())
470                                })
471                                .collect::<Vec<_>>();
472
473                            LogicalPlanBuilder::from(plan.clone())
474                                .project(project_exprs)?
475                                .build()?
476                        } else {
477                            plan
478                        };
479
480                        let constraints = self.new_constraint_from_table_constraints(
481                            &all_constraints,
482                            plan.schema(),
483                        )?;
484
485                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
486                            CreateMemoryTable {
487                                name: self.object_name_to_table_reference(name)?,
488                                constraints,
489                                input: Arc::new(plan),
490                                if_not_exists,
491                                or_replace,
492                                column_defaults,
493                                temporary,
494                            },
495                        )))
496                    }
497
498                    None => {
499                        let plan = EmptyRelation {
500                            produce_one_row: false,
501                            schema,
502                        };
503                        let plan = LogicalPlan::EmptyRelation(plan);
504                        let constraints = self.new_constraint_from_table_constraints(
505                            &all_constraints,
506                            plan.schema(),
507                        )?;
508                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
509                            CreateMemoryTable {
510                                name: self.object_name_to_table_reference(name)?,
511                                constraints,
512                                input: Arc::new(plan),
513                                if_not_exists,
514                                or_replace,
515                                column_defaults,
516                                temporary,
517                            },
518                        )))
519                    }
520                }
521            }
522
523            Statement::CreateView {
524                or_replace,
525                materialized,
526                name,
527                columns,
528                query,
529                options: CreateTableOptions::None,
530                cluster_by,
531                comment,
532                with_no_schema_binding,
533                if_not_exists,
534                temporary,
535                to,
536                params,
537            } => {
538                if materialized {
539                    return not_impl_err!("Materialized views not supported")?;
540                }
541                if !cluster_by.is_empty() {
542                    return not_impl_err!("Cluster by not supported")?;
543                }
544                if comment.is_some() {
545                    return not_impl_err!("Comment not supported")?;
546                }
547                if with_no_schema_binding {
548                    return not_impl_err!("With no schema binding not supported")?;
549                }
550                if if_not_exists {
551                    return not_impl_err!("If not exists not supported")?;
552                }
553                if to.is_some() {
554                    return not_impl_err!("To not supported")?;
555                }
556
557                // put the statement back together temporarily to get the SQL
558                // string representation
559                let stmt = Statement::CreateView {
560                    or_replace,
561                    materialized,
562                    name,
563                    columns,
564                    query,
565                    options: CreateTableOptions::None,
566                    cluster_by,
567                    comment,
568                    with_no_schema_binding,
569                    if_not_exists,
570                    temporary,
571                    to,
572                    params,
573                };
574                let sql = stmt.to_string();
575                let Statement::CreateView {
576                    name,
577                    columns,
578                    query,
579                    or_replace,
580                    temporary,
581                    ..
582                } = stmt
583                else {
584                    return internal_err!("Unreachable code in create view");
585                };
586
587                let columns = columns
588                    .into_iter()
589                    .map(|view_column_def| {
590                        if let Some(options) = view_column_def.options {
591                            plan_err!(
592                                "Options not supported for view columns: {options:?}"
593                            )
594                        } else {
595                            Ok(view_column_def.name)
596                        }
597                    })
598                    .collect::<Result<Vec<_>>>()?;
599
600                let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
601                plan = self.apply_expr_alias(plan, columns)?;
602
603                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
604                    name: self.object_name_to_table_reference(name)?,
605                    input: Arc::new(plan),
606                    or_replace,
607                    definition: Some(sql),
608                    temporary,
609                })))
610            }
611            Statement::ShowCreate { obj_type, obj_name } => match obj_type {
612                ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
613                _ => {
614                    not_impl_err!("Only `SHOW CREATE TABLE  ...` statement is supported")
615                }
616            },
617            Statement::CreateSchema {
618                schema_name,
619                if_not_exists,
620            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
621                CreateCatalogSchema {
622                    schema_name: get_schema_name(&schema_name),
623                    if_not_exists,
624                    schema: Arc::new(DFSchema::empty()),
625                },
626            ))),
627            Statement::CreateDatabase {
628                db_name,
629                if_not_exists,
630                ..
631            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
632                CreateCatalog {
633                    catalog_name: object_name_to_string(&db_name),
634                    if_not_exists,
635                    schema: Arc::new(DFSchema::empty()),
636                },
637            ))),
638            Statement::Drop {
639                object_type,
640                if_exists,
641                mut names,
642                cascade,
643                restrict: _,
644                purge: _,
645                temporary: _,
646            } => {
647                // We don't support cascade and purge for now.
648                // nor do we support multiple object names
649                let name = match names.len() {
650                    0 => Err(ParserError("Missing table name.".to_string()).into()),
651                    1 => self.object_name_to_table_reference(names.pop().unwrap()),
652                    _ => {
653                        Err(ParserError("Multiple objects not supported".to_string())
654                            .into())
655                    }
656                }?;
657
658                match object_type {
659                    ObjectType::Table => {
660                        Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
661                            name,
662                            if_exists,
663                            schema: DFSchemaRef::new(DFSchema::empty()),
664                        })))
665                    }
666                    ObjectType::View => {
667                        Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
668                            name,
669                            if_exists,
670                            schema: DFSchemaRef::new(DFSchema::empty()),
671                        })))
672                    }
673                    ObjectType::Schema => {
674                        let name = match name {
675                            TableReference::Bare { table } => Ok(SchemaReference::Bare { schema: table }),
676                            TableReference::Partial { schema, table } => Ok(SchemaReference::Full { schema: table, catalog: schema }),
677                            TableReference::Full { catalog: _, schema: _, table: _ } => {
678                                Err(ParserError("Invalid schema specifier (has 3 parts)".to_string()))
679                            }
680                        }?;
681                        Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(DropCatalogSchema {
682                            name,
683                            if_exists,
684                            cascade,
685                            schema: DFSchemaRef::new(DFSchema::empty()),
686                        })))
687                    }
688                    _ => not_impl_err!(
689                        "Only `DROP TABLE/VIEW/SCHEMA  ...` statement is supported currently"
690                    ),
691                }
692            }
693            Statement::Prepare {
694                name,
695                data_types,
696                statement,
697            } => {
698                // Convert parser data types to DataFusion data types
699                let mut data_types: Vec<DataType> = data_types
700                    .into_iter()
701                    .map(|t| self.convert_data_type(&t))
702                    .collect::<Result<_>>()?;
703
704                // Create planner context with parameters
705                let mut planner_context = PlannerContext::new()
706                    .with_prepare_param_data_types(data_types.clone());
707
708                // Build logical plan for inner statement of the prepare statement
709                let plan = self.sql_statement_to_plan_with_context_impl(
710                    *statement,
711                    &mut planner_context,
712                )?;
713
714                if data_types.is_empty() {
715                    let map_types = plan.get_parameter_types()?;
716                    let param_types: Vec<_> = (1..=map_types.len())
717                        .filter_map(|i| {
718                            let key = format!("${i}");
719                            map_types.get(&key).and_then(|opt| opt.clone())
720                        })
721                        .collect();
722                    data_types.extend(param_types.iter().cloned());
723                    planner_context.with_prepare_param_data_types(param_types);
724                }
725
726                Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
727                    name: ident_to_string(&name),
728                    data_types,
729                    input: Arc::new(plan),
730                })))
731            }
732            Statement::Execute {
733                name,
734                parameters,
735                using,
736                // has_parentheses specifies the syntax, but the plan is the
737                // same no matter the syntax used, so ignore it
738                has_parentheses: _,
739                immediate,
740                into,
741            } => {
742                // `USING` is a MySQL-specific syntax and currently not supported.
743                if !using.is_empty() {
744                    return not_impl_err!(
745                        "Execute statement with USING is not supported"
746                    );
747                }
748                if immediate {
749                    return not_impl_err!(
750                        "Execute statement with IMMEDIATE is not supported"
751                    );
752                }
753                if !into.is_empty() {
754                    return not_impl_err!("Execute statement with INTO is not supported");
755                }
756                let empty_schema = DFSchema::empty();
757                let parameters = parameters
758                    .into_iter()
759                    .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
760                    .collect::<Result<Vec<Expr>>>()?;
761
762                Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
763                    name: object_name_to_string(&name.unwrap()),
764                    parameters,
765                })))
766            }
767            Statement::Deallocate {
768                name,
769                // Similar to PostgreSQL, the PREPARE keyword is ignored
770                prepare: _,
771            } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
772                Deallocate {
773                    name: ident_to_string(&name),
774                },
775            ))),
776
777            Statement::ShowTables {
778                extended,
779                full,
780                terse,
781                history,
782                external,
783                show_options,
784            } => {
785                // We only support the basic "SHOW TABLES"
786                // https://github.com/apache/datafusion/issues/3188
787                if extended {
788                    return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
789                }
790                if full {
791                    return not_impl_err!("SHOW FULL TABLES not supported")?;
792                }
793                if terse {
794                    return not_impl_err!("SHOW TERSE TABLES not supported")?;
795                }
796                if history {
797                    return not_impl_err!("SHOW TABLES HISTORY not supported")?;
798                }
799                if external {
800                    return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
801                }
802                let ShowStatementOptions {
803                    show_in,
804                    starts_with,
805                    limit,
806                    limit_from,
807                    filter_position,
808                } = show_options;
809                if show_in.is_some() {
810                    return not_impl_err!("SHOW TABLES IN not supported")?;
811                }
812                if starts_with.is_some() {
813                    return not_impl_err!("SHOW TABLES LIKE not supported")?;
814                }
815                if limit.is_some() {
816                    return not_impl_err!("SHOW TABLES LIMIT not supported")?;
817                }
818                if limit_from.is_some() {
819                    return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
820                }
821                if filter_position.is_some() {
822                    return not_impl_err!("SHOW TABLES FILTER not supported")?;
823                }
824                self.show_tables_to_plan()
825            }
826
827            Statement::ShowColumns {
828                extended,
829                full,
830                show_options,
831            } => {
832                let ShowStatementOptions {
833                    show_in,
834                    starts_with,
835                    limit,
836                    limit_from,
837                    filter_position,
838                } = show_options;
839                if starts_with.is_some() {
840                    return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
841                }
842                if limit.is_some() {
843                    return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
844                }
845                if limit_from.is_some() {
846                    return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
847                }
848                if filter_position.is_some() {
849                    return not_impl_err!(
850                        "SHOW COLUMNS with WHERE or LIKE is not supported"
851                    )?;
852                }
853                let Some(ShowStatementIn {
854                    // specifies if the syntax was `SHOW COLUMNS IN` or `SHOW
855                    // COLUMNS FROM` which is not different in DataFusion
856                    clause: _,
857                    parent_type,
858                    parent_name,
859                }) = show_in
860                else {
861                    return plan_err!("SHOW COLUMNS requires a table name");
862                };
863
864                if let Some(parent_type) = parent_type {
865                    return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
866                }
867                let Some(table_name) = parent_name else {
868                    return plan_err!("SHOW COLUMNS requires a table name");
869                };
870
871                self.show_columns_to_plan(extended, full, table_name)
872            }
873
874            Statement::ShowFunctions { filter, .. } => {
875                self.show_functions_to_plan(filter)
876            }
877
878            Statement::Insert(Insert {
879                or,
880                into,
881                columns,
882                overwrite,
883                source,
884                partitioned,
885                after_columns,
886                table,
887                on,
888                returning,
889                ignore,
890                table_alias,
891                mut replace_into,
892                priority,
893                insert_alias,
894                assignments,
895                has_table_keyword,
896                settings,
897                format_clause,
898            }) => {
899                let table_name = match table {
900                    TableObject::TableName(table_name) => table_name,
901                    TableObject::TableFunction(_) => {
902                        return not_impl_err!("INSERT INTO Table functions not supported")
903                    }
904                };
905                if let Some(or) = or {
906                    match or {
907                        SqliteOnConflict::Replace => replace_into = true,
908                        _ => plan_err!("Inserts with {or} clause is not supported")?,
909                    }
910                }
911                if partitioned.is_some() {
912                    plan_err!("Partitioned inserts not yet supported")?;
913                }
914                if !after_columns.is_empty() {
915                    plan_err!("After-columns clause not supported")?;
916                }
917                if on.is_some() {
918                    plan_err!("Insert-on clause not supported")?;
919                }
920                if returning.is_some() {
921                    plan_err!("Insert-returning clause not supported")?;
922                }
923                if ignore {
924                    plan_err!("Insert-ignore clause not supported")?;
925                }
926                let Some(source) = source else {
927                    plan_err!("Inserts without a source not supported")?
928                };
929                if let Some(table_alias) = table_alias {
930                    plan_err!(
931                        "Inserts with a table alias not supported: {table_alias:?}"
932                    )?
933                };
934                if let Some(priority) = priority {
935                    plan_err!(
936                        "Inserts with a `PRIORITY` clause not supported: {priority:?}"
937                    )?
938                };
939                if insert_alias.is_some() {
940                    plan_err!("Inserts with an alias not supported")?;
941                }
942                if !assignments.is_empty() {
943                    plan_err!("Inserts with assignments not supported")?;
944                }
945                if settings.is_some() {
946                    plan_err!("Inserts with settings not supported")?;
947                }
948                if format_clause.is_some() {
949                    plan_err!("Inserts with format clause not supported")?;
950                }
951                // optional keywords don't change behavior
952                let _ = into;
953                let _ = has_table_keyword;
954                self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
955            }
956            Statement::Update {
957                table,
958                assignments,
959                from,
960                selection,
961                returning,
962                or,
963            } => {
964                let froms =
965                    from.map(|update_table_from_kind| match update_table_from_kind {
966                        UpdateTableFromKind::BeforeSet(froms) => froms,
967                        UpdateTableFromKind::AfterSet(froms) => froms,
968                    });
969                // TODO: support multiple tables in UPDATE SET FROM
970                if froms.as_ref().is_some_and(|f| f.len() > 1) {
971                    plan_err!("Multiple tables in UPDATE SET FROM not yet supported")?;
972                }
973                let update_from = froms.and_then(|mut f| f.pop());
974                if returning.is_some() {
975                    plan_err!("Update-returning clause not yet supported")?;
976                }
977                if or.is_some() {
978                    plan_err!("ON conflict not supported")?;
979                }
980                self.update_to_plan(table, assignments, update_from, selection)
981            }
982
983            Statement::Delete(Delete {
984                tables,
985                using,
986                selection,
987                returning,
988                from,
989                order_by,
990                limit,
991            }) => {
992                if !tables.is_empty() {
993                    plan_err!("DELETE <TABLE> not supported")?;
994                }
995
996                if using.is_some() {
997                    plan_err!("Using clause not supported")?;
998                }
999
1000                if returning.is_some() {
1001                    plan_err!("Delete-returning clause not yet supported")?;
1002                }
1003
1004                if !order_by.is_empty() {
1005                    plan_err!("Delete-order-by clause not yet supported")?;
1006                }
1007
1008                if limit.is_some() {
1009                    plan_err!("Delete-limit clause not yet supported")?;
1010                }
1011
1012                let table_name = self.get_delete_target(from)?;
1013                self.delete_to_plan(table_name, selection)
1014            }
1015
1016            Statement::StartTransaction {
1017                modes,
1018                begin: false,
1019                modifier,
1020                transaction,
1021                statements,
1022                exception_statements,
1023                has_end_keyword,
1024            } => {
1025                if let Some(modifier) = modifier {
1026                    return not_impl_err!(
1027                        "Transaction modifier not supported: {modifier}"
1028                    );
1029                }
1030                if !statements.is_empty() {
1031                    return not_impl_err!(
1032                        "Transaction with multiple statements not supported"
1033                    );
1034                }
1035                if exception_statements.is_some() {
1036                    return not_impl_err!(
1037                        "Transaction with exception statements not supported"
1038                    );
1039                }
1040                if has_end_keyword {
1041                    return not_impl_err!("Transaction with END keyword not supported");
1042                }
1043                self.validate_transaction_kind(transaction)?;
1044                let isolation_level: ast::TransactionIsolationLevel = modes
1045                    .iter()
1046                    .filter_map(|m: &TransactionMode| match m {
1047                        TransactionMode::AccessMode(_) => None,
1048                        TransactionMode::IsolationLevel(level) => Some(level),
1049                    })
1050                    .next_back()
1051                    .copied()
1052                    .unwrap_or(ast::TransactionIsolationLevel::Serializable);
1053                let access_mode: ast::TransactionAccessMode = modes
1054                    .iter()
1055                    .filter_map(|m: &TransactionMode| match m {
1056                        TransactionMode::AccessMode(mode) => Some(mode),
1057                        TransactionMode::IsolationLevel(_) => None,
1058                    })
1059                    .next_back()
1060                    .copied()
1061                    .unwrap_or(ast::TransactionAccessMode::ReadWrite);
1062                let isolation_level = match isolation_level {
1063                    ast::TransactionIsolationLevel::ReadUncommitted => {
1064                        TransactionIsolationLevel::ReadUncommitted
1065                    }
1066                    ast::TransactionIsolationLevel::ReadCommitted => {
1067                        TransactionIsolationLevel::ReadCommitted
1068                    }
1069                    ast::TransactionIsolationLevel::RepeatableRead => {
1070                        TransactionIsolationLevel::RepeatableRead
1071                    }
1072                    ast::TransactionIsolationLevel::Serializable => {
1073                        TransactionIsolationLevel::Serializable
1074                    }
1075                    ast::TransactionIsolationLevel::Snapshot => {
1076                        TransactionIsolationLevel::Snapshot
1077                    }
1078                };
1079                let access_mode = match access_mode {
1080                    ast::TransactionAccessMode::ReadOnly => {
1081                        TransactionAccessMode::ReadOnly
1082                    }
1083                    ast::TransactionAccessMode::ReadWrite => {
1084                        TransactionAccessMode::ReadWrite
1085                    }
1086                };
1087                let statement = PlanStatement::TransactionStart(TransactionStart {
1088                    access_mode,
1089                    isolation_level,
1090                });
1091                Ok(LogicalPlan::Statement(statement))
1092            }
1093            Statement::Commit {
1094                chain,
1095                end,
1096                modifier,
1097            } => {
1098                if end {
1099                    return not_impl_err!("COMMIT AND END not supported");
1100                };
1101                if let Some(modifier) = modifier {
1102                    return not_impl_err!("COMMIT {modifier} not supported");
1103                };
1104                let statement = PlanStatement::TransactionEnd(TransactionEnd {
1105                    conclusion: TransactionConclusion::Commit,
1106                    chain,
1107                });
1108                Ok(LogicalPlan::Statement(statement))
1109            }
1110            Statement::Rollback { chain, savepoint } => {
1111                if savepoint.is_some() {
1112                    plan_err!("Savepoints not supported")?;
1113                }
1114                let statement = PlanStatement::TransactionEnd(TransactionEnd {
1115                    conclusion: TransactionConclusion::Rollback,
1116                    chain,
1117                });
1118                Ok(LogicalPlan::Statement(statement))
1119            }
1120            Statement::CreateFunction(ast::CreateFunction {
1121                or_replace,
1122                temporary,
1123                name,
1124                args,
1125                return_type,
1126                function_body,
1127                behavior,
1128                language,
1129                ..
1130            }) => {
1131                let return_type = match return_type {
1132                    Some(t) => Some(self.convert_data_type(&t)?),
1133                    None => None,
1134                };
1135                let mut planner_context = PlannerContext::new();
1136                let empty_schema = &DFSchema::empty();
1137
1138                let args = match args {
1139                    Some(function_args) => {
1140                        let function_args = function_args
1141                            .into_iter()
1142                            .map(|arg| {
1143                                let data_type = self.convert_data_type(&arg.data_type)?;
1144
1145                                let default_expr = match arg.default_expr {
1146                                    Some(expr) => Some(self.sql_to_expr(
1147                                        expr,
1148                                        empty_schema,
1149                                        &mut planner_context,
1150                                    )?),
1151                                    None => None,
1152                                };
1153                                Ok(OperateFunctionArg {
1154                                    name: arg.name,
1155                                    default_expr,
1156                                    data_type,
1157                                })
1158                            })
1159                            .collect::<Result<Vec<OperateFunctionArg>>>();
1160                        Some(function_args?)
1161                    }
1162                    None => None,
1163                };
1164                // At the moment functions can't be qualified `schema.name`
1165                let name = match &name.0[..] {
1166                    [] => exec_err!("Function should have name")?,
1167                    [n] => n.as_ident().unwrap().value.clone(),
1168                    [..] => not_impl_err!("Qualified functions are not supported")?,
1169                };
1170                //
1171                // Convert resulting expression to data fusion expression
1172                //
1173                let arg_types = args.as_ref().map(|arg| {
1174                    arg.iter().map(|t| t.data_type.clone()).collect::<Vec<_>>()
1175                });
1176                let mut planner_context = PlannerContext::new()
1177                    .with_prepare_param_data_types(arg_types.unwrap_or_default());
1178
1179                let function_body = match function_body {
1180                    Some(r) => Some(self.sql_to_expr(
1181                        match r {
1182                            ast::CreateFunctionBody::AsBeforeOptions(expr) => expr,
1183                            ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
1184                            ast::CreateFunctionBody::Return(expr) => expr,
1185                        },
1186                        &DFSchema::empty(),
1187                        &mut planner_context,
1188                    )?),
1189                    None => None,
1190                };
1191
1192                let params = CreateFunctionBody {
1193                    language,
1194                    behavior: behavior.map(|b| match b {
1195                        ast::FunctionBehavior::Immutable => Volatility::Immutable,
1196                        ast::FunctionBehavior::Stable => Volatility::Stable,
1197                        ast::FunctionBehavior::Volatile => Volatility::Volatile,
1198                    }),
1199                    function_body,
1200                };
1201
1202                let statement = DdlStatement::CreateFunction(CreateFunction {
1203                    or_replace,
1204                    temporary,
1205                    name,
1206                    return_type,
1207                    args,
1208                    params,
1209                    schema: DFSchemaRef::new(DFSchema::empty()),
1210                });
1211
1212                Ok(LogicalPlan::Ddl(statement))
1213            }
1214            Statement::DropFunction {
1215                if_exists,
1216                func_desc,
1217                ..
1218            } => {
1219                // According to postgresql documentation it can be only one function
1220                // specified in drop statement
1221                if let Some(desc) = func_desc.first() {
1222                    // At the moment functions can't be qualified `schema.name`
1223                    let name = match &desc.name.0[..] {
1224                        [] => exec_err!("Function should have name")?,
1225                        [n] => n.as_ident().unwrap().value.clone(),
1226                        [..] => not_impl_err!("Qualified functions are not supported")?,
1227                    };
1228                    let statement = DdlStatement::DropFunction(DropFunction {
1229                        if_exists,
1230                        name,
1231                        schema: DFSchemaRef::new(DFSchema::empty()),
1232                    });
1233                    Ok(LogicalPlan::Ddl(statement))
1234                } else {
1235                    exec_err!("Function name not provided")
1236                }
1237            }
1238            Statement::CreateIndex(CreateIndex {
1239                name,
1240                table_name,
1241                using,
1242                columns,
1243                unique,
1244                if_not_exists,
1245                ..
1246            }) => {
1247                let name: Option<String> = name.as_ref().map(object_name_to_string);
1248                let table = self.object_name_to_table_reference(table_name)?;
1249                let table_schema = self
1250                    .context_provider
1251                    .get_table_source(table.clone())?
1252                    .schema()
1253                    .to_dfschema_ref()?;
1254                let using: Option<String> = using.as_ref().map(ident_to_string);
1255                let columns = self.order_by_to_sort_expr(
1256                    columns,
1257                    &table_schema,
1258                    planner_context,
1259                    false,
1260                    None,
1261                )?;
1262                Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
1263                    PlanCreateIndex {
1264                        name,
1265                        table,
1266                        using,
1267                        columns,
1268                        unique,
1269                        if_not_exists,
1270                        schema: DFSchemaRef::new(DFSchema::empty()),
1271                    },
1272                )))
1273            }
1274            stmt => {
1275                not_impl_err!("Unsupported SQL statement: {stmt}")
1276            }
1277        }
1278    }
1279
1280    fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
1281        let mut from = match from {
1282            FromTable::WithFromKeyword(v) => v,
1283            FromTable::WithoutKeyword(v) => v,
1284        };
1285
1286        if from.len() != 1 {
1287            return not_impl_err!(
1288                "DELETE FROM only supports single table, got {}: {from:?}",
1289                from.len()
1290            );
1291        }
1292        let table_factor = from.pop().unwrap();
1293        if !table_factor.joins.is_empty() {
1294            return not_impl_err!("DELETE FROM only supports single table, got: joins");
1295        }
1296        let TableFactor::Table { name, .. } = table_factor.relation else {
1297            return not_impl_err!(
1298                "DELETE FROM only supports single table, got: {table_factor:?}"
1299            );
1300        };
1301
1302        Ok(name)
1303    }
1304
1305    /// Generate a logical plan from a "SHOW TABLES" query
1306    fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
1307        if self.has_table("information_schema", "tables") {
1308            let query = "SELECT * FROM information_schema.tables;";
1309            let mut rewrite = DFParser::parse_sql(query)?;
1310            assert_eq!(rewrite.len(), 1);
1311            self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
1312        } else {
1313            plan_err!("SHOW TABLES is not supported unless information_schema is enabled")
1314        }
1315    }
1316
1317    fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
1318        let table_ref = self.object_name_to_table_reference(table_name)?;
1319
1320        let table_source = self.context_provider.get_table_source(table_ref)?;
1321
1322        let schema = table_source.schema();
1323
1324        let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1325
1326        Ok(LogicalPlan::DescribeTable(DescribeTable {
1327            schema,
1328            output_schema: Arc::new(output_schema),
1329        }))
1330    }
1331
1332    fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
1333        // Determine if source is table or query and handle accordingly
1334        let copy_source = statement.source;
1335        let (input, input_schema, table_ref) = match copy_source {
1336            CopyToSource::Relation(object_name) => {
1337                let table_name = object_name_to_string(&object_name);
1338                let table_ref = self.object_name_to_table_reference(object_name)?;
1339                let table_source =
1340                    self.context_provider.get_table_source(table_ref.clone())?;
1341                let plan =
1342                    LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?;
1343                let input_schema = Arc::clone(plan.schema());
1344                (plan, input_schema, Some(table_ref))
1345            }
1346            CopyToSource::Query(query) => {
1347                let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
1348                let input_schema = Arc::clone(plan.schema());
1349                (plan, input_schema, None)
1350            }
1351        };
1352
1353        let options_map = self.parse_options_map(statement.options, true)?;
1354
1355        let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
1356            self.context_provider.get_file_type(stored_as).ok()
1357        } else {
1358            None
1359        };
1360
1361        let file_type = match maybe_file_type {
1362            Some(ft) => ft,
1363            None => {
1364                let e = || {
1365                    DataFusionError::Configuration(
1366                        "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
1367                            .to_string(),
1368                    )
1369                };
1370                // Try to infer file format from file extension
1371                let extension: &str = &Path::new(&statement.target)
1372                    .extension()
1373                    .ok_or_else(e)?
1374                    .to_str()
1375                    .ok_or_else(e)?
1376                    .to_lowercase();
1377
1378                self.context_provider.get_file_type(extension)?
1379            }
1380        };
1381
1382        let partition_by = statement
1383            .partitioned_by
1384            .iter()
1385            .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
1386            .collect::<Result<Vec<_>>>()?
1387            .into_iter()
1388            .map(|f| f.name().to_owned())
1389            .collect();
1390
1391        Ok(LogicalPlan::Copy(CopyTo {
1392            input: Arc::new(input),
1393            output_url: statement.target,
1394            file_type,
1395            partition_by,
1396            options: options_map,
1397        }))
1398    }
1399
1400    fn build_order_by(
1401        &self,
1402        order_exprs: Vec<LexOrdering>,
1403        schema: &DFSchemaRef,
1404        planner_context: &mut PlannerContext,
1405    ) -> Result<Vec<Vec<SortExpr>>> {
1406        if !order_exprs.is_empty() && schema.fields().is_empty() {
1407            let results = order_exprs
1408                .iter()
1409                .map(|lex_order| {
1410                    let result = lex_order
1411                        .iter()
1412                        .map(|order_by_expr| {
1413                            let ordered_expr = &order_by_expr.expr;
1414                            let ordered_expr = ordered_expr.to_owned();
1415                            let ordered_expr = self
1416                                .sql_expr_to_logical_expr(
1417                                    ordered_expr,
1418                                    schema,
1419                                    planner_context,
1420                                )
1421                                .unwrap();
1422                            let asc = order_by_expr.options.asc.unwrap_or(true);
1423                            let nulls_first =
1424                                order_by_expr.options.nulls_first.unwrap_or(!asc);
1425
1426                            SortExpr::new(ordered_expr, asc, nulls_first)
1427                        })
1428                        .collect::<Vec<SortExpr>>();
1429                    result
1430                })
1431                .collect::<Vec<Vec<SortExpr>>>();
1432
1433            return Ok(results);
1434        }
1435
1436        let mut all_results = vec![];
1437        for expr in order_exprs {
1438            // Convert each OrderByExpr to a SortExpr:
1439            let expr_vec =
1440                self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?;
1441            // Verify that columns of all SortExprs exist in the schema:
1442            for sort in expr_vec.iter() {
1443                for column in sort.expr.column_refs().iter() {
1444                    if !schema.has_column(column) {
1445                        // Return an error if any column is not in the schema:
1446                        return plan_err!("Column {column} is not in schema");
1447                    }
1448                }
1449            }
1450            // If all SortExprs are valid, return them as an expression vector
1451            all_results.push(expr_vec)
1452        }
1453        Ok(all_results)
1454    }
1455
1456    /// Generate a logical plan from a CREATE EXTERNAL TABLE statement
1457    fn external_table_to_plan(
1458        &self,
1459        statement: CreateExternalTable,
1460    ) -> Result<LogicalPlan> {
1461        let definition = Some(statement.to_string());
1462        let CreateExternalTable {
1463            name,
1464            columns,
1465            file_type,
1466            location,
1467            table_partition_cols,
1468            if_not_exists,
1469            temporary,
1470            order_exprs,
1471            unbounded,
1472            options,
1473            constraints,
1474        } = statement;
1475
1476        // Merge inline constraints and existing constraints
1477        let mut all_constraints = constraints;
1478        let inline_constraints = calc_inline_constraints_from_columns(&columns);
1479        all_constraints.extend(inline_constraints);
1480
1481        let options_map = self.parse_options_map(options, false)?;
1482
1483        let compression = options_map
1484            .get("format.compression")
1485            .map(|c| CompressionTypeVariant::from_str(c))
1486            .transpose()?;
1487        if (file_type == "PARQUET" || file_type == "AVRO" || file_type == "ARROW")
1488            && compression
1489                .map(|c| c != CompressionTypeVariant::UNCOMPRESSED)
1490                .unwrap_or(false)
1491        {
1492            plan_err!(
1493                "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
1494            )?;
1495        }
1496
1497        let mut planner_context = PlannerContext::new();
1498
1499        let column_defaults = self
1500            .build_column_defaults(&columns, &mut planner_context)?
1501            .into_iter()
1502            .collect();
1503
1504        let schema = self.build_schema(columns)?;
1505        let df_schema = schema.to_dfschema_ref()?;
1506        df_schema.check_names()?;
1507
1508        let ordered_exprs =
1509            self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;
1510
1511        let name = self.object_name_to_table_reference(name)?;
1512        let constraints =
1513            self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
1514        Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1515            PlanCreateExternalTable {
1516                schema: df_schema,
1517                name,
1518                location,
1519                file_type,
1520                table_partition_cols,
1521                if_not_exists,
1522                temporary,
1523                definition,
1524                order_exprs: ordered_exprs,
1525                unbounded,
1526                options: options_map,
1527                constraints,
1528                column_defaults,
1529            },
1530        )))
1531    }
1532
1533    /// Get the indices of the constraint columns in the schema.
1534    /// If any column is not found, return an error.
1535    fn get_constraint_column_indices(
1536        &self,
1537        df_schema: &DFSchemaRef,
1538        columns: &[Ident],
1539        constraint_name: &str,
1540    ) -> Result<Vec<usize>> {
1541        let field_names = df_schema.field_names();
1542        columns
1543            .iter()
1544            .map(|ident| {
1545                let column = self.ident_normalizer.normalize(ident.clone());
1546                field_names
1547                    .iter()
1548                    .position(|item| *item == column)
1549                    .ok_or_else(|| {
1550                        plan_datafusion_err!(
1551                            "Column for {constraint_name} not found in schema: {column}"
1552                        )
1553                    })
1554            })
1555            .collect::<Result<Vec<_>>>()
1556    }
1557
1558    /// Convert each [TableConstraint] to corresponding [Constraint]
1559    pub fn new_constraint_from_table_constraints(
1560        &self,
1561        constraints: &[TableConstraint],
1562        df_schema: &DFSchemaRef,
1563    ) -> Result<Constraints> {
1564        let constraints = constraints
1565            .iter()
1566            .map(|c: &TableConstraint| match c {
1567                TableConstraint::Unique { name, columns, .. } => {
1568                    let constraint_name = match name {
1569                        Some(name) => &format!("unique constraint with name '{name}'"),
1570                        None => "unique constraint",
1571                    };
1572                    // Get unique constraint indices in the schema
1573                    let indices = self.get_constraint_column_indices(
1574                        df_schema,
1575                        columns,
1576                        constraint_name,
1577                    )?;
1578                    Ok(Constraint::Unique(indices))
1579                }
1580                TableConstraint::PrimaryKey { columns, .. } => {
1581                    // Get primary key indices in the schema
1582                    let indices = self.get_constraint_column_indices(
1583                        df_schema,
1584                        columns,
1585                        "primary key",
1586                    )?;
1587                    Ok(Constraint::PrimaryKey(indices))
1588                }
1589                TableConstraint::ForeignKey { .. } => {
1590                    _plan_err!("Foreign key constraints are not currently supported")
1591                }
1592                TableConstraint::Check { .. } => {
1593                    _plan_err!("Check constraints are not currently supported")
1594                }
1595                TableConstraint::Index { .. } => {
1596                    _plan_err!("Indexes are not currently supported")
1597                }
1598                TableConstraint::FulltextOrSpatial { .. } => {
1599                    _plan_err!("Indexes are not currently supported")
1600                }
1601            })
1602            .collect::<Result<Vec<_>>>()?;
1603        Ok(Constraints::new_unverified(constraints))
1604    }
1605
1606    fn parse_options_map(
1607        &self,
1608        options: Vec<(String, Value)>,
1609        allow_duplicates: bool,
1610    ) -> Result<HashMap<String, String>> {
1611        let mut options_map = HashMap::new();
1612        for (key, value) in options {
1613            if !allow_duplicates && options_map.contains_key(&key) {
1614                return plan_err!("Option {key} is specified multiple times");
1615            }
1616
1617            let Some(value_string) = crate::utils::value_to_string(&value) else {
1618                return plan_err!("Unsupported Value {}", value);
1619            };
1620
1621            if !(&key.contains('.')) {
1622                // If config does not belong to any namespace, assume it is
1623                // a format option and apply the format prefix for backwards
1624                // compatibility.
1625                let renamed_key = format!("format.{key}");
1626                options_map.insert(renamed_key.to_lowercase(), value_string);
1627            } else {
1628                options_map.insert(key.to_lowercase(), value_string);
1629            }
1630        }
1631
1632        Ok(options_map)
1633    }
1634
1635    /// Generate a plan for EXPLAIN ... that will print out a plan
1636    ///
1637    /// Note this is the sqlparser explain statement, not the
1638    /// datafusion `EXPLAIN` statement.
1639    fn explain_to_plan(
1640        &self,
1641        verbose: bool,
1642        analyze: bool,
1643        format: Option<String>,
1644        statement: DFStatement,
1645    ) -> Result<LogicalPlan> {
1646        let plan = self.statement_to_plan(statement)?;
1647        if matches!(plan, LogicalPlan::Explain(_)) {
1648            return plan_err!("Nested EXPLAINs are not supported");
1649        }
1650
1651        let plan = Arc::new(plan);
1652        let schema = LogicalPlan::explain_schema();
1653        let schema = schema.to_dfschema_ref()?;
1654
1655        if verbose && format.is_some() {
1656            return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
1657        }
1658
1659        if analyze {
1660            if format.is_some() {
1661                return plan_err!("EXPLAIN ANALYZE with FORMAT is not supported");
1662            }
1663            Ok(LogicalPlan::Analyze(Analyze {
1664                verbose,
1665                input: plan,
1666                schema,
1667            }))
1668        } else {
1669            let stringified_plans =
1670                vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
1671
1672            // default to configuration value
1673            let options = self.context_provider.options();
1674            let format = format.as_ref().unwrap_or(&options.explain.format);
1675
1676            let format: ExplainFormat = format.parse()?;
1677
1678            Ok(LogicalPlan::Explain(Explain {
1679                verbose,
1680                explain_format: format,
1681                plan,
1682                stringified_plans,
1683                schema,
1684                logical_optimization_succeeded: false,
1685            }))
1686        }
1687    }
1688
1689    fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
1690        if !self.has_table("information_schema", "df_settings") {
1691            return plan_err!(
1692                "SHOW [VARIABLE] is not supported unless information_schema is enabled"
1693            );
1694        }
1695
1696        let verbose = variable
1697            .last()
1698            .map(|s| ident_to_string(s) == "verbose")
1699            .unwrap_or(false);
1700        let mut variable_vec = variable.to_vec();
1701        let mut columns: String = "name, value".to_owned();
1702
1703        if verbose {
1704            columns = format!("{columns}, description");
1705            variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
1706        }
1707
1708        let variable = object_name_to_string(&ObjectName::from(variable_vec));
1709        let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
1710        let query = if variable == "all" {
1711            // Add an ORDER BY so the output comes out in a consistent order
1712            format!("{base_query} ORDER BY name")
1713        } else if variable == "timezone" || variable == "time.zone" {
1714            // we could introduce alias in OptionDefinition if this string matching thing grows
1715            format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
1716        } else {
1717            // These values are what are used to make the information_schema table, so we just
1718            // check here, before actually planning or executing the query, if it would produce no
1719            // results, and error preemptively if it would (for a better UX)
1720            let is_valid_variable = self
1721                .context_provider
1722                .options()
1723                .entries()
1724                .iter()
1725                .any(|opt| opt.key == variable);
1726
1727            if !is_valid_variable {
1728                return plan_err!(
1729                    "'{variable}' is not a variable which can be viewed with 'SHOW'"
1730                );
1731            }
1732
1733            format!("{base_query} WHERE name = '{variable}'")
1734        };
1735
1736        let mut rewrite = DFParser::parse_sql(&query)?;
1737        assert_eq!(rewrite.len(), 1);
1738
1739        self.statement_to_plan(rewrite.pop_front().unwrap())
1740    }
1741
1742    fn set_variable_to_plan(
1743        &self,
1744        local: bool,
1745        hivevar: bool,
1746        variables: &OneOrManyWithParens<ObjectName>,
1747        value: Vec<SQLExpr>,
1748    ) -> Result<LogicalPlan> {
1749        if local {
1750            return not_impl_err!("LOCAL is not supported");
1751        }
1752
1753        if hivevar {
1754            return not_impl_err!("HIVEVAR is not supported");
1755        }
1756
1757        let variable = match variables {
1758            OneOrManyWithParens::One(v) => object_name_to_string(v),
1759            OneOrManyWithParens::Many(vs) => {
1760                return not_impl_err!(
1761                    "SET only supports single variable assignment: {vs:?}"
1762                );
1763            }
1764        };
1765        let mut variable_lower = variable.to_lowercase();
1766
1767        if variable_lower == "timezone" || variable_lower == "time.zone" {
1768            // We could introduce alias in OptionDefinition if this string matching thing grows
1769            variable_lower = "datafusion.execution.time_zone".to_string();
1770        }
1771
1772        // Parse value string from Expr
1773        let value_string = match &value[0] {
1774            SQLExpr::Identifier(i) => ident_to_string(i),
1775            SQLExpr::Value(v) => match crate::utils::value_to_string(&v.value) {
1776                None => {
1777                    return plan_err!("Unsupported Value {}", value[0]);
1778                }
1779                Some(v) => v,
1780            },
1781            // For capture signed number e.g. +8, -8
1782            SQLExpr::UnaryOp { op, expr } => match op {
1783                UnaryOperator::Plus => format!("+{expr}"),
1784                UnaryOperator::Minus => format!("-{expr}"),
1785                _ => {
1786                    return plan_err!("Unsupported Value {}", value[0]);
1787                }
1788            },
1789            _ => {
1790                return plan_err!("Unsupported Value {}", value[0]);
1791            }
1792        };
1793
1794        let statement = PlanStatement::SetVariable(SetVariable {
1795            variable: variable_lower,
1796            value: value_string,
1797        });
1798
1799        Ok(LogicalPlan::Statement(statement))
1800    }
1801
1802    fn delete_to_plan(
1803        &self,
1804        table_name: ObjectName,
1805        predicate_expr: Option<SQLExpr>,
1806    ) -> Result<LogicalPlan> {
1807        // Do a table lookup to verify the table exists
1808        let table_ref = self.object_name_to_table_reference(table_name.clone())?;
1809        let table_source = self.context_provider.get_table_source(table_ref.clone())?;
1810        let schema = DFSchema::try_from_qualified_schema(
1811            table_ref.clone(),
1812            &table_source.schema(),
1813        )?;
1814        let scan =
1815            LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
1816                .build()?;
1817        let mut planner_context = PlannerContext::new();
1818
1819        let source = match predicate_expr {
1820            None => scan,
1821            Some(predicate_expr) => {
1822                let filter_expr =
1823                    self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
1824                let schema = Arc::new(schema);
1825                let mut using_columns = HashSet::new();
1826                expr_to_columns(&filter_expr, &mut using_columns)?;
1827                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
1828                    filter_expr,
1829                    &[&[&schema]],
1830                    &[using_columns],
1831                )?;
1832                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
1833            }
1834        };
1835
1836        let plan = LogicalPlan::Dml(DmlStatement::new(
1837            table_ref,
1838            table_source,
1839            WriteOp::Delete,
1840            Arc::new(source),
1841        ));
1842        Ok(plan)
1843    }
1844
1845    fn update_to_plan(
1846        &self,
1847        table: TableWithJoins,
1848        assignments: Vec<Assignment>,
1849        from: Option<TableWithJoins>,
1850        predicate_expr: Option<SQLExpr>,
1851    ) -> Result<LogicalPlan> {
1852        let (table_name, table_alias) = match &table.relation {
1853            TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
1854            _ => plan_err!("Cannot update non-table relation!")?,
1855        };
1856
1857        // Do a table lookup to verify the table exists
1858        let table_name = self.object_name_to_table_reference(table_name)?;
1859        let table_source = self.context_provider.get_table_source(table_name.clone())?;
1860        let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
1861            table_name.clone(),
1862            &table_source.schema(),
1863        )?);
1864
1865        // Overwrite with assignment expressions
1866        let mut planner_context = PlannerContext::new();
1867        let mut assign_map = assignments
1868            .iter()
1869            .map(|assign| {
1870                let cols = match &assign.target {
1871                    AssignmentTarget::ColumnName(cols) => cols,
1872                    _ => plan_err!("Tuples are not supported")?,
1873                };
1874                let col_name: &Ident = cols
1875                    .0
1876                    .iter()
1877                    .last()
1878                    .ok_or_else(|| plan_datafusion_err!("Empty column id"))?
1879                    .as_ident()
1880                    .unwrap();
1881                // Validate that the assignment target column exists
1882                table_schema.field_with_unqualified_name(&col_name.value)?;
1883                Ok((col_name.value.clone(), assign.value.clone()))
1884            })
1885            .collect::<Result<HashMap<String, SQLExpr>>>()?;
1886
1887        // Build scan, join with from table if it exists.
1888        let mut input_tables = vec![table];
1889        input_tables.extend(from);
1890        let scan = self.plan_from_tables(input_tables, &mut planner_context)?;
1891
1892        // Filter
1893        let source = match predicate_expr {
1894            None => scan,
1895            Some(predicate_expr) => {
1896                let filter_expr = self.sql_to_expr(
1897                    predicate_expr,
1898                    scan.schema(),
1899                    &mut planner_context,
1900                )?;
1901                let mut using_columns = HashSet::new();
1902                expr_to_columns(&filter_expr, &mut using_columns)?;
1903                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
1904                    filter_expr,
1905                    &[&[scan.schema()]],
1906                    &[using_columns],
1907                )?;
1908                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
1909            }
1910        };
1911
1912        // Build updated values for each column, using the previous value if not modified
1913        let exprs = table_schema
1914            .iter()
1915            .map(|(qualifier, field)| {
1916                let expr = match assign_map.remove(field.name()) {
1917                    Some(new_value) => {
1918                        let mut expr = self.sql_to_expr(
1919                            new_value,
1920                            source.schema(),
1921                            &mut planner_context,
1922                        )?;
1923                        // Update placeholder's datatype to the type of the target column
1924                        if let Expr::Placeholder(placeholder) = &mut expr {
1925                            placeholder.data_type = placeholder
1926                                .data_type
1927                                .take()
1928                                .or_else(|| Some(field.data_type().clone()));
1929                        }
1930                        // Cast to target column type, if necessary
1931                        expr.cast_to(field.data_type(), source.schema())?
1932                    }
1933                    None => {
1934                        // If the target table has an alias, use it to qualify the column name
1935                        if let Some(alias) = &table_alias {
1936                            Expr::Column(Column::new(
1937                                Some(self.ident_normalizer.normalize(alias.name.clone())),
1938                                field.name(),
1939                            ))
1940                        } else {
1941                            Expr::Column(Column::from((qualifier, field)))
1942                        }
1943                    }
1944                };
1945                Ok(expr.alias(field.name()))
1946            })
1947            .collect::<Result<Vec<_>>>()?;
1948
1949        let source = project(source, exprs)?;
1950
1951        let plan = LogicalPlan::Dml(DmlStatement::new(
1952            table_name,
1953            table_source,
1954            WriteOp::Update,
1955            Arc::new(source),
1956        ));
1957        Ok(plan)
1958    }
1959
1960    fn insert_to_plan(
1961        &self,
1962        table_name: ObjectName,
1963        columns: Vec<Ident>,
1964        source: Box<Query>,
1965        overwrite: bool,
1966        replace_into: bool,
1967    ) -> Result<LogicalPlan> {
1968        // Do a table lookup to verify the table exists
1969        let table_name = self.object_name_to_table_reference(table_name)?;
1970        let table_source = self.context_provider.get_table_source(table_name.clone())?;
1971        let arrow_schema = (*table_source.schema()).clone();
1972        let table_schema = DFSchema::try_from(arrow_schema)?;
1973
1974        // Get insert fields and target table's value indices
1975        //
1976        // If value_indices[i] = Some(j), it means that the value of the i-th target table's column is
1977        // derived from the j-th output of the source.
1978        //
1979        // If value_indices[i] = None, it means that the value of the i-th target table's column is
1980        // not provided, and should be filled with a default value later.
1981        let (fields, value_indices) = if columns.is_empty() {
1982            // Empty means we're inserting into all columns of the table
1983            (
1984                table_schema.fields().clone(),
1985                (0..table_schema.fields().len())
1986                    .map(Some)
1987                    .collect::<Vec<_>>(),
1988            )
1989        } else {
1990            let mut value_indices = vec![None; table_schema.fields().len()];
1991            let fields = columns
1992                .into_iter()
1993                .map(|c| self.ident_normalizer.normalize(c))
1994                .enumerate()
1995                .map(|(i, c)| {
1996                    let column_index = table_schema
1997                        .index_of_column_by_name(None, &c)
1998                        .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
1999
2000                    if value_indices[column_index].is_some() {
2001                        return schema_err!(SchemaError::DuplicateUnqualifiedField {
2002                            name: c,
2003                        });
2004                    } else {
2005                        value_indices[column_index] = Some(i);
2006                    }
2007                    Ok(table_schema.field(column_index).clone())
2008                })
2009                .collect::<Result<Vec<_>>>()?;
2010            (Fields::from(fields), value_indices)
2011        };
2012
2013        // infer types for Values clause... other types should be resolvable the regular way
2014        let mut prepare_param_data_types = BTreeMap::new();
2015        if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
2016            for row in rows.iter() {
2017                for (idx, val) in row.iter().enumerate() {
2018                    if let SQLExpr::Value(ValueWithSpan {
2019                        value: Value::Placeholder(name),
2020                        span: _,
2021                    }) = val
2022                    {
2023                        let name =
2024                            name.replace('$', "").parse::<usize>().map_err(|_| {
2025                                plan_datafusion_err!("Can't parse placeholder: {name}")
2026                            })? - 1;
2027                        let field = fields.get(idx).ok_or_else(|| {
2028                            plan_datafusion_err!(
2029                                "Placeholder ${} refers to a non existent column",
2030                                idx + 1
2031                            )
2032                        })?;
2033                        let dt = field.data_type().clone();
2034                        let _ = prepare_param_data_types.insert(name, dt);
2035                    }
2036                }
2037            }
2038        }
2039        let prepare_param_data_types = prepare_param_data_types.into_values().collect();
2040
2041        // Projection
2042        let mut planner_context =
2043            PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
2044        planner_context.set_table_schema(Some(DFSchemaRef::new(
2045            DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
2046        )));
2047        let source = self.query_to_plan(*source, &mut planner_context)?;
2048        if fields.len() != source.schema().fields().len() {
2049            plan_err!("Column count doesn't match insert query!")?;
2050        }
2051
2052        let exprs = value_indices
2053            .into_iter()
2054            .enumerate()
2055            .map(|(i, value_index)| {
2056                let target_field = table_schema.field(i);
2057                let expr = match value_index {
2058                    Some(v) => {
2059                        Expr::Column(Column::from(source.schema().qualified_field(v)))
2060                            .cast_to(target_field.data_type(), source.schema())?
2061                    }
2062                    // The value is not specified. Fill in the default value for the column.
2063                    None => table_source
2064                        .get_column_default(target_field.name())
2065                        .cloned()
2066                        .unwrap_or_else(|| {
2067                            // If there is no default for the column, then the default is NULL
2068                            Expr::Literal(ScalarValue::Null, None)
2069                        })
2070                        .cast_to(target_field.data_type(), &DFSchema::empty())?,
2071                };
2072                Ok(expr.alias(target_field.name()))
2073            })
2074            .collect::<Result<Vec<Expr>>>()?;
2075        let source = project(source, exprs)?;
2076
2077        let insert_op = match (overwrite, replace_into) {
2078            (false, false) => InsertOp::Append,
2079            (true, false) => InsertOp::Overwrite,
2080            (false, true) => InsertOp::Replace,
2081            (true, true) => plan_err!("Conflicting insert operations: `overwrite` and `replace_into` cannot both be true")?,
2082        };
2083
2084        let plan = LogicalPlan::Dml(DmlStatement::new(
2085            table_name,
2086            Arc::clone(&table_source),
2087            WriteOp::Insert(insert_op),
2088            Arc::new(source),
2089        ));
2090        Ok(plan)
2091    }
2092
2093    fn show_columns_to_plan(
2094        &self,
2095        extended: bool,
2096        full: bool,
2097        sql_table_name: ObjectName,
2098    ) -> Result<LogicalPlan> {
2099        // Figure out the where clause
2100        let where_clause = object_name_to_qualifier(
2101            &sql_table_name,
2102            self.options.enable_ident_normalization,
2103        )?;
2104
2105        if !self.has_table("information_schema", "columns") {
2106            return plan_err!(
2107                "SHOW COLUMNS is not supported unless information_schema is enabled"
2108            );
2109        }
2110
2111        // Do a table lookup to verify the table exists
2112        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2113        let _ = self.context_provider.get_table_source(table_ref)?;
2114
2115        // Treat both FULL and EXTENDED as the same
2116        let select_list = if full || extended {
2117            "*"
2118        } else {
2119            "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
2120        };
2121
2122        let query = format!(
2123            "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
2124        );
2125
2126        let mut rewrite = DFParser::parse_sql(&query)?;
2127        assert_eq!(rewrite.len(), 1);
2128        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2129    }
2130
2131    /// Rewrite `SHOW FUNCTIONS` to another SQL query
2132    /// The query is based on the `information_schema.routines` and `information_schema.parameters` tables
2133    ///
2134    /// The output columns:
2135    /// - function_name: The name of function
2136    /// - return_type: The return type of the function
2137    /// - parameters: The name of parameters (ordered by the ordinal position)
2138    /// - parameter_types: The type of parameters (ordered by the ordinal position)
2139    /// - description: The description of the function (the description defined in the document)
2140    /// - syntax_example: The syntax_example of the function (the syntax_example defined in the document)
2141    fn show_functions_to_plan(
2142        &self,
2143        filter: Option<ShowStatementFilter>,
2144    ) -> Result<LogicalPlan> {
2145        let where_clause = if let Some(filter) = filter {
2146            match filter {
2147                ShowStatementFilter::Like(like) => {
2148                    format!("WHERE p.function_name like '{like}'")
2149                }
2150                _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
2151            }
2152        } else {
2153            "".to_string()
2154        };
2155
2156        let query = format!(
2157            r#"
2158SELECT DISTINCT
2159    p.*,
2160    r.function_type function_type,
2161    r.description description,
2162    r.syntax_example syntax_example
2163FROM
2164    (
2165        SELECT
2166            i.specific_name function_name,
2167            o.data_type return_type,
2168            array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
2169            array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
2170        FROM (
2171                 SELECT
2172                     specific_catalog,
2173                     specific_schema,
2174                     specific_name,
2175                     ordinal_position,
2176                     parameter_name,
2177                     data_type,
2178                     rid
2179                 FROM
2180                     information_schema.parameters
2181                 WHERE
2182                     parameter_mode = 'IN'
2183             ) i
2184                 JOIN
2185             (
2186                 SELECT
2187                     specific_catalog,
2188                     specific_schema,
2189                     specific_name,
2190                     ordinal_position,
2191                     parameter_name,
2192                     data_type,
2193                     rid
2194                 FROM
2195                     information_schema.parameters
2196                 WHERE
2197                     parameter_mode = 'OUT'
2198             ) o
2199             ON i.specific_catalog = o.specific_catalog
2200                 AND i.specific_schema = o.specific_schema
2201                 AND i.specific_name = o.specific_name
2202                 AND i.rid = o.rid
2203        GROUP BY 1, 2, i.rid
2204    ) as p
2205JOIN information_schema.routines r
2206ON p.function_name = r.routine_name
2207{where_clause}
2208            "#
2209        );
2210        let mut rewrite = DFParser::parse_sql(&query)?;
2211        assert_eq!(rewrite.len(), 1);
2212        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2213    }
2214
2215    fn show_create_table_to_plan(
2216        &self,
2217        sql_table_name: ObjectName,
2218    ) -> Result<LogicalPlan> {
2219        if !self.has_table("information_schema", "tables") {
2220            return plan_err!(
2221                "SHOW CREATE TABLE is not supported unless information_schema is enabled"
2222            );
2223        }
2224        // Figure out the where clause
2225        let where_clause = object_name_to_qualifier(
2226            &sql_table_name,
2227            self.options.enable_ident_normalization,
2228        )?;
2229
2230        // Do a table lookup to verify the table exists
2231        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2232        let _ = self.context_provider.get_table_source(table_ref)?;
2233
2234        let query = format!(
2235            "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
2236        );
2237
2238        let mut rewrite = DFParser::parse_sql(&query)?;
2239        assert_eq!(rewrite.len(), 1);
2240        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2241    }
2242
2243    /// Return true if there is a table provider available for "schema.table"
2244    fn has_table(&self, schema: &str, table: &str) -> bool {
2245        let tables_reference = TableReference::Partial {
2246            schema: schema.into(),
2247            table: table.into(),
2248        };
2249        self.context_provider
2250            .get_table_source(tables_reference)
2251            .is_ok()
2252    }
2253
2254    fn validate_transaction_kind(
2255        &self,
2256        kind: Option<BeginTransactionKind>,
2257    ) -> Result<()> {
2258        match kind {
2259            // BEGIN
2260            None => Ok(()),
2261            // BEGIN TRANSACTION
2262            Some(BeginTransactionKind::Transaction) => Ok(()),
2263            Some(BeginTransactionKind::Work) => {
2264                not_impl_err!("Transaction kind not supported: {kind:?}")
2265            }
2266        }
2267    }
2268}