datafusion_sql/
statement.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{BTreeMap, HashMap, HashSet};
19use std::path::Path;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use crate::parser::{
24    CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
25    LexOrdering, Statement as DFStatement,
26};
27use crate::planner::{
28    object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
29};
30use crate::utils::normalize_ident;
31
32use arrow::datatypes::{DataType, Fields};
33use datafusion_common::error::_plan_err;
34use datafusion_common::parsers::CompressionTypeVariant;
35use datafusion_common::{
36    exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
37    unqualified_field_not_found, Column, Constraint, Constraints, DFSchema, DFSchemaRef,
38    DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, TableReference,
39    ToDFSchema,
40};
41use datafusion_expr::dml::{CopyTo, InsertOp};
42use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
43use datafusion_expr::logical_plan::builder::project;
44use datafusion_expr::logical_plan::DdlStatement;
45use datafusion_expr::utils::expr_to_columns;
46use datafusion_expr::{
47    cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
48    CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
49    CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
50    DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
51    EmptyRelation, Execute, Explain, ExplainFormat, Expr, ExprSchemable, Filter,
52    LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, SetVariable,
53    SortExpr, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
54    TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
55    Volatility, WriteOp,
56};
57use sqlparser::ast::{
58    self, BeginTransactionKind, IndexColumn, IndexType, NullsDistinctOption, OrderByExpr,
59    OrderByOptions, Set, ShowStatementIn, ShowStatementOptions, SqliteOnConflict,
60    TableObject, UpdateTableFromKind, ValueWithSpan,
61};
62use sqlparser::ast::{
63    Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
64    CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
65    ObjectName, ObjectType, Query, SchemaName, SetExpr, ShowCreateObject,
66    ShowStatementFilter, Statement, TableConstraint, TableFactor, TableWithJoins,
67    TransactionMode, UnaryOperator, Value,
68};
69use sqlparser::parser::ParserError::ParserError;
70
71fn ident_to_string(ident: &Ident) -> String {
72    normalize_ident(ident.to_owned())
73}
74
75fn object_name_to_string(object_name: &ObjectName) -> String {
76    object_name
77        .0
78        .iter()
79        .map(|object_name_part| {
80            object_name_part
81                .as_ident()
82                // TODO: It might be better to return an error
83                // than to silently use a default value.
84                .map_or_else(String::new, ident_to_string)
85        })
86        .collect::<Vec<String>>()
87        .join(".")
88}
89
90fn get_schema_name(schema_name: &SchemaName) -> String {
91    match schema_name {
92        SchemaName::Simple(schema_name) => object_name_to_string(schema_name),
93        SchemaName::UnnamedAuthorization(auth) => ident_to_string(auth),
94        SchemaName::NamedAuthorization(schema_name, auth) => format!(
95            "{}.{}",
96            object_name_to_string(schema_name),
97            ident_to_string(auth)
98        ),
99    }
100}
101
102/// Construct `TableConstraint`(s) for the given columns by iterating over
103/// `columns` and extracting individual inline constraint definitions.
104fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
105    let mut constraints = vec![];
106    for column in columns {
107        for ast::ColumnOptionDef { name, option } in &column.options {
108            match option {
109                ast::ColumnOption::Unique {
110                    is_primary: false,
111                    characteristics,
112                } => constraints.push(TableConstraint::Unique {
113                    name: name.clone(),
114                    columns: vec![IndexColumn {
115                        column: OrderByExpr {
116                            expr: SQLExpr::Identifier(column.name.clone()),
117                            options: OrderByOptions {
118                                asc: None,
119                                nulls_first: None,
120                            },
121                            with_fill: None,
122                        },
123                        operator_class: None,
124                    }],
125                    characteristics: *characteristics,
126                    index_name: None,
127                    index_type_display: ast::KeyOrIndexDisplay::None,
128                    index_type: None,
129                    index_options: vec![],
130                    nulls_distinct: NullsDistinctOption::None,
131                }),
132                ast::ColumnOption::Unique {
133                    is_primary: true,
134                    characteristics,
135                } => constraints.push(TableConstraint::PrimaryKey {
136                    name: name.clone(),
137                    columns: vec![IndexColumn {
138                        column: OrderByExpr {
139                            expr: SQLExpr::Identifier(column.name.clone()),
140                            options: OrderByOptions {
141                                asc: None,
142                                nulls_first: None,
143                            },
144                            with_fill: None,
145                        },
146                        operator_class: None,
147                    }],
148                    characteristics: *characteristics,
149                    index_name: None,
150                    index_type: None,
151                    index_options: vec![],
152                }),
153                ast::ColumnOption::ForeignKey {
154                    foreign_table,
155                    referred_columns,
156                    on_delete,
157                    on_update,
158                    characteristics,
159                } => constraints.push(TableConstraint::ForeignKey {
160                    name: name.clone(),
161                    columns: vec![],
162                    foreign_table: foreign_table.clone(),
163                    referred_columns: referred_columns.to_vec(),
164                    on_delete: *on_delete,
165                    on_update: *on_update,
166                    characteristics: *characteristics,
167                    index_name: None,
168                }),
169                ast::ColumnOption::Check(expr) => {
170                    constraints.push(TableConstraint::Check {
171                        name: name.clone(),
172                        expr: Box::new(expr.clone()),
173                        enforced: None,
174                    })
175                }
176                // Other options are not constraint related.
177                ast::ColumnOption::Default(_)
178                | ast::ColumnOption::Null
179                | ast::ColumnOption::NotNull
180                | ast::ColumnOption::DialectSpecific(_)
181                | ast::ColumnOption::CharacterSet(_)
182                | ast::ColumnOption::Generated { .. }
183                | ast::ColumnOption::Comment(_)
184                | ast::ColumnOption::Options(_)
185                | ast::ColumnOption::OnUpdate(_)
186                | ast::ColumnOption::Materialized(_)
187                | ast::ColumnOption::Ephemeral(_)
188                | ast::ColumnOption::Identity(_)
189                | ast::ColumnOption::OnConflict(_)
190                | ast::ColumnOption::Policy(_)
191                | ast::ColumnOption::Tags(_)
192                | ast::ColumnOption::Alias(_)
193                | ast::ColumnOption::Srid(_)
194                | ast::ColumnOption::Collation(_) => {}
195            }
196        }
197    }
198    constraints
199}
200
201impl<S: ContextProvider> SqlToRel<'_, S> {
202    /// Generate a logical plan from an DataFusion SQL statement
203    pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
204        match statement {
205            DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
206            DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
207            DFStatement::CopyTo(s) => self.copy_to_plan(s),
208            DFStatement::Explain(ExplainStatement {
209                verbose,
210                analyze,
211                format,
212                statement,
213            }) => self.explain_to_plan(verbose, analyze, format, *statement),
214        }
215    }
216
217    /// Generate a logical plan from an SQL statement
218    pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
219        self.sql_statement_to_plan_with_context_impl(
220            statement,
221            &mut PlannerContext::new(),
222        )
223    }
224
225    /// Generate a logical plan from an SQL statement
226    pub fn sql_statement_to_plan_with_context(
227        &self,
228        statement: Statement,
229        planner_context: &mut PlannerContext,
230    ) -> Result<LogicalPlan> {
231        self.sql_statement_to_plan_with_context_impl(statement, planner_context)
232    }
233
234    fn sql_statement_to_plan_with_context_impl(
235        &self,
236        statement: Statement,
237        planner_context: &mut PlannerContext,
238    ) -> Result<LogicalPlan> {
239        match statement {
240            Statement::ExplainTable {
241                describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, // only parse 'DESCRIBE table_name' or 'DESC table_name' and not 'EXPLAIN table_name'
242                table_name,
243                ..
244            } => self.describe_table_to_plan(table_name),
245            Statement::Explain {
246                verbose,
247                statement,
248                analyze,
249                format,
250                describe_alias: _,
251                ..
252            } => {
253                let format = format.map(|format| format.to_string());
254                let statement = DFStatement::Statement(statement);
255                self.explain_to_plan(verbose, analyze, format, statement)
256            }
257            Statement::Query(query) => self.query_to_plan(*query, planner_context),
258            Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
259            Statement::Set(statement) => self.set_statement_to_plan(statement),
260            Statement::CreateTable(CreateTable {
261                temporary,
262                external,
263                global,
264                transient,
265                volatile,
266                hive_distribution,
267                hive_formats,
268                file_format,
269                location,
270                query,
271                name,
272                columns,
273                constraints,
274                if_not_exists,
275                or_replace,
276                without_rowid,
277                like,
278                clone,
279                comment,
280                on_commit,
281                on_cluster,
282                primary_key,
283                order_by,
284                partition_by,
285                cluster_by,
286                clustered_by,
287                strict,
288                copy_grants,
289                enable_schema_evolution,
290                change_tracking,
291                data_retention_time_in_days,
292                max_data_extension_time_in_days,
293                default_ddl_collation,
294                with_aggregation_policy,
295                with_row_access_policy,
296                with_tags,
297                iceberg,
298                external_volume,
299                base_location,
300                catalog,
301                catalog_sync,
302                storage_serialization_policy,
303                inherits,
304                table_options: CreateTableOptions::None,
305            }) => {
306                if temporary {
307                    return not_impl_err!("Temporary tables not supported")?;
308                }
309                if external {
310                    return not_impl_err!("External tables not supported")?;
311                }
312                if global.is_some() {
313                    return not_impl_err!("Global tables not supported")?;
314                }
315                if transient {
316                    return not_impl_err!("Transient tables not supported")?;
317                }
318                if volatile {
319                    return not_impl_err!("Volatile tables not supported")?;
320                }
321                if hive_distribution != ast::HiveDistributionStyle::NONE {
322                    return not_impl_err!(
323                        "Hive distribution not supported: {hive_distribution:?}"
324                    )?;
325                }
326                if !matches!(
327                    hive_formats,
328                    Some(ast::HiveFormat {
329                        row_format: None,
330                        serde_properties: None,
331                        storage: None,
332                        location: None,
333                    })
334                ) {
335                    return not_impl_err!(
336                        "Hive formats not supported: {hive_formats:?}"
337                    )?;
338                }
339                if file_format.is_some() {
340                    return not_impl_err!("File format not supported")?;
341                }
342                if location.is_some() {
343                    return not_impl_err!("Location not supported")?;
344                }
345                if without_rowid {
346                    return not_impl_err!("Without rowid not supported")?;
347                }
348                if like.is_some() {
349                    return not_impl_err!("Like not supported")?;
350                }
351                if clone.is_some() {
352                    return not_impl_err!("Clone not supported")?;
353                }
354                if comment.is_some() {
355                    return not_impl_err!("Comment not supported")?;
356                }
357                if on_commit.is_some() {
358                    return not_impl_err!("On commit not supported")?;
359                }
360                if on_cluster.is_some() {
361                    return not_impl_err!("On cluster not supported")?;
362                }
363                if primary_key.is_some() {
364                    return not_impl_err!("Primary key not supported")?;
365                }
366                if order_by.is_some() {
367                    return not_impl_err!("Order by not supported")?;
368                }
369                if partition_by.is_some() {
370                    return not_impl_err!("Partition by not supported")?;
371                }
372                if cluster_by.is_some() {
373                    return not_impl_err!("Cluster by not supported")?;
374                }
375                if clustered_by.is_some() {
376                    return not_impl_err!("Clustered by not supported")?;
377                }
378                if strict {
379                    return not_impl_err!("Strict not supported")?;
380                }
381                if copy_grants {
382                    return not_impl_err!("Copy grants not supported")?;
383                }
384                if enable_schema_evolution.is_some() {
385                    return not_impl_err!("Enable schema evolution not supported")?;
386                }
387                if change_tracking.is_some() {
388                    return not_impl_err!("Change tracking not supported")?;
389                }
390                if data_retention_time_in_days.is_some() {
391                    return not_impl_err!("Data retention time in days not supported")?;
392                }
393                if max_data_extension_time_in_days.is_some() {
394                    return not_impl_err!(
395                        "Max data extension time in days not supported"
396                    )?;
397                }
398                if default_ddl_collation.is_some() {
399                    return not_impl_err!("Default DDL collation not supported")?;
400                }
401                if with_aggregation_policy.is_some() {
402                    return not_impl_err!("With aggregation policy not supported")?;
403                }
404                if with_row_access_policy.is_some() {
405                    return not_impl_err!("With row access policy not supported")?;
406                }
407                if with_tags.is_some() {
408                    return not_impl_err!("With tags not supported")?;
409                }
410                if iceberg {
411                    return not_impl_err!("Iceberg not supported")?;
412                }
413                if external_volume.is_some() {
414                    return not_impl_err!("External volume not supported")?;
415                }
416                if base_location.is_some() {
417                    return not_impl_err!("Base location not supported")?;
418                }
419                if catalog.is_some() {
420                    return not_impl_err!("Catalog not supported")?;
421                }
422                if catalog_sync.is_some() {
423                    return not_impl_err!("Catalog sync not supported")?;
424                }
425                if storage_serialization_policy.is_some() {
426                    return not_impl_err!("Storage serialization policy not supported")?;
427                }
428                if inherits.is_some() {
429                    return not_impl_err!("Table inheritance not supported")?;
430                }
431
432                // Merge inline constraints and existing constraints
433                let mut all_constraints = constraints;
434                let inline_constraints = calc_inline_constraints_from_columns(&columns);
435                all_constraints.extend(inline_constraints);
436                // Build column default values
437                let column_defaults =
438                    self.build_column_defaults(&columns, planner_context)?;
439
440                let has_columns = !columns.is_empty();
441                let schema = self.build_schema(columns)?.to_dfschema_ref()?;
442                if has_columns {
443                    planner_context.set_table_schema(Some(Arc::clone(&schema)));
444                }
445
446                match query {
447                    Some(query) => {
448                        let plan = self.query_to_plan(*query, planner_context)?;
449                        let input_schema = plan.schema();
450
451                        let plan = if has_columns {
452                            if schema.fields().len() != input_schema.fields().len() {
453                                return plan_err!(
454                                    "Mismatch: {} columns specified, but result has {} columns",
455                                    schema.fields().len(),
456                                    input_schema.fields().len()
457                                );
458                            }
459                            let input_fields = input_schema.fields();
460                            let project_exprs = schema
461                                .fields()
462                                .iter()
463                                .zip(input_fields)
464                                .map(|(field, input_field)| {
465                                    cast(
466                                        col(input_field.name()),
467                                        field.data_type().clone(),
468                                    )
469                                    .alias(field.name())
470                                })
471                                .collect::<Vec<_>>();
472
473                            LogicalPlanBuilder::from(plan.clone())
474                                .project(project_exprs)?
475                                .build()?
476                        } else {
477                            plan
478                        };
479
480                        let constraints = self.new_constraint_from_table_constraints(
481                            &all_constraints,
482                            plan.schema(),
483                        )?;
484
485                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
486                            CreateMemoryTable {
487                                name: self.object_name_to_table_reference(name)?,
488                                constraints,
489                                input: Arc::new(plan),
490                                if_not_exists,
491                                or_replace,
492                                column_defaults,
493                                temporary,
494                            },
495                        )))
496                    }
497
498                    None => {
499                        let plan = EmptyRelation {
500                            produce_one_row: false,
501                            schema,
502                        };
503                        let plan = LogicalPlan::EmptyRelation(plan);
504                        let constraints = self.new_constraint_from_table_constraints(
505                            &all_constraints,
506                            plan.schema(),
507                        )?;
508                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
509                            CreateMemoryTable {
510                                name: self.object_name_to_table_reference(name)?,
511                                constraints,
512                                input: Arc::new(plan),
513                                if_not_exists,
514                                or_replace,
515                                column_defaults,
516                                temporary,
517                            },
518                        )))
519                    }
520                }
521            }
522
523            Statement::CreateView {
524                or_replace,
525                materialized,
526                name,
527                columns,
528                query,
529                options: CreateTableOptions::None,
530                cluster_by,
531                comment,
532                with_no_schema_binding,
533                if_not_exists,
534                temporary,
535                to,
536                params,
537                or_alter,
538            } => {
539                if materialized {
540                    return not_impl_err!("Materialized views not supported")?;
541                }
542                if !cluster_by.is_empty() {
543                    return not_impl_err!("Cluster by not supported")?;
544                }
545                if comment.is_some() {
546                    return not_impl_err!("Comment not supported")?;
547                }
548                if with_no_schema_binding {
549                    return not_impl_err!("With no schema binding not supported")?;
550                }
551                if if_not_exists {
552                    return not_impl_err!("If not exists not supported")?;
553                }
554                if to.is_some() {
555                    return not_impl_err!("To not supported")?;
556                }
557
558                // put the statement back together temporarily to get the SQL
559                // string representation
560                let stmt = Statement::CreateView {
561                    or_replace,
562                    materialized,
563                    name,
564                    columns,
565                    query,
566                    options: CreateTableOptions::None,
567                    cluster_by,
568                    comment,
569                    with_no_schema_binding,
570                    if_not_exists,
571                    temporary,
572                    to,
573                    params,
574                    or_alter,
575                };
576                let sql = stmt.to_string();
577                let Statement::CreateView {
578                    name,
579                    columns,
580                    query,
581                    or_replace,
582                    temporary,
583                    ..
584                } = stmt
585                else {
586                    return internal_err!("Unreachable code in create view");
587                };
588
589                let columns = columns
590                    .into_iter()
591                    .map(|view_column_def| {
592                        if let Some(options) = view_column_def.options {
593                            plan_err!(
594                                "Options not supported for view columns: {options:?}"
595                            )
596                        } else {
597                            Ok(view_column_def.name)
598                        }
599                    })
600                    .collect::<Result<Vec<_>>>()?;
601
602                let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
603                plan = self.apply_expr_alias(plan, columns)?;
604
605                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
606                    name: self.object_name_to_table_reference(name)?,
607                    input: Arc::new(plan),
608                    or_replace,
609                    definition: Some(sql),
610                    temporary,
611                })))
612            }
613            Statement::ShowCreate { obj_type, obj_name } => match obj_type {
614                ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
615                _ => {
616                    not_impl_err!("Only `SHOW CREATE TABLE  ...` statement is supported")
617                }
618            },
619            Statement::CreateSchema {
620                schema_name,
621                if_not_exists,
622                ..
623            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
624                CreateCatalogSchema {
625                    schema_name: get_schema_name(&schema_name),
626                    if_not_exists,
627                    schema: Arc::new(DFSchema::empty()),
628                },
629            ))),
630            Statement::CreateDatabase {
631                db_name,
632                if_not_exists,
633                ..
634            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
635                CreateCatalog {
636                    catalog_name: object_name_to_string(&db_name),
637                    if_not_exists,
638                    schema: Arc::new(DFSchema::empty()),
639                },
640            ))),
641            Statement::Drop {
642                object_type,
643                if_exists,
644                mut names,
645                cascade,
646                restrict: _,
647                purge: _,
648                temporary: _,
649                table: _,
650            } => {
651                // We don't support cascade and purge for now.
652                // nor do we support multiple object names
653                let name = match names.len() {
654                    0 => Err(ParserError("Missing table name.".to_string()).into()),
655                    1 => self.object_name_to_table_reference(names.pop().unwrap()),
656                    _ => {
657                        Err(ParserError("Multiple objects not supported".to_string())
658                            .into())
659                    }
660                }?;
661
662                match object_type {
663                    ObjectType::Table => {
664                        Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
665                            name,
666                            if_exists,
667                            schema: DFSchemaRef::new(DFSchema::empty()),
668                        })))
669                    }
670                    ObjectType::View => {
671                        Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
672                            name,
673                            if_exists,
674                            schema: DFSchemaRef::new(DFSchema::empty()),
675                        })))
676                    }
677                    ObjectType::Schema => {
678                        let name = match name {
679                            TableReference::Bare { table } => Ok(SchemaReference::Bare { schema: table }),
680                            TableReference::Partial { schema, table } => Ok(SchemaReference::Full { schema: table, catalog: schema }),
681                            TableReference::Full { catalog: _, schema: _, table: _ } => {
682                                Err(ParserError("Invalid schema specifier (has 3 parts)".to_string()))
683                            }
684                        }?;
685                        Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(DropCatalogSchema {
686                            name,
687                            if_exists,
688                            cascade,
689                            schema: DFSchemaRef::new(DFSchema::empty()),
690                        })))
691                    }
692                    _ => not_impl_err!(
693                        "Only `DROP TABLE/VIEW/SCHEMA  ...` statement is supported currently"
694                    ),
695                }
696            }
697            Statement::Prepare {
698                name,
699                data_types,
700                statement,
701            } => {
702                // Convert parser data types to DataFusion data types
703                let mut data_types: Vec<DataType> = data_types
704                    .into_iter()
705                    .map(|t| self.convert_data_type(&t))
706                    .collect::<Result<_>>()?;
707
708                // Create planner context with parameters
709                let mut planner_context = PlannerContext::new()
710                    .with_prepare_param_data_types(data_types.clone());
711
712                // Build logical plan for inner statement of the prepare statement
713                let plan = self.sql_statement_to_plan_with_context_impl(
714                    *statement,
715                    &mut planner_context,
716                )?;
717
718                if data_types.is_empty() {
719                    let map_types = plan.get_parameter_types()?;
720                    let param_types: Vec<_> = (1..=map_types.len())
721                        .filter_map(|i| {
722                            let key = format!("${i}");
723                            map_types.get(&key).and_then(|opt| opt.clone())
724                        })
725                        .collect();
726                    data_types.extend(param_types.iter().cloned());
727                    planner_context.with_prepare_param_data_types(param_types);
728                }
729
730                Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
731                    name: ident_to_string(&name),
732                    data_types,
733                    input: Arc::new(plan),
734                })))
735            }
736            Statement::Execute {
737                name,
738                parameters,
739                using,
740                // has_parentheses specifies the syntax, but the plan is the
741                // same no matter the syntax used, so ignore it
742                has_parentheses: _,
743                immediate,
744                into,
745                output,
746                default,
747            } => {
748                // `USING` is a MySQL-specific syntax and currently not supported.
749                if !using.is_empty() {
750                    return not_impl_err!(
751                        "Execute statement with USING is not supported"
752                    );
753                }
754                if immediate {
755                    return not_impl_err!(
756                        "Execute statement with IMMEDIATE is not supported"
757                    );
758                }
759                if !into.is_empty() {
760                    return not_impl_err!("Execute statement with INTO is not supported");
761                }
762                if output {
763                    return not_impl_err!(
764                        "Execute statement with OUTPUT is not supported"
765                    );
766                }
767                if default {
768                    return not_impl_err!(
769                        "Execute statement with DEFAULT is not supported"
770                    );
771                }
772                let empty_schema = DFSchema::empty();
773                let parameters = parameters
774                    .into_iter()
775                    .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
776                    .collect::<Result<Vec<Expr>>>()?;
777
778                Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
779                    name: object_name_to_string(&name.unwrap()),
780                    parameters,
781                })))
782            }
783            Statement::Deallocate {
784                name,
785                // Similar to PostgreSQL, the PREPARE keyword is ignored
786                prepare: _,
787            } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
788                Deallocate {
789                    name: ident_to_string(&name),
790                },
791            ))),
792
793            Statement::ShowTables {
794                extended,
795                full,
796                terse,
797                history,
798                external,
799                show_options,
800            } => {
801                // We only support the basic "SHOW TABLES"
802                // https://github.com/apache/datafusion/issues/3188
803                if extended {
804                    return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
805                }
806                if full {
807                    return not_impl_err!("SHOW FULL TABLES not supported")?;
808                }
809                if terse {
810                    return not_impl_err!("SHOW TERSE TABLES not supported")?;
811                }
812                if history {
813                    return not_impl_err!("SHOW TABLES HISTORY not supported")?;
814                }
815                if external {
816                    return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
817                }
818                let ShowStatementOptions {
819                    show_in,
820                    starts_with,
821                    limit,
822                    limit_from,
823                    filter_position,
824                } = show_options;
825                if show_in.is_some() {
826                    return not_impl_err!("SHOW TABLES IN not supported")?;
827                }
828                if starts_with.is_some() {
829                    return not_impl_err!("SHOW TABLES LIKE not supported")?;
830                }
831                if limit.is_some() {
832                    return not_impl_err!("SHOW TABLES LIMIT not supported")?;
833                }
834                if limit_from.is_some() {
835                    return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
836                }
837                if filter_position.is_some() {
838                    return not_impl_err!("SHOW TABLES FILTER not supported")?;
839                }
840                self.show_tables_to_plan()
841            }
842
843            Statement::ShowColumns {
844                extended,
845                full,
846                show_options,
847            } => {
848                let ShowStatementOptions {
849                    show_in,
850                    starts_with,
851                    limit,
852                    limit_from,
853                    filter_position,
854                } = show_options;
855                if starts_with.is_some() {
856                    return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
857                }
858                if limit.is_some() {
859                    return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
860                }
861                if limit_from.is_some() {
862                    return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
863                }
864                if filter_position.is_some() {
865                    return not_impl_err!(
866                        "SHOW COLUMNS with WHERE or LIKE is not supported"
867                    )?;
868                }
869                let Some(ShowStatementIn {
870                    // specifies if the syntax was `SHOW COLUMNS IN` or `SHOW
871                    // COLUMNS FROM` which is not different in DataFusion
872                    clause: _,
873                    parent_type,
874                    parent_name,
875                }) = show_in
876                else {
877                    return plan_err!("SHOW COLUMNS requires a table name");
878                };
879
880                if let Some(parent_type) = parent_type {
881                    return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
882                }
883                let Some(table_name) = parent_name else {
884                    return plan_err!("SHOW COLUMNS requires a table name");
885                };
886
887                self.show_columns_to_plan(extended, full, table_name)
888            }
889
890            Statement::ShowFunctions { filter, .. } => {
891                self.show_functions_to_plan(filter)
892            }
893
894            Statement::Insert(Insert {
895                or,
896                into,
897                columns,
898                overwrite,
899                source,
900                partitioned,
901                after_columns,
902                table,
903                on,
904                returning,
905                ignore,
906                table_alias,
907                mut replace_into,
908                priority,
909                insert_alias,
910                assignments,
911                has_table_keyword,
912                settings,
913                format_clause,
914            }) => {
915                let table_name = match table {
916                    TableObject::TableName(table_name) => table_name,
917                    TableObject::TableFunction(_) => {
918                        return not_impl_err!("INSERT INTO Table functions not supported")
919                    }
920                };
921                if let Some(or) = or {
922                    match or {
923                        SqliteOnConflict::Replace => replace_into = true,
924                        _ => plan_err!("Inserts with {or} clause is not supported")?,
925                    }
926                }
927                if partitioned.is_some() {
928                    plan_err!("Partitioned inserts not yet supported")?;
929                }
930                if !after_columns.is_empty() {
931                    plan_err!("After-columns clause not supported")?;
932                }
933                if on.is_some() {
934                    plan_err!("Insert-on clause not supported")?;
935                }
936                if returning.is_some() {
937                    plan_err!("Insert-returning clause not supported")?;
938                }
939                if ignore {
940                    plan_err!("Insert-ignore clause not supported")?;
941                }
942                let Some(source) = source else {
943                    plan_err!("Inserts without a source not supported")?
944                };
945                if let Some(table_alias) = table_alias {
946                    plan_err!(
947                        "Inserts with a table alias not supported: {table_alias:?}"
948                    )?
949                };
950                if let Some(priority) = priority {
951                    plan_err!(
952                        "Inserts with a `PRIORITY` clause not supported: {priority:?}"
953                    )?
954                };
955                if insert_alias.is_some() {
956                    plan_err!("Inserts with an alias not supported")?;
957                }
958                if !assignments.is_empty() {
959                    plan_err!("Inserts with assignments not supported")?;
960                }
961                if settings.is_some() {
962                    plan_err!("Inserts with settings not supported")?;
963                }
964                if format_clause.is_some() {
965                    plan_err!("Inserts with format clause not supported")?;
966                }
967                // optional keywords don't change behavior
968                let _ = into;
969                let _ = has_table_keyword;
970                self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
971            }
972            Statement::Update {
973                table,
974                assignments,
975                from,
976                selection,
977                returning,
978                or,
979            } => {
980                let from_clauses =
981                    from.map(|update_table_from_kind| match update_table_from_kind {
982                        UpdateTableFromKind::BeforeSet(from_clauses) => from_clauses,
983                        UpdateTableFromKind::AfterSet(from_clauses) => from_clauses,
984                    });
985                // TODO: support multiple tables in UPDATE SET FROM
986                if from_clauses.as_ref().is_some_and(|f| f.len() > 1) {
987                    plan_err!("Multiple tables in UPDATE SET FROM not yet supported")?;
988                }
989                let update_from = from_clauses.and_then(|mut f| f.pop());
990                if returning.is_some() {
991                    plan_err!("Update-returning clause not yet supported")?;
992                }
993                if or.is_some() {
994                    plan_err!("ON conflict not supported")?;
995                }
996                self.update_to_plan(table, assignments, update_from, selection)
997            }
998
999            Statement::Delete(Delete {
1000                tables,
1001                using,
1002                selection,
1003                returning,
1004                from,
1005                order_by,
1006                limit,
1007            }) => {
1008                if !tables.is_empty() {
1009                    plan_err!("DELETE <TABLE> not supported")?;
1010                }
1011
1012                if using.is_some() {
1013                    plan_err!("Using clause not supported")?;
1014                }
1015
1016                if returning.is_some() {
1017                    plan_err!("Delete-returning clause not yet supported")?;
1018                }
1019
1020                if !order_by.is_empty() {
1021                    plan_err!("Delete-order-by clause not yet supported")?;
1022                }
1023
1024                if limit.is_some() {
1025                    plan_err!("Delete-limit clause not yet supported")?;
1026                }
1027
1028                let table_name = self.get_delete_target(from)?;
1029                self.delete_to_plan(table_name, selection)
1030            }
1031
1032            Statement::StartTransaction {
1033                modes,
1034                begin: false,
1035                modifier,
1036                transaction,
1037                statements,
1038                has_end_keyword,
1039                exception,
1040            } => {
1041                if let Some(modifier) = modifier {
1042                    return not_impl_err!(
1043                        "Transaction modifier not supported: {modifier}"
1044                    );
1045                }
1046                if !statements.is_empty() {
1047                    return not_impl_err!(
1048                        "Transaction with multiple statements not supported"
1049                    );
1050                }
1051                if exception.is_some() {
1052                    return not_impl_err!(
1053                        "Transaction with exception statements not supported"
1054                    );
1055                }
1056                if has_end_keyword {
1057                    return not_impl_err!("Transaction with END keyword not supported");
1058                }
1059                self.validate_transaction_kind(transaction)?;
1060                let isolation_level: ast::TransactionIsolationLevel = modes
1061                    .iter()
1062                    .filter_map(|m: &TransactionMode| match m {
1063                        TransactionMode::AccessMode(_) => None,
1064                        TransactionMode::IsolationLevel(level) => Some(level),
1065                    })
1066                    .next_back()
1067                    .copied()
1068                    .unwrap_or(ast::TransactionIsolationLevel::Serializable);
1069                let access_mode: ast::TransactionAccessMode = modes
1070                    .iter()
1071                    .filter_map(|m: &TransactionMode| match m {
1072                        TransactionMode::AccessMode(mode) => Some(mode),
1073                        TransactionMode::IsolationLevel(_) => None,
1074                    })
1075                    .next_back()
1076                    .copied()
1077                    .unwrap_or(ast::TransactionAccessMode::ReadWrite);
1078                let isolation_level = match isolation_level {
1079                    ast::TransactionIsolationLevel::ReadUncommitted => {
1080                        TransactionIsolationLevel::ReadUncommitted
1081                    }
1082                    ast::TransactionIsolationLevel::ReadCommitted => {
1083                        TransactionIsolationLevel::ReadCommitted
1084                    }
1085                    ast::TransactionIsolationLevel::RepeatableRead => {
1086                        TransactionIsolationLevel::RepeatableRead
1087                    }
1088                    ast::TransactionIsolationLevel::Serializable => {
1089                        TransactionIsolationLevel::Serializable
1090                    }
1091                    ast::TransactionIsolationLevel::Snapshot => {
1092                        TransactionIsolationLevel::Snapshot
1093                    }
1094                };
1095                let access_mode = match access_mode {
1096                    ast::TransactionAccessMode::ReadOnly => {
1097                        TransactionAccessMode::ReadOnly
1098                    }
1099                    ast::TransactionAccessMode::ReadWrite => {
1100                        TransactionAccessMode::ReadWrite
1101                    }
1102                };
1103                let statement = PlanStatement::TransactionStart(TransactionStart {
1104                    access_mode,
1105                    isolation_level,
1106                });
1107                Ok(LogicalPlan::Statement(statement))
1108            }
1109            Statement::Commit {
1110                chain,
1111                end,
1112                modifier,
1113            } => {
1114                if end {
1115                    return not_impl_err!("COMMIT AND END not supported");
1116                };
1117                if let Some(modifier) = modifier {
1118                    return not_impl_err!("COMMIT {modifier} not supported");
1119                };
1120                let statement = PlanStatement::TransactionEnd(TransactionEnd {
1121                    conclusion: TransactionConclusion::Commit,
1122                    chain,
1123                });
1124                Ok(LogicalPlan::Statement(statement))
1125            }
1126            Statement::Rollback { chain, savepoint } => {
1127                if savepoint.is_some() {
1128                    plan_err!("Savepoints not supported")?;
1129                }
1130                let statement = PlanStatement::TransactionEnd(TransactionEnd {
1131                    conclusion: TransactionConclusion::Rollback,
1132                    chain,
1133                });
1134                Ok(LogicalPlan::Statement(statement))
1135            }
1136            Statement::CreateFunction(ast::CreateFunction {
1137                or_replace,
1138                temporary,
1139                name,
1140                args,
1141                return_type,
1142                function_body,
1143                behavior,
1144                language,
1145                ..
1146            }) => {
1147                let return_type = match return_type {
1148                    Some(t) => Some(self.convert_data_type(&t)?),
1149                    None => None,
1150                };
1151                let mut planner_context = PlannerContext::new();
1152                let empty_schema = &DFSchema::empty();
1153
1154                let args = match args {
1155                    Some(function_args) => {
1156                        let function_args = function_args
1157                            .into_iter()
1158                            .map(|arg| {
1159                                let data_type = self.convert_data_type(&arg.data_type)?;
1160
1161                                let default_expr = match arg.default_expr {
1162                                    Some(expr) => Some(self.sql_to_expr(
1163                                        expr,
1164                                        empty_schema,
1165                                        &mut planner_context,
1166                                    )?),
1167                                    None => None,
1168                                };
1169                                Ok(OperateFunctionArg {
1170                                    name: arg.name,
1171                                    default_expr,
1172                                    data_type,
1173                                })
1174                            })
1175                            .collect::<Result<Vec<OperateFunctionArg>>>();
1176                        Some(function_args?)
1177                    }
1178                    None => None,
1179                };
1180                // At the moment functions can't be qualified `schema.name`
1181                let name = match &name.0[..] {
1182                    [] => exec_err!("Function should have name")?,
1183                    [n] => n.as_ident().unwrap().value.clone(),
1184                    [..] => not_impl_err!("Qualified functions are not supported")?,
1185                };
1186                //
1187                // Convert resulting expression to data fusion expression
1188                //
1189                let arg_types = args.as_ref().map(|arg| {
1190                    arg.iter().map(|t| t.data_type.clone()).collect::<Vec<_>>()
1191                });
1192                let mut planner_context = PlannerContext::new()
1193                    .with_prepare_param_data_types(arg_types.unwrap_or_default());
1194
1195                let function_body = match function_body {
1196                    Some(r) => Some(self.sql_to_expr(
1197                        match r {
1198                            ast::CreateFunctionBody::AsBeforeOptions(expr) => expr,
1199                            ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
1200                            ast::CreateFunctionBody::Return(expr) => expr,
1201                            ast::CreateFunctionBody::AsBeginEnd(_) => {
1202                                return not_impl_err!(
1203                                    "BEGIN/END enclosed function body syntax is not supported"
1204                                )?;
1205                            }
1206                            ast::CreateFunctionBody::AsReturnExpr(_)
1207                            | ast::CreateFunctionBody::AsReturnSelect(_) => {
1208                                return not_impl_err!(
1209                                    "AS RETURN function syntax is not supported"
1210                                )?
1211                            }
1212                        },
1213                        &DFSchema::empty(),
1214                        &mut planner_context,
1215                    )?),
1216                    None => None,
1217                };
1218
1219                let params = CreateFunctionBody {
1220                    language,
1221                    behavior: behavior.map(|b| match b {
1222                        ast::FunctionBehavior::Immutable => Volatility::Immutable,
1223                        ast::FunctionBehavior::Stable => Volatility::Stable,
1224                        ast::FunctionBehavior::Volatile => Volatility::Volatile,
1225                    }),
1226                    function_body,
1227                };
1228
1229                let statement = DdlStatement::CreateFunction(CreateFunction {
1230                    or_replace,
1231                    temporary,
1232                    name,
1233                    return_type,
1234                    args,
1235                    params,
1236                    schema: DFSchemaRef::new(DFSchema::empty()),
1237                });
1238
1239                Ok(LogicalPlan::Ddl(statement))
1240            }
1241            Statement::DropFunction {
1242                if_exists,
1243                func_desc,
1244                ..
1245            } => {
1246                // According to postgresql documentation it can be only one function
1247                // specified in drop statement
1248                if let Some(desc) = func_desc.first() {
1249                    // At the moment functions can't be qualified `schema.name`
1250                    let name = match &desc.name.0[..] {
1251                        [] => exec_err!("Function should have name")?,
1252                        [n] => n.as_ident().unwrap().value.clone(),
1253                        [..] => not_impl_err!("Qualified functions are not supported")?,
1254                    };
1255                    let statement = DdlStatement::DropFunction(DropFunction {
1256                        if_exists,
1257                        name,
1258                        schema: DFSchemaRef::new(DFSchema::empty()),
1259                    });
1260                    Ok(LogicalPlan::Ddl(statement))
1261                } else {
1262                    exec_err!("Function name not provided")
1263                }
1264            }
1265            Statement::CreateIndex(CreateIndex {
1266                name,
1267                table_name,
1268                using,
1269                columns,
1270                unique,
1271                if_not_exists,
1272                ..
1273            }) => {
1274                let name: Option<String> = name.as_ref().map(object_name_to_string);
1275                let table = self.object_name_to_table_reference(table_name)?;
1276                let table_schema = self
1277                    .context_provider
1278                    .get_table_source(table.clone())?
1279                    .schema()
1280                    .to_dfschema_ref()?;
1281                let using: Option<String> =
1282                    using.as_ref().map(|index_type| match index_type {
1283                        IndexType::Custom(ident) => ident_to_string(ident),
1284                        _ => index_type.to_string().to_ascii_lowercase(),
1285                    });
1286                let order_by_exprs: Vec<OrderByExpr> =
1287                    columns.into_iter().map(|col| col.column).collect();
1288                let columns = self.order_by_to_sort_expr(
1289                    order_by_exprs,
1290                    &table_schema,
1291                    planner_context,
1292                    false,
1293                    None,
1294                )?;
1295                Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
1296                    PlanCreateIndex {
1297                        name,
1298                        table,
1299                        using,
1300                        columns,
1301                        unique,
1302                        if_not_exists,
1303                        schema: DFSchemaRef::new(DFSchema::empty()),
1304                    },
1305                )))
1306            }
1307            stmt => {
1308                not_impl_err!("Unsupported SQL statement: {stmt}")
1309            }
1310        }
1311    }
1312
1313    fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
1314        let mut from = match from {
1315            FromTable::WithFromKeyword(v) => v,
1316            FromTable::WithoutKeyword(v) => v,
1317        };
1318
1319        if from.len() != 1 {
1320            return not_impl_err!(
1321                "DELETE FROM only supports single table, got {}: {from:?}",
1322                from.len()
1323            );
1324        }
1325        let table_factor = from.pop().unwrap();
1326        if !table_factor.joins.is_empty() {
1327            return not_impl_err!("DELETE FROM only supports single table, got: joins");
1328        }
1329        let TableFactor::Table { name, .. } = table_factor.relation else {
1330            return not_impl_err!(
1331                "DELETE FROM only supports single table, got: {table_factor:?}"
1332            );
1333        };
1334
1335        Ok(name)
1336    }
1337
1338    /// Generate a logical plan from a "SHOW TABLES" query
1339    fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
1340        if self.has_table("information_schema", "tables") {
1341            let query = "SELECT * FROM information_schema.tables;";
1342            let mut rewrite = DFParser::parse_sql(query)?;
1343            assert_eq!(rewrite.len(), 1);
1344            self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
1345        } else {
1346            plan_err!("SHOW TABLES is not supported unless information_schema is enabled")
1347        }
1348    }
1349
1350    fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
1351        let table_ref = self.object_name_to_table_reference(table_name)?;
1352
1353        let table_source = self.context_provider.get_table_source(table_ref)?;
1354
1355        let schema = table_source.schema();
1356
1357        let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1358
1359        Ok(LogicalPlan::DescribeTable(DescribeTable {
1360            schema,
1361            output_schema: Arc::new(output_schema),
1362        }))
1363    }
1364
1365    fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
1366        // Determine if source is table or query and handle accordingly
1367        let copy_source = statement.source;
1368        let (input, input_schema, table_ref) = match copy_source {
1369            CopyToSource::Relation(object_name) => {
1370                let table_name = object_name_to_string(&object_name);
1371                let table_ref = self.object_name_to_table_reference(object_name)?;
1372                let table_source =
1373                    self.context_provider.get_table_source(table_ref.clone())?;
1374                let plan =
1375                    LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?;
1376                let input_schema = Arc::clone(plan.schema());
1377                (plan, input_schema, Some(table_ref))
1378            }
1379            CopyToSource::Query(query) => {
1380                let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
1381                let input_schema = Arc::clone(plan.schema());
1382                (plan, input_schema, None)
1383            }
1384        };
1385
1386        let options_map = self.parse_options_map(statement.options, true)?;
1387
1388        let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
1389            self.context_provider.get_file_type(stored_as).ok()
1390        } else {
1391            None
1392        };
1393
1394        let file_type = match maybe_file_type {
1395            Some(ft) => ft,
1396            None => {
1397                let e = || {
1398                    DataFusionError::Configuration(
1399                        "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
1400                            .to_string(),
1401                    )
1402                };
1403                // Try to infer file format from file extension
1404                let extension: &str = &Path::new(&statement.target)
1405                    .extension()
1406                    .ok_or_else(e)?
1407                    .to_str()
1408                    .ok_or_else(e)?
1409                    .to_lowercase();
1410
1411                self.context_provider.get_file_type(extension)?
1412            }
1413        };
1414
1415        let partition_by = statement
1416            .partitioned_by
1417            .iter()
1418            .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
1419            .collect::<Result<Vec<_>>>()?
1420            .into_iter()
1421            .map(|f| f.name().to_owned())
1422            .collect();
1423
1424        Ok(LogicalPlan::Copy(CopyTo::new(
1425            Arc::new(input),
1426            statement.target,
1427            partition_by,
1428            file_type,
1429            options_map,
1430        )))
1431    }
1432
1433    fn build_order_by(
1434        &self,
1435        order_exprs: Vec<LexOrdering>,
1436        schema: &DFSchemaRef,
1437        planner_context: &mut PlannerContext,
1438    ) -> Result<Vec<Vec<SortExpr>>> {
1439        if !order_exprs.is_empty() && schema.fields().is_empty() {
1440            let results = order_exprs
1441                .iter()
1442                .map(|lex_order| {
1443                    let result = lex_order
1444                        .iter()
1445                        .map(|order_by_expr| {
1446                            let ordered_expr = &order_by_expr.expr;
1447                            let ordered_expr = ordered_expr.to_owned();
1448                            let ordered_expr = self.sql_expr_to_logical_expr(
1449                                ordered_expr,
1450                                schema,
1451                                planner_context,
1452                            )?;
1453                            let asc = order_by_expr.options.asc.unwrap_or(true);
1454                            let nulls_first =
1455                                order_by_expr.options.nulls_first.unwrap_or_else(|| {
1456                                    self.options.default_null_ordering.nulls_first(asc)
1457                                });
1458
1459                            Ok(SortExpr::new(ordered_expr, asc, nulls_first))
1460                        })
1461                        .collect::<Result<Vec<SortExpr>>>()?;
1462                    Ok(result)
1463                })
1464                .collect::<Result<Vec<Vec<SortExpr>>>>()?;
1465
1466            return Ok(results);
1467        }
1468
1469        let mut all_results = vec![];
1470        for expr in order_exprs {
1471            // Convert each OrderByExpr to a SortExpr:
1472            let expr_vec =
1473                self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?;
1474            // Verify that columns of all SortExprs exist in the schema:
1475            for sort in expr_vec.iter() {
1476                for column in sort.expr.column_refs().iter() {
1477                    if !schema.has_column(column) {
1478                        // Return an error if any column is not in the schema:
1479                        return plan_err!("Column {column} is not in schema");
1480                    }
1481                }
1482            }
1483            // If all SortExprs are valid, return them as an expression vector
1484            all_results.push(expr_vec)
1485        }
1486        Ok(all_results)
1487    }
1488
1489    /// Generate a logical plan from a CREATE EXTERNAL TABLE statement
1490    fn external_table_to_plan(
1491        &self,
1492        statement: CreateExternalTable,
1493    ) -> Result<LogicalPlan> {
1494        let definition = Some(statement.to_string());
1495        let CreateExternalTable {
1496            name,
1497            columns,
1498            file_type,
1499            location,
1500            table_partition_cols,
1501            if_not_exists,
1502            temporary,
1503            order_exprs,
1504            unbounded,
1505            options,
1506            constraints,
1507        } = statement;
1508
1509        // Merge inline constraints and existing constraints
1510        let mut all_constraints = constraints;
1511        let inline_constraints = calc_inline_constraints_from_columns(&columns);
1512        all_constraints.extend(inline_constraints);
1513
1514        let options_map = self.parse_options_map(options, false)?;
1515
1516        let compression = options_map
1517            .get("format.compression")
1518            .map(|c| CompressionTypeVariant::from_str(c))
1519            .transpose()?;
1520        if (file_type == "PARQUET" || file_type == "AVRO" || file_type == "ARROW")
1521            && compression
1522                .map(|c| c != CompressionTypeVariant::UNCOMPRESSED)
1523                .unwrap_or(false)
1524        {
1525            plan_err!(
1526                "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
1527            )?;
1528        }
1529
1530        let mut planner_context = PlannerContext::new();
1531
1532        let column_defaults = self
1533            .build_column_defaults(&columns, &mut planner_context)?
1534            .into_iter()
1535            .collect();
1536
1537        let schema = self.build_schema(columns)?;
1538        let df_schema = schema.to_dfschema_ref()?;
1539        df_schema.check_names()?;
1540
1541        let ordered_exprs =
1542            self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;
1543
1544        let name = self.object_name_to_table_reference(name)?;
1545        let constraints =
1546            self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
1547        Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1548            PlanCreateExternalTable {
1549                schema: df_schema,
1550                name,
1551                location,
1552                file_type,
1553                table_partition_cols,
1554                if_not_exists,
1555                temporary,
1556                definition,
1557                order_exprs: ordered_exprs,
1558                unbounded,
1559                options: options_map,
1560                constraints,
1561                column_defaults,
1562            },
1563        )))
1564    }
1565
1566    /// Get the indices of the constraint columns in the schema.
1567    /// If any column is not found, return an error.
1568    fn get_constraint_column_indices(
1569        &self,
1570        df_schema: &DFSchemaRef,
1571        columns: &[IndexColumn],
1572        constraint_name: &str,
1573    ) -> Result<Vec<usize>> {
1574        let field_names = df_schema.field_names();
1575        columns
1576            .iter()
1577            .map(|index_column| {
1578                let expr = &index_column.column.expr;
1579                let ident = if let SQLExpr::Identifier(ident) = expr {
1580                    ident
1581                } else {
1582                    return Err(plan_datafusion_err!(
1583                        "Column name for {constraint_name} must be an identifier: {expr}"
1584                    ));
1585                };
1586                let column = self.ident_normalizer.normalize(ident.clone());
1587                field_names
1588                    .iter()
1589                    .position(|item| *item == column)
1590                    .ok_or_else(|| {
1591                        plan_datafusion_err!(
1592                            "Column for {constraint_name} not found in schema: {column}"
1593                        )
1594                    })
1595            })
1596            .collect::<Result<Vec<_>>>()
1597    }
1598
1599    /// Convert each [TableConstraint] to corresponding [Constraint]
1600    pub fn new_constraint_from_table_constraints(
1601        &self,
1602        constraints: &[TableConstraint],
1603        df_schema: &DFSchemaRef,
1604    ) -> Result<Constraints> {
1605        let constraints = constraints
1606            .iter()
1607            .map(|c: &TableConstraint| match c {
1608                TableConstraint::Unique { name, columns, .. } => {
1609                    let constraint_name = match name {
1610                        Some(name) => &format!("unique constraint with name '{name}'"),
1611                        None => "unique constraint",
1612                    };
1613                    // Get unique constraint indices in the schema
1614                    let indices = self.get_constraint_column_indices(
1615                        df_schema,
1616                        columns,
1617                        constraint_name,
1618                    )?;
1619                    Ok(Constraint::Unique(indices))
1620                }
1621                TableConstraint::PrimaryKey { columns, .. } => {
1622                    // Get primary key indices in the schema
1623                    let indices = self.get_constraint_column_indices(
1624                        df_schema,
1625                        columns,
1626                        "primary key",
1627                    )?;
1628                    Ok(Constraint::PrimaryKey(indices))
1629                }
1630                TableConstraint::ForeignKey { .. } => {
1631                    _plan_err!("Foreign key constraints are not currently supported")
1632                }
1633                TableConstraint::Check { .. } => {
1634                    _plan_err!("Check constraints are not currently supported")
1635                }
1636                TableConstraint::Index { .. } => {
1637                    _plan_err!("Indexes are not currently supported")
1638                }
1639                TableConstraint::FulltextOrSpatial { .. } => {
1640                    _plan_err!("Indexes are not currently supported")
1641                }
1642            })
1643            .collect::<Result<Vec<_>>>()?;
1644        Ok(Constraints::new_unverified(constraints))
1645    }
1646
1647    fn parse_options_map(
1648        &self,
1649        options: Vec<(String, Value)>,
1650        allow_duplicates: bool,
1651    ) -> Result<HashMap<String, String>> {
1652        let mut options_map = HashMap::new();
1653        for (key, value) in options {
1654            if !allow_duplicates && options_map.contains_key(&key) {
1655                return plan_err!("Option {key} is specified multiple times");
1656            }
1657
1658            let Some(value_string) = crate::utils::value_to_string(&value) else {
1659                return plan_err!("Unsupported Value {}", value);
1660            };
1661
1662            if !(&key.contains('.')) {
1663                // If config does not belong to any namespace, assume it is
1664                // a format option and apply the format prefix for backwards
1665                // compatibility.
1666                let renamed_key = format!("format.{key}");
1667                options_map.insert(renamed_key.to_lowercase(), value_string);
1668            } else {
1669                options_map.insert(key.to_lowercase(), value_string);
1670            }
1671        }
1672
1673        Ok(options_map)
1674    }
1675
1676    /// Generate a plan for EXPLAIN ... that will print out a plan
1677    ///
1678    /// Note this is the sqlparser explain statement, not the
1679    /// datafusion `EXPLAIN` statement.
1680    fn explain_to_plan(
1681        &self,
1682        verbose: bool,
1683        analyze: bool,
1684        format: Option<String>,
1685        statement: DFStatement,
1686    ) -> Result<LogicalPlan> {
1687        let plan = self.statement_to_plan(statement)?;
1688        if matches!(plan, LogicalPlan::Explain(_)) {
1689            return plan_err!("Nested EXPLAINs are not supported");
1690        }
1691
1692        let plan = Arc::new(plan);
1693        let schema = LogicalPlan::explain_schema();
1694        let schema = schema.to_dfschema_ref()?;
1695
1696        if verbose && format.is_some() {
1697            return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
1698        }
1699
1700        if analyze {
1701            if format.is_some() {
1702                return plan_err!("EXPLAIN ANALYZE with FORMAT is not supported");
1703            }
1704            Ok(LogicalPlan::Analyze(Analyze {
1705                verbose,
1706                input: plan,
1707                schema,
1708            }))
1709        } else {
1710            let stringified_plans =
1711                vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
1712
1713            // default to configuration value
1714            let options = self.context_provider.options();
1715            let format = format.as_ref().unwrap_or(&options.explain.format);
1716
1717            let format: ExplainFormat = format.parse()?;
1718
1719            Ok(LogicalPlan::Explain(Explain {
1720                verbose,
1721                explain_format: format,
1722                plan,
1723                stringified_plans,
1724                schema,
1725                logical_optimization_succeeded: false,
1726            }))
1727        }
1728    }
1729
1730    fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
1731        if !self.has_table("information_schema", "df_settings") {
1732            return plan_err!(
1733                "SHOW [VARIABLE] is not supported unless information_schema is enabled"
1734            );
1735        }
1736
1737        let verbose = variable
1738            .last()
1739            .map(|s| ident_to_string(s) == "verbose")
1740            .unwrap_or(false);
1741        let mut variable_vec = variable.to_vec();
1742        let mut columns: String = "name, value".to_owned();
1743
1744        if verbose {
1745            columns = format!("{columns}, description");
1746            variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
1747        }
1748
1749        let variable = object_name_to_string(&ObjectName::from(variable_vec));
1750        let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
1751        let query = if variable == "all" {
1752            // Add an ORDER BY so the output comes out in a consistent order
1753            format!("{base_query} ORDER BY name")
1754        } else if variable == "timezone" || variable == "time.zone" {
1755            // we could introduce alias in OptionDefinition if this string matching thing grows
1756            format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
1757        } else {
1758            // These values are what are used to make the information_schema table, so we just
1759            // check here, before actually planning or executing the query, if it would produce no
1760            // results, and error preemptively if it would (for a better UX)
1761            let is_valid_variable = self
1762                .context_provider
1763                .options()
1764                .entries()
1765                .iter()
1766                .any(|opt| opt.key == variable);
1767
1768            if !is_valid_variable {
1769                return plan_err!(
1770                    "'{variable}' is not a variable which can be viewed with 'SHOW'"
1771                );
1772            }
1773
1774            format!("{base_query} WHERE name = '{variable}'")
1775        };
1776
1777        let mut rewrite = DFParser::parse_sql(&query)?;
1778        assert_eq!(rewrite.len(), 1);
1779
1780        self.statement_to_plan(rewrite.pop_front().unwrap())
1781    }
1782
1783    fn set_statement_to_plan(&self, statement: Set) -> Result<LogicalPlan> {
1784        match statement {
1785            Set::SingleAssignment {
1786                scope,
1787                hivevar,
1788                variable,
1789                values,
1790            } => {
1791                if scope.is_some() {
1792                    return not_impl_err!("SET with scope modifiers is not supported");
1793                }
1794
1795                if hivevar {
1796                    return not_impl_err!("SET HIVEVAR is not supported");
1797                }
1798
1799                let variable = object_name_to_string(&variable);
1800                let mut variable_lower = variable.to_lowercase();
1801
1802                if variable_lower == "timezone" || variable_lower == "time.zone" {
1803                    variable_lower = "datafusion.execution.time_zone".to_string();
1804                }
1805
1806                if values.len() != 1 {
1807                    return plan_err!("SET only supports single value assignment");
1808                }
1809
1810                let value_string = match &values[0] {
1811                    SQLExpr::Identifier(i) => ident_to_string(i),
1812                    SQLExpr::Value(v) => match crate::utils::value_to_string(&v.value) {
1813                        None => {
1814                            return plan_err!("Unsupported value {:?}", v.value);
1815                        }
1816                        Some(s) => s,
1817                    },
1818                    SQLExpr::UnaryOp { op, expr } => match op {
1819                        UnaryOperator::Plus => format!("+{expr}"),
1820                        UnaryOperator::Minus => format!("-{expr}"),
1821                        _ => return plan_err!("Unsupported unary op {:?}", op),
1822                    },
1823                    _ => return plan_err!("Unsupported expr {:?}", values[0]),
1824                };
1825
1826                Ok(LogicalPlan::Statement(PlanStatement::SetVariable(
1827                    SetVariable {
1828                        variable: variable_lower,
1829                        value: value_string,
1830                    },
1831                )))
1832            }
1833            other => not_impl_err!("SET variant not implemented yet: {other:?}"),
1834        }
1835    }
1836
1837    fn delete_to_plan(
1838        &self,
1839        table_name: ObjectName,
1840        predicate_expr: Option<SQLExpr>,
1841    ) -> Result<LogicalPlan> {
1842        // Do a table lookup to verify the table exists
1843        let table_ref = self.object_name_to_table_reference(table_name.clone())?;
1844        let table_source = self.context_provider.get_table_source(table_ref.clone())?;
1845        let schema = DFSchema::try_from_qualified_schema(
1846            table_ref.clone(),
1847            &table_source.schema(),
1848        )?;
1849        let scan =
1850            LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
1851                .build()?;
1852        let mut planner_context = PlannerContext::new();
1853
1854        let source = match predicate_expr {
1855            None => scan,
1856            Some(predicate_expr) => {
1857                let filter_expr =
1858                    self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
1859                let schema = Arc::new(schema);
1860                let mut using_columns = HashSet::new();
1861                expr_to_columns(&filter_expr, &mut using_columns)?;
1862                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
1863                    filter_expr,
1864                    &[&[&schema]],
1865                    &[using_columns],
1866                )?;
1867                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
1868            }
1869        };
1870
1871        let plan = LogicalPlan::Dml(DmlStatement::new(
1872            table_ref,
1873            table_source,
1874            WriteOp::Delete,
1875            Arc::new(source),
1876        ));
1877        Ok(plan)
1878    }
1879
1880    fn update_to_plan(
1881        &self,
1882        table: TableWithJoins,
1883        assignments: Vec<Assignment>,
1884        from: Option<TableWithJoins>,
1885        predicate_expr: Option<SQLExpr>,
1886    ) -> Result<LogicalPlan> {
1887        let (table_name, table_alias) = match &table.relation {
1888            TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
1889            _ => plan_err!("Cannot update non-table relation!")?,
1890        };
1891
1892        // Do a table lookup to verify the table exists
1893        let table_name = self.object_name_to_table_reference(table_name)?;
1894        let table_source = self.context_provider.get_table_source(table_name.clone())?;
1895        let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
1896            table_name.clone(),
1897            &table_source.schema(),
1898        )?);
1899
1900        // Overwrite with assignment expressions
1901        let mut planner_context = PlannerContext::new();
1902        let mut assign_map = assignments
1903            .iter()
1904            .map(|assign| {
1905                let cols = match &assign.target {
1906                    AssignmentTarget::ColumnName(cols) => cols,
1907                    _ => plan_err!("Tuples are not supported")?,
1908                };
1909                let col_name: &Ident = cols
1910                    .0
1911                    .iter()
1912                    .last()
1913                    .ok_or_else(|| plan_datafusion_err!("Empty column id"))?
1914                    .as_ident()
1915                    .unwrap();
1916                // Validate that the assignment target column exists
1917                table_schema.field_with_unqualified_name(&col_name.value)?;
1918                Ok((col_name.value.clone(), assign.value.clone()))
1919            })
1920            .collect::<Result<HashMap<String, SQLExpr>>>()?;
1921
1922        // Build scan, join with from table if it exists.
1923        let mut input_tables = vec![table];
1924        input_tables.extend(from);
1925        let scan = self.plan_from_tables(input_tables, &mut planner_context)?;
1926
1927        // Filter
1928        let source = match predicate_expr {
1929            None => scan,
1930            Some(predicate_expr) => {
1931                let filter_expr = self.sql_to_expr(
1932                    predicate_expr,
1933                    scan.schema(),
1934                    &mut planner_context,
1935                )?;
1936                let mut using_columns = HashSet::new();
1937                expr_to_columns(&filter_expr, &mut using_columns)?;
1938                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
1939                    filter_expr,
1940                    &[&[scan.schema()]],
1941                    &[using_columns],
1942                )?;
1943                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
1944            }
1945        };
1946
1947        // Build updated values for each column, using the previous value if not modified
1948        let exprs = table_schema
1949            .iter()
1950            .map(|(qualifier, field)| {
1951                let expr = match assign_map.remove(field.name()) {
1952                    Some(new_value) => {
1953                        let mut expr = self.sql_to_expr(
1954                            new_value,
1955                            source.schema(),
1956                            &mut planner_context,
1957                        )?;
1958                        // Update placeholder's datatype to the type of the target column
1959                        if let Expr::Placeholder(placeholder) = &mut expr {
1960                            placeholder.data_type = placeholder
1961                                .data_type
1962                                .take()
1963                                .or_else(|| Some(field.data_type().clone()));
1964                        }
1965                        // Cast to target column type, if necessary
1966                        expr.cast_to(field.data_type(), source.schema())?
1967                    }
1968                    None => {
1969                        // If the target table has an alias, use it to qualify the column name
1970                        if let Some(alias) = &table_alias {
1971                            Expr::Column(Column::new(
1972                                Some(self.ident_normalizer.normalize(alias.name.clone())),
1973                                field.name(),
1974                            ))
1975                        } else {
1976                            Expr::Column(Column::from((qualifier, field)))
1977                        }
1978                    }
1979                };
1980                Ok(expr.alias(field.name()))
1981            })
1982            .collect::<Result<Vec<_>>>()?;
1983
1984        let source = project(source, exprs)?;
1985
1986        let plan = LogicalPlan::Dml(DmlStatement::new(
1987            table_name,
1988            table_source,
1989            WriteOp::Update,
1990            Arc::new(source),
1991        ));
1992        Ok(plan)
1993    }
1994
1995    fn insert_to_plan(
1996        &self,
1997        table_name: ObjectName,
1998        columns: Vec<Ident>,
1999        source: Box<Query>,
2000        overwrite: bool,
2001        replace_into: bool,
2002    ) -> Result<LogicalPlan> {
2003        // Do a table lookup to verify the table exists
2004        let table_name = self.object_name_to_table_reference(table_name)?;
2005        let table_source = self.context_provider.get_table_source(table_name.clone())?;
2006        let table_schema = DFSchema::try_from(table_source.schema())?;
2007
2008        // Get insert fields and target table's value indices
2009        //
2010        // If value_indices[i] = Some(j), it means that the value of the i-th target table's column is
2011        // derived from the j-th output of the source.
2012        //
2013        // If value_indices[i] = None, it means that the value of the i-th target table's column is
2014        // not provided, and should be filled with a default value later.
2015        let (fields, value_indices) = if columns.is_empty() {
2016            // Empty means we're inserting into all columns of the table
2017            (
2018                table_schema.fields().clone(),
2019                (0..table_schema.fields().len())
2020                    .map(Some)
2021                    .collect::<Vec<_>>(),
2022            )
2023        } else {
2024            let mut value_indices = vec![None; table_schema.fields().len()];
2025            let fields = columns
2026                .into_iter()
2027                .map(|c| self.ident_normalizer.normalize(c))
2028                .enumerate()
2029                .map(|(i, c)| {
2030                    let column_index = table_schema
2031                        .index_of_column_by_name(None, &c)
2032                        .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
2033
2034                    if value_indices[column_index].is_some() {
2035                        return schema_err!(SchemaError::DuplicateUnqualifiedField {
2036                            name: c,
2037                        });
2038                    } else {
2039                        value_indices[column_index] = Some(i);
2040                    }
2041                    Ok(table_schema.field(column_index).clone())
2042                })
2043                .collect::<Result<Vec<_>>>()?;
2044            (Fields::from(fields), value_indices)
2045        };
2046
2047        // infer types for Values clause... other types should be resolvable the regular way
2048        let mut prepare_param_data_types = BTreeMap::new();
2049        if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
2050            for row in rows.iter() {
2051                for (idx, val) in row.iter().enumerate() {
2052                    if let SQLExpr::Value(ValueWithSpan {
2053                        value: Value::Placeholder(name),
2054                        span: _,
2055                    }) = val
2056                    {
2057                        let name =
2058                            name.replace('$', "").parse::<usize>().map_err(|_| {
2059                                plan_datafusion_err!("Can't parse placeholder: {name}")
2060                            })? - 1;
2061                        let field = fields.get(idx).ok_or_else(|| {
2062                            plan_datafusion_err!(
2063                                "Placeholder ${} refers to a non existent column",
2064                                idx + 1
2065                            )
2066                        })?;
2067                        let dt = field.data_type().clone();
2068                        let _ = prepare_param_data_types.insert(name, dt);
2069                    }
2070                }
2071            }
2072        }
2073        let prepare_param_data_types = prepare_param_data_types.into_values().collect();
2074
2075        // Projection
2076        let mut planner_context =
2077            PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
2078        planner_context.set_table_schema(Some(DFSchemaRef::new(
2079            DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
2080        )));
2081        let source = self.query_to_plan(*source, &mut planner_context)?;
2082        if fields.len() != source.schema().fields().len() {
2083            plan_err!("Column count doesn't match insert query!")?;
2084        }
2085
2086        let exprs = value_indices
2087            .into_iter()
2088            .enumerate()
2089            .map(|(i, value_index)| {
2090                let target_field = table_schema.field(i);
2091                let expr = match value_index {
2092                    Some(v) => {
2093                        Expr::Column(Column::from(source.schema().qualified_field(v)))
2094                            .cast_to(target_field.data_type(), source.schema())?
2095                    }
2096                    // The value is not specified. Fill in the default value for the column.
2097                    None => table_source
2098                        .get_column_default(target_field.name())
2099                        .cloned()
2100                        .unwrap_or_else(|| {
2101                            // If there is no default for the column, then the default is NULL
2102                            Expr::Literal(ScalarValue::Null, None)
2103                        })
2104                        .cast_to(target_field.data_type(), &DFSchema::empty())?,
2105                };
2106                Ok(expr.alias(target_field.name()))
2107            })
2108            .collect::<Result<Vec<Expr>>>()?;
2109        let source = project(source, exprs)?;
2110
2111        let insert_op = match (overwrite, replace_into) {
2112            (false, false) => InsertOp::Append,
2113            (true, false) => InsertOp::Overwrite,
2114            (false, true) => InsertOp::Replace,
2115            (true, true) => plan_err!("Conflicting insert operations: `overwrite` and `replace_into` cannot both be true")?,
2116        };
2117
2118        let plan = LogicalPlan::Dml(DmlStatement::new(
2119            table_name,
2120            Arc::clone(&table_source),
2121            WriteOp::Insert(insert_op),
2122            Arc::new(source),
2123        ));
2124        Ok(plan)
2125    }
2126
2127    fn show_columns_to_plan(
2128        &self,
2129        extended: bool,
2130        full: bool,
2131        sql_table_name: ObjectName,
2132    ) -> Result<LogicalPlan> {
2133        // Figure out the where clause
2134        let where_clause = object_name_to_qualifier(
2135            &sql_table_name,
2136            self.options.enable_ident_normalization,
2137        )?;
2138
2139        if !self.has_table("information_schema", "columns") {
2140            return plan_err!(
2141                "SHOW COLUMNS is not supported unless information_schema is enabled"
2142            );
2143        }
2144
2145        // Do a table lookup to verify the table exists
2146        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2147        let _ = self.context_provider.get_table_source(table_ref)?;
2148
2149        // Treat both FULL and EXTENDED as the same
2150        let select_list = if full || extended {
2151            "*"
2152        } else {
2153            "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
2154        };
2155
2156        let query = format!(
2157            "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
2158        );
2159
2160        let mut rewrite = DFParser::parse_sql(&query)?;
2161        assert_eq!(rewrite.len(), 1);
2162        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2163    }
2164
2165    /// Rewrite `SHOW FUNCTIONS` to another SQL query
2166    /// The query is based on the `information_schema.routines` and `information_schema.parameters` tables
2167    ///
2168    /// The output columns:
2169    /// - function_name: The name of function
2170    /// - return_type: The return type of the function
2171    /// - parameters: The name of parameters (ordered by the ordinal position)
2172    /// - parameter_types: The type of parameters (ordered by the ordinal position)
2173    /// - description: The description of the function (the description defined in the document)
2174    /// - syntax_example: The syntax_example of the function (the syntax_example defined in the document)
2175    fn show_functions_to_plan(
2176        &self,
2177        filter: Option<ShowStatementFilter>,
2178    ) -> Result<LogicalPlan> {
2179        let where_clause = if let Some(filter) = filter {
2180            match filter {
2181                ShowStatementFilter::Like(like) => {
2182                    format!("WHERE p.function_name like '{like}'")
2183                }
2184                _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
2185            }
2186        } else {
2187            "".to_string()
2188        };
2189
2190        let query = format!(
2191            r#"
2192SELECT DISTINCT
2193    p.*,
2194    r.function_type function_type,
2195    r.description description,
2196    r.syntax_example syntax_example
2197FROM
2198    (
2199        SELECT
2200            i.specific_name function_name,
2201            o.data_type return_type,
2202            array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
2203            array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
2204        FROM (
2205                 SELECT
2206                     specific_catalog,
2207                     specific_schema,
2208                     specific_name,
2209                     ordinal_position,
2210                     parameter_name,
2211                     data_type,
2212                     rid
2213                 FROM
2214                     information_schema.parameters
2215                 WHERE
2216                     parameter_mode = 'IN'
2217             ) i
2218                 JOIN
2219             (
2220                 SELECT
2221                     specific_catalog,
2222                     specific_schema,
2223                     specific_name,
2224                     ordinal_position,
2225                     parameter_name,
2226                     data_type,
2227                     rid
2228                 FROM
2229                     information_schema.parameters
2230                 WHERE
2231                     parameter_mode = 'OUT'
2232             ) o
2233             ON i.specific_catalog = o.specific_catalog
2234                 AND i.specific_schema = o.specific_schema
2235                 AND i.specific_name = o.specific_name
2236                 AND i.rid = o.rid
2237        GROUP BY 1, 2, i.rid
2238    ) as p
2239JOIN information_schema.routines r
2240ON p.function_name = r.routine_name
2241{where_clause}
2242            "#
2243        );
2244        let mut rewrite = DFParser::parse_sql(&query)?;
2245        assert_eq!(rewrite.len(), 1);
2246        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2247    }
2248
2249    fn show_create_table_to_plan(
2250        &self,
2251        sql_table_name: ObjectName,
2252    ) -> Result<LogicalPlan> {
2253        if !self.has_table("information_schema", "tables") {
2254            return plan_err!(
2255                "SHOW CREATE TABLE is not supported unless information_schema is enabled"
2256            );
2257        }
2258        // Figure out the where clause
2259        let where_clause = object_name_to_qualifier(
2260            &sql_table_name,
2261            self.options.enable_ident_normalization,
2262        )?;
2263
2264        // Do a table lookup to verify the table exists
2265        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2266        let _ = self.context_provider.get_table_source(table_ref)?;
2267
2268        let query = format!(
2269            "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
2270        );
2271
2272        let mut rewrite = DFParser::parse_sql(&query)?;
2273        assert_eq!(rewrite.len(), 1);
2274        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2275    }
2276
2277    /// Return true if there is a table provider available for "schema.table"
2278    fn has_table(&self, schema: &str, table: &str) -> bool {
2279        let tables_reference = TableReference::Partial {
2280            schema: schema.into(),
2281            table: table.into(),
2282        };
2283        self.context_provider
2284            .get_table_source(tables_reference)
2285            .is_ok()
2286    }
2287
2288    fn validate_transaction_kind(
2289        &self,
2290        kind: Option<BeginTransactionKind>,
2291    ) -> Result<()> {
2292        match kind {
2293            // BEGIN
2294            None => Ok(()),
2295            // BEGIN TRANSACTION
2296            Some(BeginTransactionKind::Transaction) => Ok(()),
2297            Some(BeginTransactionKind::Work) => {
2298                not_impl_err!("Transaction kind not supported: {kind:?}")
2299            }
2300        }
2301    }
2302}