Skip to main content

datafusion_sql/
statement.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::collections::{BTreeMap, HashMap, HashSet};
19use std::path::Path;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use crate::parser::{
24    CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
25    LexOrdering, ResetStatement, Statement as DFStatement,
26};
27use crate::planner::{
28    ContextProvider, PlannerContext, SqlToRel, object_name_to_qualifier,
29};
30use crate::utils::normalize_ident;
31
32use arrow::datatypes::{Field, FieldRef, Fields};
33use datafusion_common::error::_plan_err;
34use datafusion_common::parsers::CompressionTypeVariant;
35use datafusion_common::{
36    Column, Constraint, Constraints, DFSchema, DFSchemaRef, DataFusionError, Result,
37    ScalarValue, SchemaError, SchemaReference, TableReference, ToDFSchema, exec_err,
38    internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
39    unqualified_field_not_found,
40};
41use datafusion_expr::dml::{CopyTo, InsertOp};
42use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
43use datafusion_expr::logical_plan::DdlStatement;
44use datafusion_expr::logical_plan::builder::project;
45use datafusion_expr::utils::expr_to_columns;
46use datafusion_expr::{
47    Analyze, CreateCatalog, CreateCatalogSchema,
48    CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
49    CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
50    DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
51    EmptyRelation, Execute, Explain, ExplainFormat, Expr, ExprSchemable, Filter,
52    LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare,
53    ResetVariable, SetVariable, SortExpr, Statement as PlanStatement, ToStringifiedPlan,
54    TransactionAccessMode, TransactionConclusion, TransactionEnd,
55    TransactionIsolationLevel, TransactionStart, Volatility, WriteOp, cast, col,
56};
57use sqlparser::ast::{
58    self, BeginTransactionKind, CheckConstraint, ForeignKeyConstraint, IndexColumn,
59    IndexType, NullsDistinctOption, OrderByExpr, OrderByOptions, PrimaryKeyConstraint,
60    Set, ShowStatementIn, ShowStatementOptions, SqliteOnConflict, TableObject,
61    UniqueConstraint, Update, UpdateTableFromKind, ValueWithSpan,
62};
63use sqlparser::ast::{
64    Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
65    CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
66    ObjectName, ObjectType, Query, SchemaName, SetExpr, ShowCreateObject,
67    ShowStatementFilter, Statement, TableConstraint, TableFactor, TableWithJoins,
68    TransactionMode, UnaryOperator, Value,
69};
70use sqlparser::parser::ParserError::ParserError;
71
72fn ident_to_string(ident: &Ident) -> String {
73    normalize_ident(ident.to_owned())
74}
75
76fn object_name_to_string(object_name: &ObjectName) -> String {
77    object_name
78        .0
79        .iter()
80        .map(|object_name_part| {
81            object_name_part
82                .as_ident()
83                // TODO: It might be better to return an error
84                // than to silently use a default value.
85                .map_or_else(String::new, ident_to_string)
86        })
87        .collect::<Vec<String>>()
88        .join(".")
89}
90
91fn get_schema_name(schema_name: &SchemaName) -> String {
92    match schema_name {
93        SchemaName::Simple(schema_name) => object_name_to_string(schema_name),
94        SchemaName::UnnamedAuthorization(auth) => ident_to_string(auth),
95        SchemaName::NamedAuthorization(schema_name, auth) => format!(
96            "{}.{}",
97            object_name_to_string(schema_name),
98            ident_to_string(auth)
99        ),
100    }
101}
102
103/// Construct `TableConstraint`(s) for the given columns by iterating over
104/// `columns` and extracting individual inline constraint definitions.
105fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
106    let mut constraints: Vec<TableConstraint> = vec![];
107    for column in columns {
108        for ast::ColumnOptionDef { name, option } in &column.options {
109            match option {
110                ast::ColumnOption::Unique(UniqueConstraint {
111                    characteristics,
112                    name,
113                    index_name: _index_name,
114                    index_type_display: _index_type_display,
115                    index_type: _index_type,
116                    columns: _column,
117                    index_options: _index_options,
118                    nulls_distinct: _nulls_distinct,
119                }) => constraints.push(TableConstraint::Unique(UniqueConstraint {
120                    name: name.clone(),
121                    index_name: None,
122                    index_type_display: ast::KeyOrIndexDisplay::None,
123                    index_type: None,
124                    columns: vec![IndexColumn {
125                        column: OrderByExpr {
126                            expr: SQLExpr::Identifier(column.name.clone()),
127                            options: OrderByOptions {
128                                asc: None,
129                                nulls_first: None,
130                            },
131                            with_fill: None,
132                        },
133                        operator_class: None,
134                    }],
135                    index_options: vec![],
136                    characteristics: *characteristics,
137                    nulls_distinct: NullsDistinctOption::None,
138                })),
139                ast::ColumnOption::PrimaryKey(PrimaryKeyConstraint {
140                    characteristics,
141                    name: _name,
142                    index_name: _index_name,
143                    index_type: _index_type,
144                    columns: _columns,
145                    index_options: _index_options,
146                }) => {
147                    constraints.push(TableConstraint::PrimaryKey(PrimaryKeyConstraint {
148                        name: name.clone(),
149                        index_name: None,
150                        index_type: None,
151                        columns: vec![IndexColumn {
152                            column: OrderByExpr {
153                                expr: SQLExpr::Identifier(column.name.clone()),
154                                options: OrderByOptions {
155                                    asc: None,
156                                    nulls_first: None,
157                                },
158                                with_fill: None,
159                            },
160                            operator_class: None,
161                        }],
162                        index_options: vec![],
163                        characteristics: *characteristics,
164                    }))
165                }
166                ast::ColumnOption::ForeignKey(ForeignKeyConstraint {
167                    foreign_table,
168                    referred_columns,
169                    on_delete,
170                    on_update,
171                    characteristics,
172                    name: _name,
173                    index_name: _index_name,
174                    columns: _columns,
175                    match_kind: _match_kind,
176                }) => {
177                    constraints.push(TableConstraint::ForeignKey(ForeignKeyConstraint {
178                        name: name.clone(),
179                        index_name: None,
180                        columns: vec![],
181                        foreign_table: foreign_table.clone(),
182                        referred_columns: referred_columns.clone(),
183                        on_delete: *on_delete,
184                        on_update: *on_update,
185                        match_kind: None,
186                        characteristics: *characteristics,
187                    }))
188                }
189                ast::ColumnOption::Check(CheckConstraint {
190                    name,
191                    expr,
192                    enforced: _enforced,
193                }) => constraints.push(TableConstraint::Check(CheckConstraint {
194                    name: name.clone(),
195                    expr: expr.clone(),
196                    enforced: None,
197                })),
198                ast::ColumnOption::Default(_)
199                | ast::ColumnOption::Null
200                | ast::ColumnOption::NotNull
201                | ast::ColumnOption::DialectSpecific(_)
202                | ast::ColumnOption::CharacterSet(_)
203                | ast::ColumnOption::Generated { .. }
204                | ast::ColumnOption::Comment(_)
205                | ast::ColumnOption::Options(_)
206                | ast::ColumnOption::OnUpdate(_)
207                | ast::ColumnOption::Materialized(_)
208                | ast::ColumnOption::Ephemeral(_)
209                | ast::ColumnOption::Identity(_)
210                | ast::ColumnOption::OnConflict(_)
211                | ast::ColumnOption::Policy(_)
212                | ast::ColumnOption::Tags(_)
213                | ast::ColumnOption::Alias(_)
214                | ast::ColumnOption::Srid(_)
215                | ast::ColumnOption::Collation(_)
216                | ast::ColumnOption::Invisible => {}
217            }
218        }
219    }
220    constraints
221}
222
223impl<S: ContextProvider> SqlToRel<'_, S> {
224    /// Generate a logical plan from an DataFusion SQL statement
225    pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
226        match statement {
227            DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
228            DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
229            DFStatement::CopyTo(s) => self.copy_to_plan(s),
230            DFStatement::Explain(ExplainStatement {
231                verbose,
232                analyze,
233                format,
234                statement,
235            }) => self.explain_to_plan(verbose, analyze, format, *statement),
236            DFStatement::Reset(statement) => self.reset_statement_to_plan(statement),
237        }
238    }
239
240    /// Generate a logical plan from an SQL statement
241    pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
242        self.sql_statement_to_plan_with_context_impl(
243            statement,
244            &mut PlannerContext::new(),
245        )
246    }
247
248    /// Generate a logical plan from an SQL statement
249    pub fn sql_statement_to_plan_with_context(
250        &self,
251        statement: Statement,
252        planner_context: &mut PlannerContext,
253    ) -> Result<LogicalPlan> {
254        self.sql_statement_to_plan_with_context_impl(statement, planner_context)
255    }
256
257    fn sql_statement_to_plan_with_context_impl(
258        &self,
259        statement: Statement,
260        planner_context: &mut PlannerContext,
261    ) -> Result<LogicalPlan> {
262        match statement {
263            Statement::ExplainTable {
264                describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, // only parse 'DESCRIBE table_name' or 'DESC table_name' and not 'EXPLAIN table_name'
265                table_name,
266                ..
267            } => self.describe_table_to_plan(table_name),
268            Statement::Explain {
269                describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, // only parse 'DESCRIBE statement' or 'DESC statement' and not 'EXPLAIN statement'
270                statement,
271                ..
272            } => match *statement {
273                Statement::Query(query) => self.describe_query_to_plan(*query),
274                _ => {
275                    not_impl_err!("Describing statements other than SELECT not supported")
276                }
277            },
278            Statement::Explain {
279                verbose,
280                statement,
281                analyze,
282                format,
283                describe_alias: _,
284                ..
285            } => {
286                let format = format.map(|format| format.to_string());
287                let statement = DFStatement::Statement(statement);
288                self.explain_to_plan(verbose, analyze, format, statement)
289            }
290            Statement::Query(query) => self.query_to_plan(*query, planner_context),
291            Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
292            Statement::Set(statement) => self.set_statement_to_plan(statement),
293            Statement::CreateTable(CreateTable {
294                temporary,
295                external,
296                global,
297                transient,
298                volatile,
299                hive_distribution,
300                hive_formats,
301                file_format,
302                location,
303                query,
304                name,
305                columns,
306                constraints,
307                if_not_exists,
308                or_replace,
309                without_rowid,
310                like,
311                clone,
312                comment,
313                on_commit,
314                on_cluster,
315                primary_key,
316                order_by,
317                partition_by,
318                cluster_by,
319                clustered_by,
320                strict,
321                copy_grants,
322                enable_schema_evolution,
323                change_tracking,
324                data_retention_time_in_days,
325                max_data_extension_time_in_days,
326                default_ddl_collation,
327                with_aggregation_policy,
328                with_row_access_policy,
329                with_tags,
330                iceberg,
331                external_volume,
332                base_location,
333                catalog,
334                catalog_sync,
335                storage_serialization_policy,
336                inherits,
337                table_options: CreateTableOptions::None,
338                dynamic,
339                version,
340                target_lag,
341                warehouse,
342                refresh_mode,
343                initialize,
344                require_user,
345                partition_of,
346                for_values,
347                snapshot,
348                with_storage_lifecycle_policy,
349                diststyle,
350                distkey,
351                sortkey,
352                backup,
353            }) => {
354                if temporary {
355                    return not_impl_err!("Temporary tables not supported");
356                }
357                if external {
358                    return not_impl_err!("External tables not supported");
359                }
360                if global.is_some() {
361                    return not_impl_err!("Global tables not supported");
362                }
363                if transient {
364                    return not_impl_err!("Transient tables not supported");
365                }
366                if volatile {
367                    return not_impl_err!("Volatile tables not supported");
368                }
369                if hive_distribution != ast::HiveDistributionStyle::NONE {
370                    return not_impl_err!(
371                        "Hive distribution not supported: {hive_distribution:?}"
372                    );
373                }
374                if hive_formats.is_some()
375                    && !matches!(
376                        hive_formats,
377                        Some(ast::HiveFormat {
378                            row_format: None,
379                            serde_properties: None,
380                            storage: None,
381                            location: None,
382                        })
383                    )
384                {
385                    return not_impl_err!("Hive formats not supported: {hive_formats:?}");
386                }
387                if file_format.is_some() {
388                    return not_impl_err!("File format not supported");
389                }
390                if location.is_some() {
391                    return not_impl_err!("Location not supported");
392                }
393                if without_rowid {
394                    return not_impl_err!("Without rowid not supported");
395                }
396                if like.is_some() {
397                    return not_impl_err!("Like not supported");
398                }
399                if clone.is_some() {
400                    return not_impl_err!("Clone not supported");
401                }
402                if comment.is_some() {
403                    return not_impl_err!("Comment not supported");
404                }
405                if on_commit.is_some() {
406                    return not_impl_err!("On commit not supported");
407                }
408                if on_cluster.is_some() {
409                    return not_impl_err!("On cluster not supported");
410                }
411                if primary_key.is_some() {
412                    return not_impl_err!("Primary key not supported");
413                }
414                if order_by.is_some() {
415                    return not_impl_err!("Order by not supported");
416                }
417                if partition_by.is_some() {
418                    return not_impl_err!("Partition by not supported");
419                }
420                if cluster_by.is_some() {
421                    return not_impl_err!("Cluster by not supported");
422                }
423                if clustered_by.is_some() {
424                    return not_impl_err!("Clustered by not supported");
425                }
426                if strict {
427                    return not_impl_err!("Strict not supported");
428                }
429                if copy_grants {
430                    return not_impl_err!("Copy grants not supported");
431                }
432                if enable_schema_evolution.is_some() {
433                    return not_impl_err!("Enable schema evolution not supported");
434                }
435                if change_tracking.is_some() {
436                    return not_impl_err!("Change tracking not supported");
437                }
438                if data_retention_time_in_days.is_some() {
439                    return not_impl_err!("Data retention time in days not supported");
440                }
441                if max_data_extension_time_in_days.is_some() {
442                    return not_impl_err!(
443                        "Max data extension time in days not supported"
444                    );
445                }
446                if default_ddl_collation.is_some() {
447                    return not_impl_err!("Default DDL collation not supported");
448                }
449                if with_aggregation_policy.is_some() {
450                    return not_impl_err!("With aggregation policy not supported");
451                }
452                if with_row_access_policy.is_some() {
453                    return not_impl_err!("With row access policy not supported");
454                }
455                if with_tags.is_some() {
456                    return not_impl_err!("With tags not supported");
457                }
458                if iceberg {
459                    return not_impl_err!("Iceberg not supported");
460                }
461                if external_volume.is_some() {
462                    return not_impl_err!("External volume not supported");
463                }
464                if base_location.is_some() {
465                    return not_impl_err!("Base location not supported");
466                }
467                if catalog.is_some() {
468                    return not_impl_err!("Catalog not supported");
469                }
470                if catalog_sync.is_some() {
471                    return not_impl_err!("Catalog sync not supported");
472                }
473                if storage_serialization_policy.is_some() {
474                    return not_impl_err!("Storage serialization policy not supported");
475                }
476                if inherits.is_some() {
477                    return not_impl_err!("Table inheritance not supported");
478                }
479                if dynamic {
480                    return not_impl_err!("Dynamic tables not supported");
481                }
482                if version.is_some() {
483                    return not_impl_err!("Version not supported");
484                }
485                if target_lag.is_some() {
486                    return not_impl_err!("Target lag not supported");
487                }
488                if warehouse.is_some() {
489                    return not_impl_err!("Warehouse not supported");
490                }
491                if refresh_mode.is_some() {
492                    return not_impl_err!("Refresh mode not supported");
493                }
494                if initialize.is_some() {
495                    return not_impl_err!("Initialize not supported");
496                }
497                if require_user {
498                    return not_impl_err!("Require user not supported");
499                }
500                if partition_of.is_some() {
501                    return not_impl_err!("PARTITION OF not supported");
502                }
503                if for_values.is_some() {
504                    return not_impl_err!("PARTITION OF .. FOR VALUES .. not supported");
505                }
506                if snapshot {
507                    return not_impl_err!("Snapshot tables not supported");
508                }
509                if with_storage_lifecycle_policy.is_some() {
510                    return not_impl_err!("WITH STORAGE LIFECYCLE POLICY not supported");
511                }
512                if diststyle.is_some() {
513                    return not_impl_err!("DISTSTYLE not supported");
514                }
515                if distkey.is_some() {
516                    return not_impl_err!("DISTKEY not supported");
517                }
518                if sortkey.is_some() {
519                    return not_impl_err!("SORTKEY not supported");
520                }
521                if backup.is_some() {
522                    return not_impl_err!("BACKUP not supported");
523                }
524                // Merge inline constraints and existing constraints
525                let mut all_constraints = constraints;
526                let inline_constraints = calc_inline_constraints_from_columns(&columns);
527                all_constraints.extend(inline_constraints);
528                // Build column default values
529                let column_defaults =
530                    self.build_column_defaults(&columns, planner_context)?;
531
532                let has_columns = !columns.is_empty();
533                let schema = self.build_schema(columns)?.to_dfschema_ref()?;
534                if has_columns {
535                    planner_context.set_table_schema(Some(Arc::clone(&schema)));
536                }
537
538                match query {
539                    Some(query) => {
540                        let plan = self.query_to_plan(*query, planner_context)?;
541                        let input_schema = plan.schema();
542
543                        let plan = if has_columns {
544                            if schema.fields().len() != input_schema.fields().len() {
545                                return plan_err!(
546                                    "Mismatch: {} columns specified, but result has {} columns",
547                                    schema.fields().len(),
548                                    input_schema.fields().len()
549                                );
550                            }
551                            let input_fields = input_schema.fields();
552                            let project_exprs = schema
553                                .fields()
554                                .iter()
555                                .zip(input_fields)
556                                .map(|(field, input_field)| {
557                                    cast(
558                                        col(input_field.name()),
559                                        field.data_type().clone(),
560                                    )
561                                    .alias(field.name())
562                                })
563                                .collect::<Vec<_>>();
564
565                            LogicalPlanBuilder::from(plan.clone())
566                                .project(project_exprs)?
567                                .build()?
568                        } else {
569                            plan
570                        };
571
572                        let constraints = self.new_constraint_from_table_constraints(
573                            &all_constraints,
574                            plan.schema(),
575                        )?;
576
577                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
578                            CreateMemoryTable {
579                                name: self.object_name_to_table_reference(name)?,
580                                constraints,
581                                input: Arc::new(plan),
582                                if_not_exists,
583                                or_replace,
584                                column_defaults,
585                                temporary,
586                            },
587                        )))
588                    }
589
590                    None => {
591                        let plan = EmptyRelation {
592                            produce_one_row: false,
593                            schema,
594                        };
595                        let plan = LogicalPlan::EmptyRelation(plan);
596                        let constraints = self.new_constraint_from_table_constraints(
597                            &all_constraints,
598                            plan.schema(),
599                        )?;
600                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
601                            CreateMemoryTable {
602                                name: self.object_name_to_table_reference(name)?,
603                                constraints,
604                                input: Arc::new(plan),
605                                if_not_exists,
606                                or_replace,
607                                column_defaults,
608                                temporary,
609                            },
610                        )))
611                    }
612                }
613            }
614            Statement::CreateView(ast::CreateView {
615                or_replace,
616                materialized,
617                name,
618                columns,
619                query,
620                options: CreateTableOptions::None,
621                cluster_by,
622                comment,
623                with_no_schema_binding,
624                if_not_exists,
625                temporary,
626                to,
627                params,
628                or_alter,
629                secure,
630                name_before_not_exists,
631                copy_grants,
632            }) => {
633                if materialized {
634                    return not_impl_err!("Materialized views not supported")?;
635                }
636                if !cluster_by.is_empty() {
637                    return not_impl_err!("Cluster by not supported")?;
638                }
639                if comment.is_some() {
640                    return not_impl_err!("Comment not supported")?;
641                }
642                if with_no_schema_binding {
643                    return not_impl_err!("With no schema binding not supported")?;
644                }
645                if if_not_exists {
646                    return not_impl_err!("If not exists not supported")?;
647                }
648                if to.is_some() {
649                    return not_impl_err!("To not supported")?;
650                }
651                if copy_grants {
652                    return not_impl_err!("COPY GRANTS not supported")?;
653                }
654
655                // put the statement back together temporarily to get the SQL
656                // string representation
657                let stmt = Statement::CreateView(ast::CreateView {
658                    or_replace,
659                    materialized,
660                    name,
661                    columns,
662                    query,
663                    options: CreateTableOptions::None,
664                    cluster_by,
665                    comment,
666                    with_no_schema_binding,
667                    if_not_exists,
668                    temporary,
669                    to,
670                    params,
671                    or_alter,
672                    secure,
673                    name_before_not_exists,
674                    copy_grants,
675                });
676                let sql = stmt.to_string();
677                let Statement::CreateView(ast::CreateView {
678                    name,
679                    columns,
680                    query,
681                    or_replace,
682                    temporary,
683                    ..
684                }) = stmt
685                else {
686                    return internal_err!("Unreachable code in create view");
687                };
688
689                let columns = columns
690                    .into_iter()
691                    .map(|view_column_def| {
692                        if let Some(options) = view_column_def.options {
693                            plan_err!(
694                                "Options not supported for view columns: {options:?}"
695                            )
696                        } else {
697                            Ok(view_column_def.name)
698                        }
699                    })
700                    .collect::<Result<Vec<_>>>()?;
701
702                let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
703                plan = self.apply_expr_alias(plan, columns)?;
704
705                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
706                    name: self.object_name_to_table_reference(name)?,
707                    input: Arc::new(plan),
708                    or_replace,
709                    definition: Some(sql),
710                    temporary,
711                })))
712            }
713            Statement::ShowCreate { obj_type, obj_name } => match obj_type {
714                ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
715                _ => {
716                    not_impl_err!("Only `SHOW CREATE TABLE  ...` statement is supported")
717                }
718            },
719            Statement::CreateSchema {
720                schema_name,
721                if_not_exists,
722                ..
723            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
724                CreateCatalogSchema {
725                    schema_name: get_schema_name(&schema_name),
726                    if_not_exists,
727                    schema: Arc::new(DFSchema::empty()),
728                },
729            ))),
730            Statement::CreateDatabase {
731                db_name,
732                if_not_exists,
733                ..
734            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
735                CreateCatalog {
736                    catalog_name: object_name_to_string(&db_name),
737                    if_not_exists,
738                    schema: Arc::new(DFSchema::empty()),
739                },
740            ))),
741            Statement::Drop {
742                object_type,
743                if_exists,
744                mut names,
745                cascade,
746                restrict: _,
747                purge: _,
748                temporary: _,
749                table: _,
750            } => {
751                // We don't support cascade and purge for now.
752                // nor do we support multiple object names
753                let name = match names.len() {
754                    0 => Err(ParserError("Missing table name.".to_string()).into()),
755                    1 => self.object_name_to_table_reference(names.pop().unwrap()),
756                    _ => {
757                        Err(ParserError("Multiple objects not supported".to_string())
758                            .into())
759                    }
760                }?;
761
762                match object_type {
763                    ObjectType::Table => {
764                        Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
765                            name,
766                            if_exists,
767                            schema: DFSchemaRef::new(DFSchema::empty()),
768                        })))
769                    }
770                    ObjectType::View => {
771                        Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
772                            name,
773                            if_exists,
774                            schema: DFSchemaRef::new(DFSchema::empty()),
775                        })))
776                    }
777                    ObjectType::Schema => {
778                        let name = match name {
779                            TableReference::Bare { table } => {
780                                Ok(SchemaReference::Bare { schema: table })
781                            }
782                            TableReference::Partial { schema, table } => {
783                                Ok(SchemaReference::Full {
784                                    schema: table,
785                                    catalog: schema,
786                                })
787                            }
788                            TableReference::Full {
789                                catalog: _,
790                                schema: _,
791                                table: _,
792                            } => Err(ParserError(
793                                "Invalid schema specifier (has 3 parts)".to_string(),
794                            )),
795                        }?;
796                        Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(
797                            DropCatalogSchema {
798                                name,
799                                if_exists,
800                                cascade,
801                                schema: DFSchemaRef::new(DFSchema::empty()),
802                            },
803                        )))
804                    }
805                    _ => not_impl_err!(
806                        "Only `DROP TABLE/VIEW/SCHEMA  ...` statement is supported currently"
807                    ),
808                }
809            }
810            Statement::Prepare {
811                name,
812                data_types,
813                statement,
814            } => {
815                // Convert parser data types to DataFusion data types
816                let mut fields: Vec<FieldRef> = data_types
817                    .into_iter()
818                    .map(|t| self.convert_data_type_to_field(&t))
819                    .collect::<Result<_>>()?;
820
821                // Create planner context with parameters
822                let mut planner_context = PlannerContext::new()
823                    .with_prepare_param_data_types(
824                        fields.iter().cloned().map(Some).collect(),
825                    );
826
827                // Build logical plan for inner statement of the prepare statement
828                let plan = self.sql_statement_to_plan_with_context_impl(
829                    *statement,
830                    &mut planner_context,
831                )?;
832
833                if fields.is_empty() {
834                    let map_types = plan.get_parameter_fields()?;
835                    let param_types: Vec<_> = (1..=map_types.len())
836                        .filter_map(|i| {
837                            let key = format!("${i}");
838                            map_types.get(&key).and_then(|opt| opt.clone())
839                        })
840                        .collect();
841                    fields.extend(param_types.iter().cloned());
842                    planner_context.with_prepare_param_data_types(
843                        param_types.into_iter().map(Some).collect(),
844                    );
845                }
846
847                Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
848                    name: ident_to_string(&name),
849                    fields,
850                    input: Arc::new(plan),
851                })))
852            }
853            Statement::Execute {
854                name,
855                parameters,
856                using,
857                // has_parentheses specifies the syntax, but the plan is the
858                // same no matter the syntax used, so ignore it
859                has_parentheses: _,
860                immediate,
861                into,
862                output,
863                default,
864            } => {
865                // `USING` is a MySQL-specific syntax and currently not supported.
866                if !using.is_empty() {
867                    return not_impl_err!(
868                        "Execute statement with USING is not supported"
869                    );
870                }
871                if immediate {
872                    return not_impl_err!(
873                        "Execute statement with IMMEDIATE is not supported"
874                    );
875                }
876                if !into.is_empty() {
877                    return not_impl_err!("Execute statement with INTO is not supported");
878                }
879                if output {
880                    return not_impl_err!(
881                        "Execute statement with OUTPUT is not supported"
882                    );
883                }
884                if default {
885                    return not_impl_err!(
886                        "Execute statement with DEFAULT is not supported"
887                    );
888                }
889                let name = name.ok_or_else(|| {
890                    plan_datafusion_err!("EXECUTE statement requires a name")
891                })?;
892
893                let empty_schema = DFSchema::empty();
894                let parameters = parameters
895                    .into_iter()
896                    .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
897                    .collect::<Result<Vec<Expr>>>()?;
898
899                Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
900                    name: object_name_to_string(&name),
901                    parameters,
902                })))
903            }
904            Statement::Deallocate {
905                name,
906                // Similar to PostgreSQL, the PREPARE keyword is ignored
907                prepare: _,
908            } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
909                Deallocate {
910                    name: ident_to_string(&name),
911                },
912            ))),
913
914            Statement::ShowTables {
915                extended,
916                full,
917                terse,
918                history,
919                external,
920                show_options,
921            } => {
922                // We only support the basic "SHOW TABLES"
923                // https://github.com/apache/datafusion/issues/3188
924                if extended {
925                    return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
926                }
927                if full {
928                    return not_impl_err!("SHOW FULL TABLES not supported")?;
929                }
930                if terse {
931                    return not_impl_err!("SHOW TERSE TABLES not supported")?;
932                }
933                if history {
934                    return not_impl_err!("SHOW TABLES HISTORY not supported")?;
935                }
936                if external {
937                    return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
938                }
939                let ShowStatementOptions {
940                    show_in,
941                    starts_with,
942                    limit,
943                    limit_from,
944                    filter_position,
945                } = show_options;
946                if show_in.is_some() {
947                    return not_impl_err!("SHOW TABLES IN not supported")?;
948                }
949                if starts_with.is_some() {
950                    return not_impl_err!("SHOW TABLES LIKE not supported")?;
951                }
952                if limit.is_some() {
953                    return not_impl_err!("SHOW TABLES LIMIT not supported")?;
954                }
955                if limit_from.is_some() {
956                    return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
957                }
958                if filter_position.is_some() {
959                    return not_impl_err!("SHOW TABLES FILTER not supported")?;
960                }
961                self.show_tables_to_plan()
962            }
963
964            Statement::ShowColumns {
965                extended,
966                full,
967                show_options,
968            } => {
969                let ShowStatementOptions {
970                    show_in,
971                    starts_with,
972                    limit,
973                    limit_from,
974                    filter_position,
975                } = show_options;
976                if starts_with.is_some() {
977                    return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
978                }
979                if limit.is_some() {
980                    return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
981                }
982                if limit_from.is_some() {
983                    return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
984                }
985                if filter_position.is_some() {
986                    return not_impl_err!(
987                        "SHOW COLUMNS with WHERE or LIKE is not supported"
988                    )?;
989                }
990                let Some(ShowStatementIn {
991                    // specifies if the syntax was `SHOW COLUMNS IN` or `SHOW
992                    // COLUMNS FROM` which is not different in DataFusion
993                    clause: _,
994                    parent_type,
995                    parent_name,
996                }) = show_in
997                else {
998                    return plan_err!("SHOW COLUMNS requires a table name");
999                };
1000
1001                if let Some(parent_type) = parent_type {
1002                    return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
1003                }
1004                let Some(table_name) = parent_name else {
1005                    return plan_err!("SHOW COLUMNS requires a table name");
1006                };
1007
1008                self.show_columns_to_plan(extended, full, table_name)
1009            }
1010
1011            Statement::ShowFunctions { filter, .. } => {
1012                self.show_functions_to_plan(filter)
1013            }
1014
            // INSERT: resolve the target table, reject every clause the planner
            // does not handle (checked in a fixed order, so the first unsupported
            // clause determines the reported error), then delegate to
            // `insert_to_plan`.
            Statement::Insert(Insert {
                or,
                into,
                columns,
                overwrite,
                source,
                partitioned,
                after_columns,
                table,
                on,
                returning,
                ignore,
                table_alias,
                mut replace_into,
                priority,
                insert_alias,
                assignments,
                has_table_keyword,
                settings,
                format_clause,
                insert_token: _, // source location of the `INSERT` token; unused for planning
                optimizer_hints,
                output,
                multi_table_insert_type,
                multi_table_into_clauses,
                multi_table_when_clauses,
                multi_table_else_clause,
            }) => {
                // Only a plain table name is accepted as the insert target.
                let table_name = match table {
                    TableObject::TableName(table_name) => table_name,
                    TableObject::TableFunction(_) => {
                        return not_impl_err!(
                            "INSERT INTO Table functions not supported"
                        );
                    }
                    TableObject::TableQuery(_) => {
                        return not_impl_err!(
                            "INSERT INTO subquery target not supported"
                        );
                    }
                };
                // `INSERT OR REPLACE` is planned the same way as `REPLACE INTO`;
                // every other `OR <action>` is rejected.
                if let Some(or) = or {
                    match or {
                        SqliteOnConflict::Replace => replace_into = true,
                        _ => plan_err!("Inserts with {or} clause is not supported")?,
                    }
                }
                if partitioned.is_some() {
                    plan_err!("Partitioned inserts not yet supported")?;
                }
                if !after_columns.is_empty() {
                    plan_err!("After-columns clause not supported")?;
                }
                if on.is_some() {
                    plan_err!("Insert-on clause not supported")?;
                }
                if returning.is_some() {
                    plan_err!("Insert-returning clause not supported")?;
                }
                if ignore {
                    plan_err!("Insert-ignore clause not supported")?;
                }
                // A source for the inserted rows is required.
                let Some(source) = source else {
                    plan_err!("Inserts without a source not supported")?
                };
                if let Some(table_alias) = table_alias {
                    plan_err!(
                        "Inserts with a table alias not supported: {table_alias:?}"
                    )?
                };
                if let Some(priority) = priority {
                    plan_err!(
                        "Inserts with a `PRIORITY` clause not supported: {priority:?}"
                    )?
                };
                if insert_alias.is_some() {
                    plan_err!("Inserts with an alias not supported")?;
                }
                if !assignments.is_empty() {
                    plan_err!("Inserts with assignments not supported")?;
                }
                if settings.is_some() {
                    plan_err!("Inserts with settings not supported")?;
                }
                if format_clause.is_some() {
                    plan_err!("Inserts with format clause not supported")?;
                }
                if !optimizer_hints.is_empty() {
                    plan_err!("Optimizer hints not supported")?;
                }
                if output.is_some() {
                    plan_err!("Insert OUTPUT clause not supported")?;
                }
                if multi_table_insert_type.is_some()
                    || !multi_table_into_clauses.is_empty()
                    || !multi_table_when_clauses.is_empty()
                    || multi_table_else_clause.is_some()
                {
                    plan_err!("Multi-table INSERT not supported")?;
                }
                // optional keywords don't change behavior
                let _ = into;
                let _ = has_table_keyword;
                self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
            }
1120            Statement::Update(Update {
1121                table,
1122                assignments,
1123                from,
1124                selection,
1125                returning,
1126                or,
1127                limit,
1128                update_token: _,
1129                optimizer_hints,
1130                output,
1131                order_by,
1132            }) => {
1133                let from_clauses =
1134                    from.map(|update_table_from_kind| match update_table_from_kind {
1135                        UpdateTableFromKind::BeforeSet(from_clauses) => from_clauses,
1136                        UpdateTableFromKind::AfterSet(from_clauses) => from_clauses,
1137                    });
1138                // TODO: support multiple tables in UPDATE SET FROM
1139                if from_clauses.as_ref().is_some_and(|f| f.len() > 1) {
1140                    not_impl_err!(
1141                        "Multiple tables in UPDATE SET FROM not yet supported"
1142                    )?;
1143                }
1144                let update_from = from_clauses.and_then(|mut f| f.pop());
1145
1146                // UPDATE ... FROM is currently not working
1147                // TODO fix https://github.com/apache/datafusion/issues/19950
1148                if update_from.is_some() {
1149                    return not_impl_err!("UPDATE ... FROM is not supported");
1150                }
1151
1152                if returning.is_some() {
1153                    plan_err!("Update-returning clause not yet supported")?;
1154                }
1155                if or.is_some() {
1156                    plan_err!("ON conflict not supported")?;
1157                }
1158                if limit.is_some() {
1159                    return not_impl_err!("Update-limit clause not supported")?;
1160                }
1161                if !optimizer_hints.is_empty() {
1162                    plan_err!("Optimizer hints not supported")?;
1163                }
1164                if output.is_some() {
1165                    plan_err!("Update OUTPUT clause not supported")?;
1166                }
1167                if !order_by.is_empty() {
1168                    plan_err!("Update ORDER BY not supported")?;
1169                }
1170                self.update_to_plan(table, &assignments, update_from, selection)
1171            }
1172
1173            Statement::Delete(Delete {
1174                tables,
1175                using,
1176                selection,
1177                returning,
1178                from,
1179                order_by,
1180                limit,
1181                delete_token: _,
1182                optimizer_hints,
1183                output,
1184            }) => {
1185                if !tables.is_empty() {
1186                    plan_err!("DELETE <TABLE> not supported")?;
1187                }
1188
1189                if using.is_some() {
1190                    plan_err!("Using clause not supported")?;
1191                }
1192
1193                if returning.is_some() {
1194                    plan_err!("Delete-returning clause not yet supported")?;
1195                }
1196
1197                if !order_by.is_empty() {
1198                    plan_err!("Delete-order-by clause not yet supported")?;
1199                }
1200
1201                if !optimizer_hints.is_empty() {
1202                    plan_err!("Optimizer hints not supported")?;
1203                }
1204                if output.is_some() {
1205                    plan_err!("Delete OUTPUT clause not supported")?;
1206                }
1207
1208                let table_name = self.get_delete_target(from)?;
1209                self.delete_to_plan(&table_name, selection, limit)
1210            }
1211
1212            Statement::StartTransaction {
1213                modes,
1214                begin: false,
1215                modifier,
1216                transaction,
1217                statements,
1218                has_end_keyword,
1219                exception,
1220            } => {
1221                if let Some(modifier) = modifier {
1222                    return not_impl_err!(
1223                        "Transaction modifier not supported: {modifier}"
1224                    );
1225                }
1226                if !statements.is_empty() {
1227                    return not_impl_err!(
1228                        "Transaction with multiple statements not supported"
1229                    );
1230                }
1231                if exception.is_some() {
1232                    return not_impl_err!(
1233                        "Transaction with exception statements not supported"
1234                    );
1235                }
1236                if has_end_keyword {
1237                    return not_impl_err!("Transaction with END keyword not supported");
1238                }
1239                self.validate_transaction_kind(transaction.as_ref())?;
1240                let isolation_level: ast::TransactionIsolationLevel = modes
1241                    .iter()
1242                    .filter_map(|m: &TransactionMode| match m {
1243                        TransactionMode::AccessMode(_) => None,
1244                        TransactionMode::IsolationLevel(level) => Some(level),
1245                    })
1246                    .next_back()
1247                    .copied()
1248                    .unwrap_or(ast::TransactionIsolationLevel::Serializable);
1249                let access_mode: ast::TransactionAccessMode = modes
1250                    .iter()
1251                    .filter_map(|m: &TransactionMode| match m {
1252                        TransactionMode::AccessMode(mode) => Some(mode),
1253                        TransactionMode::IsolationLevel(_) => None,
1254                    })
1255                    .next_back()
1256                    .copied()
1257                    .unwrap_or(ast::TransactionAccessMode::ReadWrite);
1258                let isolation_level = match isolation_level {
1259                    ast::TransactionIsolationLevel::ReadUncommitted => {
1260                        TransactionIsolationLevel::ReadUncommitted
1261                    }
1262                    ast::TransactionIsolationLevel::ReadCommitted => {
1263                        TransactionIsolationLevel::ReadCommitted
1264                    }
1265                    ast::TransactionIsolationLevel::RepeatableRead => {
1266                        TransactionIsolationLevel::RepeatableRead
1267                    }
1268                    ast::TransactionIsolationLevel::Serializable => {
1269                        TransactionIsolationLevel::Serializable
1270                    }
1271                    ast::TransactionIsolationLevel::Snapshot => {
1272                        TransactionIsolationLevel::Snapshot
1273                    }
1274                };
1275                let access_mode = match access_mode {
1276                    ast::TransactionAccessMode::ReadOnly => {
1277                        TransactionAccessMode::ReadOnly
1278                    }
1279                    ast::TransactionAccessMode::ReadWrite => {
1280                        TransactionAccessMode::ReadWrite
1281                    }
1282                };
1283                let statement = PlanStatement::TransactionStart(TransactionStart {
1284                    access_mode,
1285                    isolation_level,
1286                });
1287                Ok(LogicalPlan::Statement(statement))
1288            }
1289            Statement::Commit {
1290                chain,
1291                end,
1292                modifier,
1293            } => {
1294                if end {
1295                    return not_impl_err!("COMMIT AND END not supported");
1296                };
1297                if let Some(modifier) = modifier {
1298                    return not_impl_err!("COMMIT {modifier} not supported");
1299                };
1300                let statement = PlanStatement::TransactionEnd(TransactionEnd {
1301                    conclusion: TransactionConclusion::Commit,
1302                    chain,
1303                });
1304                Ok(LogicalPlan::Statement(statement))
1305            }
1306            Statement::Rollback { chain, savepoint } => {
1307                if savepoint.is_some() {
1308                    plan_err!("Savepoints not supported")?;
1309                }
1310                let statement = PlanStatement::TransactionEnd(TransactionEnd {
1311                    conclusion: TransactionConclusion::Rollback,
1312                    chain,
1313                });
1314                Ok(LogicalPlan::Statement(statement))
1315            }
1316            Statement::CreateFunction(ast::CreateFunction {
1317                or_replace,
1318                temporary,
1319                name,
1320                args,
1321                return_type,
1322                function_body,
1323                behavior,
1324                language,
1325                ..
1326            }) => {
1327                let return_type = match return_type {
1328                    Some(ast::FunctionReturnType::DataType(t)) => {
1329                        Some(self.convert_data_type_to_field(&t)?)
1330                    }
1331                    Some(ast::FunctionReturnType::SetOf(_)) => {
1332                        return not_impl_err!(
1333                            "RETURNS SETOF in CREATE FUNCTION is not supported"
1334                        );
1335                    }
1336                    None => None,
1337                };
1338                let mut planner_context = PlannerContext::new();
1339                let empty_schema = &DFSchema::empty();
1340
1341                let args = match args {
1342                    Some(function_args) => {
1343                        let function_args = function_args
1344                            .into_iter()
1345                            .map(|arg| {
1346                                let data_type =
1347                                    self.convert_data_type_to_field(&arg.data_type)?;
1348
1349                                let default_expr = match arg.default_expr {
1350                                    Some(expr) => Some(self.sql_to_expr(
1351                                        expr,
1352                                        empty_schema,
1353                                        &mut planner_context,
1354                                    )?),
1355                                    None => None,
1356                                };
1357                                Ok(OperateFunctionArg {
1358                                    name: arg.name,
1359                                    default_expr,
1360                                    data_type: data_type.data_type().clone(),
1361                                })
1362                            })
1363                            .collect::<Result<Vec<OperateFunctionArg>>>();
1364                        Some(function_args?)
1365                    }
1366                    None => None,
1367                };
1368                // Validate default arguments
1369                let first_default = match args.as_ref() {
1370                    Some(arg) => arg.iter().position(|t| t.default_expr.is_some()),
1371                    None => None,
1372                };
1373                let last_non_default = match args.as_ref() {
1374                    Some(arg) => arg
1375                        .iter()
1376                        .rev()
1377                        .position(|t| t.default_expr.is_none())
1378                        .map(|reverse_pos| arg.len() - reverse_pos - 1),
1379                    None => None,
1380                };
1381                if let (Some(pos_default), Some(pos_non_default)) =
1382                    (first_default, last_non_default)
1383                    && pos_non_default > pos_default
1384                {
1385                    return plan_err!(
1386                        "Non-default arguments cannot follow default arguments."
1387                    );
1388                }
1389                // At the moment functions can't be qualified `schema.name`
1390                let name = match &name.0[..] {
1391                    [] => exec_err!("Function should have name")?,
1392                    [n] => n.as_ident().unwrap().value.clone(),
1393                    [..] => not_impl_err!("Qualified functions are not supported")?,
1394                };
1395                //
1396                // Convert resulting expression to data fusion expression
1397                //
1398                let arg_types = args.as_ref().map(|arg| {
1399                    arg.iter()
1400                        .map(|t| {
1401                            let name = match t.name.clone() {
1402                                Some(name) => name.value,
1403                                None => "".to_string(),
1404                            };
1405                            Arc::new(Field::new(name, t.data_type.clone(), true))
1406                        })
1407                        .collect::<Vec<_>>()
1408                });
1409                // Validate parameter style
1410                if let Some(ref fields) = arg_types {
1411                    let count_positional =
1412                        fields.iter().filter(|f| f.name() == "").count();
1413                    if !(count_positional == 0 || count_positional == fields.len()) {
1414                        return plan_err!(
1415                            "All function arguments must use either named or positional style."
1416                        );
1417                    }
1418                }
1419                let mut planner_context = PlannerContext::new()
1420                    .with_prepare_param_data_types(
1421                        arg_types
1422                            .unwrap_or_default()
1423                            .into_iter()
1424                            .map(Some)
1425                            .collect(),
1426                    );
1427
1428                let function_body = match function_body {
1429                    Some(r) => Some(self.sql_to_expr(
1430                        match r {
1431                            // `link_symbol` indicates if the primary expression contains the name of shared library file.
1432                            ast::CreateFunctionBody::AsBeforeOptions{body: expr, link_symbol: _link_symbol} => expr,
1433                            ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
1434                            ast::CreateFunctionBody::Return(expr) => expr,
1435                            ast::CreateFunctionBody::AsBeginEnd(_) => {
1436                                return not_impl_err!(
1437                                    "BEGIN/END enclosed function body syntax is not supported"
1438                                )?;
1439                            }
1440                            ast::CreateFunctionBody::AsReturnExpr(_)
1441                            | ast::CreateFunctionBody::AsReturnSelect(_) => {
1442                                return not_impl_err!(
1443                                    "AS RETURN function syntax is not supported"
1444                                )?
1445                            }
1446                        },
1447                        &DFSchema::empty(),
1448                        &mut planner_context,
1449                    )?),
1450                    None => None,
1451                };
1452
1453                let params = CreateFunctionBody {
1454                    language,
1455                    behavior: behavior.map(|b| match b {
1456                        ast::FunctionBehavior::Immutable => Volatility::Immutable,
1457                        ast::FunctionBehavior::Stable => Volatility::Stable,
1458                        ast::FunctionBehavior::Volatile => Volatility::Volatile,
1459                    }),
1460                    function_body,
1461                };
1462
1463                let statement = DdlStatement::CreateFunction(CreateFunction {
1464                    or_replace,
1465                    temporary,
1466                    name,
1467                    return_type: return_type.map(|f| f.data_type().clone()),
1468                    args,
1469                    params,
1470                    schema: DFSchemaRef::new(DFSchema::empty()),
1471                });
1472
1473                Ok(LogicalPlan::Ddl(statement))
1474            }
1475            Statement::DropFunction(ast::DropFunction {
1476                if_exists,
1477                func_desc,
1478                drop_behavior: _,
1479            }) => {
1480                // According to postgresql documentation it can be only one function
1481                // specified in drop statement
1482                if let Some(desc) = func_desc.first() {
1483                    // At the moment functions can't be qualified `schema.name`
1484                    let name = match &desc.name.0[..] {
1485                        [] => exec_err!("Function should have name")?,
1486                        [n] => n.as_ident().unwrap().value.clone(),
1487                        [..] => not_impl_err!("Qualified functions are not supported")?,
1488                    };
1489                    let statement = DdlStatement::DropFunction(DropFunction {
1490                        if_exists,
1491                        name,
1492                        schema: DFSchemaRef::new(DFSchema::empty()),
1493                    });
1494                    Ok(LogicalPlan::Ddl(statement))
1495                } else {
1496                    exec_err!("Function name not provided")
1497                }
1498            }
1499            Statement::Truncate(ast::Truncate {
1500                table_names,
1501                partitions,
1502                identity,
1503                cascade,
1504                on_cluster,
1505                table,
1506                if_exists,
1507            }) => {
1508                let _ = table; // Support TRUNCATE TABLE and TRUNCATE syntax
1509                if table_names.len() != 1 {
1510                    return not_impl_err!(
1511                        "TRUNCATE with multiple tables is not supported"
1512                    );
1513                }
1514
1515                let target = &table_names[0];
1516                if target.only {
1517                    return not_impl_err!("TRUNCATE with ONLY is not supported");
1518                }
1519                if partitions.is_some() {
1520                    return not_impl_err!("TRUNCATE with PARTITION is not supported");
1521                }
1522                if identity.is_some() {
1523                    return not_impl_err!(
1524                        "TRUNCATE with RESTART/CONTINUE IDENTITY is not supported"
1525                    );
1526                }
1527                if cascade.is_some() {
1528                    return not_impl_err!(
1529                        "TRUNCATE with CASCADE/RESTRICT is not supported"
1530                    );
1531                }
1532                if on_cluster.is_some() {
1533                    return not_impl_err!("TRUNCATE with ON CLUSTER is not supported");
1534                }
1535                if if_exists {
1536                    return not_impl_err!("TRUNCATE .. with IF EXISTS is not supported");
1537                }
1538                let table = self.object_name_to_table_reference(target.name.clone())?;
1539                let source = self.context_provider.get_table_source(table.clone())?;
1540
1541                // TRUNCATE does not operate on input rows. The EmptyRelation is a logical placeholder
1542                // since the real operation is executed directly by the TableProvider's truncate() hook.
1543                Ok(LogicalPlan::Dml(DmlStatement::new(
1544                    table.clone(),
1545                    source,
1546                    WriteOp::Truncate,
1547                    Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
1548                        produce_one_row: false,
1549                        schema: DFSchemaRef::new(DFSchema::empty()),
1550                    })),
1551                )))
1552            }
1553            Statement::CreateIndex(CreateIndex {
1554                name,
1555                table_name,
1556                using,
1557                columns,
1558                unique,
1559                if_not_exists,
1560                ..
1561            }) => {
1562                let name: Option<String> = name.as_ref().map(object_name_to_string);
1563                let table = self.object_name_to_table_reference(table_name)?;
1564                let table_schema = self
1565                    .context_provider
1566                    .get_table_source(table.clone())?
1567                    .schema()
1568                    .to_dfschema_ref()?;
1569                let using: Option<String> =
1570                    using.as_ref().map(|index_type| match index_type {
1571                        IndexType::Custom(ident) => ident_to_string(ident),
1572                        _ => index_type.to_string().to_ascii_lowercase(),
1573                    });
1574                let order_by_exprs: Vec<OrderByExpr> =
1575                    columns.into_iter().map(|col| col.column).collect();
1576                let columns = self.order_by_to_sort_expr(
1577                    order_by_exprs,
1578                    &table_schema,
1579                    planner_context,
1580                    false,
1581                    None,
1582                )?;
1583                Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
1584                    PlanCreateIndex {
1585                        name,
1586                        table,
1587                        using,
1588                        columns,
1589                        unique,
1590                        if_not_exists,
1591                        schema: DFSchemaRef::new(DFSchema::empty()),
1592                    },
1593                )))
1594            }
1595            stmt => {
1596                not_impl_err!("Unsupported SQL statement: {stmt}")
1597            }
1598        }
1599    }
1600
1601    fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
1602        let mut from = match from {
1603            FromTable::WithFromKeyword(v) => v,
1604            FromTable::WithoutKeyword(v) => v,
1605        };
1606
1607        if from.len() != 1 {
1608            return not_impl_err!(
1609                "DELETE FROM only supports single table, got {}: {from:?}",
1610                from.len()
1611            );
1612        }
1613        let table_factor = from.pop().unwrap();
1614        if !table_factor.joins.is_empty() {
1615            return not_impl_err!("DELETE FROM only supports single table, got: joins");
1616        }
1617        let TableFactor::Table { name, .. } = table_factor.relation else {
1618            return not_impl_err!(
1619                "DELETE FROM only supports single table, got: {table_factor:?}"
1620            );
1621        };
1622
1623        Ok(name)
1624    }
1625
1626    /// Generate a logical plan from a "SHOW TABLES" query
1627    fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
1628        if self.has_table("information_schema", "tables") {
1629            let query = "SELECT * FROM information_schema.tables;";
1630            let mut rewrite = DFParser::parse_sql(query)?;
1631            assert_eq!(rewrite.len(), 1);
1632            self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
1633        } else {
1634            plan_err!("SHOW TABLES is not supported unless information_schema is enabled")
1635        }
1636    }
1637
1638    fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
1639        let table_ref = self.object_name_to_table_reference(table_name)?;
1640
1641        let table_source = self.context_provider.get_table_source(table_ref)?;
1642
1643        let schema = table_source.schema();
1644
1645        let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1646
1647        Ok(LogicalPlan::DescribeTable(DescribeTable {
1648            schema,
1649            output_schema: Arc::new(output_schema),
1650        }))
1651    }
1652
1653    fn describe_query_to_plan(&self, query: Query) -> Result<LogicalPlan> {
1654        let plan = self.query_to_plan(query, &mut PlannerContext::new())?;
1655
1656        let schema = Arc::new(plan.schema().as_arrow().clone());
1657
1658        let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1659
1660        Ok(LogicalPlan::DescribeTable(DescribeTable {
1661            schema,
1662            output_schema: Arc::new(output_schema),
1663        }))
1664    }
1665
1666    fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
1667        // Determine if source is table or query and handle accordingly
1668        let copy_source = statement.source;
1669        let (input, input_schema, table_ref) = match copy_source {
1670            CopyToSource::Relation(object_name) => {
1671                let table_name = object_name_to_string(&object_name);
1672                let table_ref = self.object_name_to_table_reference(object_name)?;
1673                let table_source =
1674                    self.context_provider.get_table_source(table_ref.clone())?;
1675                let plan =
1676                    LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?;
1677                let input_schema = Arc::clone(plan.schema());
1678                (plan, input_schema, Some(table_ref))
1679            }
1680            CopyToSource::Query(query) => {
1681                let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
1682                let input_schema = Arc::clone(plan.schema());
1683                (plan, input_schema, None)
1684            }
1685        };
1686
1687        let options_map = self.parse_options_map(statement.options, true)?;
1688
1689        let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
1690            self.context_provider.get_file_type(stored_as).ok()
1691        } else {
1692            None
1693        };
1694
1695        let file_type = match maybe_file_type {
1696            Some(ft) => ft,
1697            None => {
1698                let e = || {
1699                    DataFusionError::Configuration(
1700                        "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
1701                            .to_string(),
1702                    )
1703                };
1704                // Try to infer file format from file extension
1705                let extension: &str = &Path::new(&statement.target)
1706                    .extension()
1707                    .ok_or_else(e)?
1708                    .to_str()
1709                    .ok_or_else(e)?
1710                    .to_lowercase();
1711
1712                self.context_provider.get_file_type(extension)?
1713            }
1714        };
1715
1716        let partition_by = statement
1717            .partitioned_by
1718            .iter()
1719            .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
1720            .collect::<Result<Vec<_>>>()?
1721            .into_iter()
1722            .map(|f| f.name().to_owned())
1723            .collect();
1724
1725        Ok(LogicalPlan::Copy(CopyTo::new(
1726            Arc::new(input),
1727            statement.target,
1728            partition_by,
1729            file_type,
1730            options_map,
1731        )))
1732    }
1733
1734    fn build_order_by(
1735        &self,
1736        order_exprs: Vec<LexOrdering>,
1737        schema: &DFSchemaRef,
1738        planner_context: &mut PlannerContext,
1739    ) -> Result<Vec<Vec<SortExpr>>> {
1740        if !order_exprs.is_empty() && schema.fields().is_empty() {
1741            let results = order_exprs
1742                .iter()
1743                .map(|lex_order| {
1744                    let result = lex_order
1745                        .iter()
1746                        .map(|order_by_expr| {
1747                            let ordered_expr = &order_by_expr.expr;
1748                            let ordered_expr = ordered_expr.to_owned();
1749                            let ordered_expr = self.sql_expr_to_logical_expr(
1750                                ordered_expr,
1751                                schema,
1752                                planner_context,
1753                            )?;
1754                            let asc = order_by_expr.options.asc.unwrap_or(true);
1755                            let nulls_first =
1756                                order_by_expr.options.nulls_first.unwrap_or_else(|| {
1757                                    self.options.default_null_ordering.nulls_first(asc)
1758                                });
1759
1760                            Ok(SortExpr::new(ordered_expr, asc, nulls_first))
1761                        })
1762                        .collect::<Result<Vec<SortExpr>>>()?;
1763                    Ok(result)
1764                })
1765                .collect::<Result<Vec<Vec<SortExpr>>>>()?;
1766
1767            return Ok(results);
1768        }
1769
1770        let mut all_results = vec![];
1771        for expr in order_exprs {
1772            // Convert each OrderByExpr to a SortExpr:
1773            let expr_vec =
1774                self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?;
1775            // Verify that columns of all SortExprs exist in the schema:
1776            for sort in expr_vec.iter() {
1777                for column in sort.expr.column_refs().iter() {
1778                    if !schema.has_column(column) {
1779                        // Return an error if any column is not in the schema:
1780                        return plan_err!("Column {column} is not in schema");
1781                    }
1782                }
1783            }
1784            // If all SortExprs are valid, return them as an expression vector
1785            all_results.push(expr_vec)
1786        }
1787        Ok(all_results)
1788    }
1789
1790    /// Generate a logical plan from a CREATE EXTERNAL TABLE statement
1791    fn external_table_to_plan(
1792        &self,
1793        statement: CreateExternalTable,
1794    ) -> Result<LogicalPlan> {
1795        let definition = Some(statement.to_string());
1796        let CreateExternalTable {
1797            name,
1798            columns,
1799            file_type,
1800            location,
1801            table_partition_cols,
1802            if_not_exists,
1803            temporary,
1804            order_exprs,
1805            unbounded,
1806            options,
1807            constraints,
1808            or_replace,
1809        } = statement;
1810
1811        // Merge inline constraints and existing constraints
1812        let mut all_constraints = constraints;
1813        let inline_constraints = calc_inline_constraints_from_columns(&columns);
1814        all_constraints.extend(inline_constraints);
1815
1816        let options_map = self.parse_options_map(options, false)?;
1817
1818        let compression = options_map
1819            .get("format.compression")
1820            .map(|c| CompressionTypeVariant::from_str(c))
1821            .transpose()?;
1822        if (file_type == "PARQUET" || file_type == "AVRO" || file_type == "ARROW")
1823            && compression
1824                .map(|c| c != CompressionTypeVariant::UNCOMPRESSED)
1825                .unwrap_or(false)
1826        {
1827            plan_err!(
1828                "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
1829            )?;
1830        }
1831
1832        let mut planner_context = PlannerContext::new();
1833
1834        let column_defaults = self
1835            .build_column_defaults(&columns, &mut planner_context)?
1836            .into_iter()
1837            .collect();
1838
1839        let schema = self.build_schema(columns)?;
1840        let df_schema = schema.to_dfschema_ref()?;
1841        df_schema.check_names()?;
1842
1843        let ordered_exprs =
1844            self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;
1845
1846        let name = self.object_name_to_table_reference(name)?;
1847        let constraints =
1848            self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
1849        Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1850            PlanCreateExternalTable::builder(name, location, file_type, df_schema)
1851                .with_partition_cols(table_partition_cols)
1852                .with_if_not_exists(if_not_exists)
1853                .with_or_replace(or_replace)
1854                .with_temporary(temporary)
1855                .with_definition(definition)
1856                .with_order_exprs(ordered_exprs)
1857                .with_unbounded(unbounded)
1858                .with_options(options_map)
1859                .with_constraints(constraints)
1860                .with_column_defaults(column_defaults)
1861                .build(),
1862        )))
1863    }
1864
1865    /// Get the indices of the constraint columns in the schema.
1866    /// If any column is not found, return an error.
1867    fn get_constraint_column_indices(
1868        &self,
1869        df_schema: &DFSchemaRef,
1870        columns: &[IndexColumn],
1871        constraint_name: &str,
1872    ) -> Result<Vec<usize>> {
1873        let field_names = df_schema.field_names();
1874        columns
1875            .iter()
1876            .map(|index_column| {
1877                let expr = &index_column.column.expr;
1878                let ident = if let SQLExpr::Identifier(ident) = expr {
1879                    ident
1880                } else {
1881                    return Err(plan_datafusion_err!(
1882                        "Column name for {constraint_name} must be an identifier: {expr}"
1883                    ));
1884                };
1885                let column = self.ident_normalizer.normalize(ident.clone());
1886                field_names
1887                    .iter()
1888                    .position(|item| *item == column)
1889                    .ok_or_else(|| {
1890                        plan_datafusion_err!(
1891                            "Column for {constraint_name} not found in schema: {column}"
1892                        )
1893                    })
1894            })
1895            .collect::<Result<Vec<_>>>()
1896    }
1897
1898    /// Convert each [TableConstraint] to corresponding [Constraint]
1899    pub fn new_constraint_from_table_constraints(
1900        &self,
1901        constraints: &[TableConstraint],
1902        df_schema: &DFSchemaRef,
1903    ) -> Result<Constraints> {
1904        let constraints = constraints
1905            .iter()
1906            .map(|c: &TableConstraint| match c {
1907                TableConstraint::Unique(UniqueConstraint {
1908                    name,
1909                    index_name: _,
1910                    index_type_display: _,
1911                    index_type: _,
1912                    columns,
1913                    index_options: _,
1914                    characteristics: _,
1915                    nulls_distinct: _,
1916                }) => {
1917                    let constraint_name = match &name {
1918                        Some(name) => &format!("unique constraint with name '{name}'"),
1919                        None => "unique constraint",
1920                    };
1921                    // Get unique constraint indices in the schema
1922                    let indices = self.get_constraint_column_indices(
1923                        df_schema,
1924                        columns,
1925                        constraint_name,
1926                    )?;
1927                    Ok(Constraint::Unique(indices))
1928                }
1929                TableConstraint::PrimaryKey(PrimaryKeyConstraint {
1930                    name: _,
1931                    index_name: _,
1932                    index_type: _,
1933                    columns,
1934                    index_options: _,
1935                    characteristics: _,
1936                }) => {
1937                    // Get primary key indices in the schema
1938                    let indices = self.get_constraint_column_indices(
1939                        df_schema,
1940                        columns,
1941                        "primary key",
1942                    )?;
1943                    Ok(Constraint::PrimaryKey(indices))
1944                }
1945                TableConstraint::ForeignKey { .. } => {
1946                    _plan_err!("Foreign key constraints are not currently supported")
1947                }
1948                TableConstraint::Check { .. } => {
1949                    _plan_err!("Check constraints are not currently supported")
1950                }
1951                TableConstraint::Index { .. } => {
1952                    _plan_err!("Indexes are not currently supported")
1953                }
1954                TableConstraint::FulltextOrSpatial { .. } => {
1955                    _plan_err!("Indexes are not currently supported")
1956                }
1957                TableConstraint::PrimaryKeyUsingIndex(_) => {
1958                    _plan_err!(
1959                        "PRIMARY KEY USING INDEX constraints are not currently supported"
1960                    )
1961                }
1962                TableConstraint::UniqueUsingIndex(_) => {
1963                    _plan_err!(
1964                        "UNIQUE USING INDEX constraints are not currently supported"
1965                    )
1966                }
1967            })
1968            .collect::<Result<Vec<_>>>()?;
1969        Ok(Constraints::new_unverified(constraints))
1970    }
1971
1972    fn parse_options_map(
1973        &self,
1974        options: Vec<(String, Value)>,
1975        allow_duplicates: bool,
1976    ) -> Result<HashMap<String, String>> {
1977        let mut options_map = HashMap::new();
1978        for (key, value) in options {
1979            if !allow_duplicates && options_map.contains_key(&key) {
1980                return plan_err!("Option {key} is specified multiple times");
1981            }
1982
1983            let Some(value_string) = crate::utils::value_to_string(&value) else {
1984                return plan_err!("Unsupported Value {}", value);
1985            };
1986
1987            if !(&key.contains('.')) {
1988                // If config does not belong to any namespace, assume it is
1989                // a format option and apply the format prefix for backwards
1990                // compatibility.
1991                let renamed_key = format!("format.{key}");
1992                options_map.insert(renamed_key.to_lowercase(), value_string);
1993            } else {
1994                options_map.insert(key.to_lowercase(), value_string);
1995            }
1996        }
1997
1998        Ok(options_map)
1999    }
2000
2001    /// Generate a plan for EXPLAIN ... that will print out a plan
2002    ///
2003    /// Note this is the sqlparser explain statement, not the
2004    /// datafusion `EXPLAIN` statement.
2005    fn explain_to_plan(
2006        &self,
2007        verbose: bool,
2008        analyze: bool,
2009        format: Option<String>,
2010        statement: DFStatement,
2011    ) -> Result<LogicalPlan> {
2012        let plan = self.statement_to_plan(statement)?;
2013        if matches!(plan, LogicalPlan::Explain(_)) {
2014            return plan_err!("Nested EXPLAINs are not supported");
2015        }
2016
2017        let plan = Arc::new(plan);
2018        let schema = LogicalPlan::explain_schema();
2019        let schema = schema.to_dfschema_ref()?;
2020
2021        if verbose && format.is_some() {
2022            return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
2023        }
2024
2025        if analyze {
2026            if format.is_some() {
2027                return plan_err!("EXPLAIN ANALYZE with FORMAT is not supported");
2028            }
2029            Ok(LogicalPlan::Analyze(Analyze {
2030                verbose,
2031                input: plan,
2032                schema,
2033            }))
2034        } else {
2035            let stringified_plans =
2036                vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
2037
2038            // default to configuration value
2039            // verbose mode only supports indent format
2040            let options = self.context_provider.options();
2041            let format = if verbose {
2042                ExplainFormat::Indent
2043            } else if let Some(format) = format {
2044                ExplainFormat::from_str(&format)?
2045            } else {
2046                options.explain.format.clone()
2047            };
2048
2049            Ok(LogicalPlan::Explain(Explain {
2050                verbose,
2051                explain_format: format,
2052                plan,
2053                stringified_plans,
2054                schema,
2055                logical_optimization_succeeded: false,
2056            }))
2057        }
2058    }
2059
2060    fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
2061        if !self.has_table("information_schema", "df_settings") {
2062            return plan_err!(
2063                "SHOW [VARIABLE] is not supported unless information_schema is enabled"
2064            );
2065        }
2066
2067        let verbose = variable
2068            .last()
2069            .map(|s| ident_to_string(s) == "verbose")
2070            .unwrap_or(false);
2071        let mut variable_vec = variable.to_vec();
2072        let mut columns: String = "name, value".to_owned();
2073
2074        if verbose {
2075            columns = format!("{columns}, description");
2076            variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
2077        }
2078
2079        let variable = object_name_to_string(&ObjectName::from(variable_vec));
2080        let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
2081        let query = if variable == "all" {
2082            // Add an ORDER BY so the output comes out in a consistent order
2083            format!("{base_query} ORDER BY name")
2084        } else if variable == "timezone" || variable == "time.zone" {
2085            // we could introduce alias in OptionDefinition if this string matching thing grows
2086            format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
2087        } else {
2088            // These values are what are used to make the information_schema table, so we just
2089            // check here, before actually planning or executing the query, if it would produce no
2090            // results, and error preemptively if it would (for a better UX)
2091            let is_valid_variable = self
2092                .context_provider
2093                .options()
2094                .entries()
2095                .iter()
2096                .any(|opt| opt.key == variable);
2097
2098            // Check if it's a runtime variable
2099            let is_runtime_variable = variable.starts_with("datafusion.runtime.");
2100
2101            if !is_valid_variable && !is_runtime_variable {
2102                return plan_err!(
2103                    "'{variable}' is not a variable which can be viewed with 'SHOW'"
2104                );
2105            }
2106
2107            format!("{base_query} WHERE name = '{variable}'")
2108        };
2109
2110        let mut rewrite = DFParser::parse_sql(&query)?;
2111        assert_eq!(rewrite.len(), 1);
2112
2113        self.statement_to_plan(rewrite.pop_front().unwrap())
2114    }
2115
2116    fn set_statement_to_plan(&self, statement: Set) -> Result<LogicalPlan> {
2117        match statement {
2118            Set::SingleAssignment {
2119                scope,
2120                hivevar,
2121                variable,
2122                values,
2123            } => {
2124                if scope.is_some() {
2125                    return not_impl_err!("SET with scope modifiers is not supported");
2126                }
2127
2128                if hivevar {
2129                    return not_impl_err!("SET HIVEVAR is not supported");
2130                }
2131
2132                let variable = object_name_to_string(&variable);
2133                let mut variable_lower = variable.to_lowercase();
2134
2135                // Map PostgreSQL "timezone" and MySQL "time.zone" aliases to DataFusion's canonical name
2136                if variable_lower == "timezone" || variable_lower == "time.zone" {
2137                    variable_lower = "datafusion.execution.time_zone".to_string();
2138                }
2139
2140                if values.len() != 1 {
2141                    return plan_err!("SET only supports single value assignment");
2142                }
2143
2144                let value_string = match &values[0] {
2145                    SQLExpr::Identifier(i) => ident_to_string(i),
2146                    SQLExpr::Value(v) => match crate::utils::value_to_string(&v.value) {
2147                        None => {
2148                            return plan_err!("Unsupported value {:?}", v.value);
2149                        }
2150                        Some(s) => s,
2151                    },
2152                    SQLExpr::UnaryOp { op, expr } => match op {
2153                        UnaryOperator::Plus => format!("+{expr}"),
2154                        UnaryOperator::Minus => format!("-{expr}"),
2155                        _ => return plan_err!("Unsupported unary op {:?}", op),
2156                    },
2157                    _ => return plan_err!("Unsupported expr {:?}", values[0]),
2158                };
2159
2160                Ok(LogicalPlan::Statement(PlanStatement::SetVariable(
2161                    SetVariable {
2162                        variable: variable_lower,
2163                        value: value_string,
2164                    },
2165                )))
2166            }
2167            other => not_impl_err!("SET variant not implemented yet: {other:?}"),
2168        }
2169    }
2170
2171    fn reset_statement_to_plan(&self, statement: ResetStatement) -> Result<LogicalPlan> {
2172        match statement {
2173            ResetStatement::Variable(variable) => {
2174                let variable = object_name_to_string(&variable);
2175                let mut variable_lower = variable.to_lowercase();
2176
2177                // Map PostgreSQL "timezone" and MySQL "time.zone" aliases to DataFusion's canonical name
2178                if variable_lower == "timezone" || variable_lower == "time.zone" {
2179                    variable_lower = "datafusion.execution.time_zone".to_string();
2180                }
2181
2182                Ok(LogicalPlan::Statement(PlanStatement::ResetVariable(
2183                    ResetVariable {
2184                        variable: variable_lower,
2185                    },
2186                )))
2187            }
2188        }
2189    }
2190
2191    fn delete_to_plan(
2192        &self,
2193        table_name: &ObjectName,
2194        predicate_expr: Option<SQLExpr>,
2195        limit: Option<SQLExpr>,
2196    ) -> Result<LogicalPlan> {
2197        // Do a table lookup to verify the table exists
2198        let table_ref = self.object_name_to_table_reference(table_name.clone())?;
2199        let table_source = self.context_provider.get_table_source(table_ref.clone())?;
2200        let schema = DFSchema::try_from_qualified_schema(
2201            table_ref.clone(),
2202            &table_source.schema(),
2203        )?;
2204        let scan =
2205            LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
2206                .build()?;
2207        let mut planner_context = PlannerContext::new();
2208
2209        let mut source = match predicate_expr {
2210            None => scan,
2211            Some(predicate_expr) => {
2212                let filter_expr =
2213                    self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
2214                let schema = Arc::new(schema);
2215                let mut using_columns = HashSet::new();
2216                expr_to_columns(&filter_expr, &mut using_columns)?;
2217                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
2218                    filter_expr,
2219                    &[&[&schema]],
2220                    &[using_columns],
2221                )?;
2222                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
2223            }
2224        };
2225
2226        if let Some(limit) = limit {
2227            let empty_schema = DFSchema::empty();
2228            let limit = self.sql_to_expr(limit, &empty_schema, &mut planner_context)?;
2229            source = LogicalPlanBuilder::from(source)
2230                .limit_by_expr(None, Some(limit))?
2231                .build()?
2232        }
2233
2234        let plan = LogicalPlan::Dml(DmlStatement::new(
2235            table_ref,
2236            table_source,
2237            WriteOp::Delete,
2238            Arc::new(source),
2239        ));
2240        Ok(plan)
2241    }
2242
2243    fn update_to_plan(
2244        &self,
2245        table: TableWithJoins,
2246        assignments: &[Assignment],
2247        from: Option<TableWithJoins>,
2248        predicate_expr: Option<SQLExpr>,
2249    ) -> Result<LogicalPlan> {
2250        let (table_name, table_alias) = match &table.relation {
2251            TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
2252            _ => plan_err!("Cannot update non-table relation!")?,
2253        };
2254
2255        // Do a table lookup to verify the table exists
2256        let table_name = self.object_name_to_table_reference(table_name)?;
2257        let table_source = self.context_provider.get_table_source(table_name.clone())?;
2258        let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
2259            table_name.clone(),
2260            &table_source.schema(),
2261        )?);
2262
2263        // Overwrite with assignment expressions
2264        let mut planner_context = PlannerContext::new();
2265        let mut assign_map = assignments
2266            .iter()
2267            .map(|assign| {
2268                let cols = match &assign.target {
2269                    AssignmentTarget::ColumnName(cols) => cols,
2270                    _ => plan_err!("Tuples are not supported")?,
2271                };
2272                let col_name: &Ident = cols
2273                    .0
2274                    .iter()
2275                    .last()
2276                    .ok_or_else(|| plan_datafusion_err!("Empty column id"))?
2277                    .as_ident()
2278                    .unwrap();
2279                // Validate that the assignment target column exists
2280                table_schema.field_with_unqualified_name(&col_name.value)?;
2281                Ok((col_name.value.clone(), assign.value.clone()))
2282            })
2283            .collect::<Result<HashMap<String, SQLExpr>>>()?;
2284
2285        // Build scan, join with from table if it exists.
2286        let mut input_tables = vec![table];
2287        input_tables.extend(from);
2288        let scan = self.plan_from_tables(input_tables, &mut planner_context)?;
2289
2290        // Filter
2291        let source = match predicate_expr {
2292            None => scan,
2293            Some(predicate_expr) => {
2294                let filter_expr = self.sql_to_expr(
2295                    predicate_expr,
2296                    scan.schema(),
2297                    &mut planner_context,
2298                )?;
2299                let mut using_columns = HashSet::new();
2300                expr_to_columns(&filter_expr, &mut using_columns)?;
2301                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
2302                    filter_expr,
2303                    &[&[scan.schema()]],
2304                    &[using_columns],
2305                )?;
2306                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
2307            }
2308        };
2309
2310        // Build updated values for each column, using the previous value if not modified
2311        let exprs = table_schema
2312            .iter()
2313            .map(|(qualifier, field)| {
2314                let expr = match assign_map.remove(field.name()) {
2315                    Some(new_value) => {
2316                        let mut expr = self.sql_to_expr(
2317                            new_value,
2318                            source.schema(),
2319                            &mut planner_context,
2320                        )?;
2321                        // Update placeholder's datatype to the type of the target column
2322                        if let Expr::Placeholder(placeholder) = &mut expr {
2323                            placeholder.field = placeholder
2324                                .field
2325                                .take()
2326                                .or_else(|| Some(Arc::clone(field)));
2327                        }
2328                        // Cast to target column type, if necessary
2329                        expr.cast_to(field.data_type(), source.schema())?
2330                    }
2331                    None => {
2332                        // If the target table has an alias, use it to qualify the column name
2333                        if let Some(alias) = &table_alias {
2334                            Expr::Column(Column::new(
2335                                Some(self.ident_normalizer.normalize(alias.name.clone())),
2336                                field.name(),
2337                            ))
2338                        } else {
2339                            Expr::Column(Column::from((qualifier, field)))
2340                        }
2341                    }
2342                };
2343                Ok(expr.alias(field.name()))
2344            })
2345            .collect::<Result<Vec<_>>>()?;
2346
2347        let source = project(source, exprs)?;
2348
2349        let plan = LogicalPlan::Dml(DmlStatement::new(
2350            table_name,
2351            table_source,
2352            WriteOp::Update,
2353            Arc::new(source),
2354        ));
2355        Ok(plan)
2356    }
2357
2358    fn insert_to_plan(
2359        &self,
2360        table_name: ObjectName,
2361        columns: Vec<ObjectName>,
2362        source: Box<Query>,
2363        overwrite: bool,
2364        replace_into: bool,
2365    ) -> Result<LogicalPlan> {
2366        // Do a table lookup to verify the table exists
2367        let table_name = self.object_name_to_table_reference(table_name)?;
2368        let table_source = self.context_provider.get_table_source(table_name.clone())?;
2369        let table_schema = DFSchema::try_from(table_source.schema())?;
2370
2371        let columns: Vec<Ident> = columns
2372            .into_iter()
2373            .map(|name| {
2374                if name.0.len() != 1 {
2375                    return not_impl_err!(
2376                        "Multi-part column names in INSERT not supported: {name}"
2377                    );
2378                }
2379                let part = &name.0[0];
2380                let Some(ident) = part.as_ident() else {
2381                    return not_impl_err!(
2382                        "Non-identifier column name part in INSERT not supported: {part}"
2383                    );
2384                };
2385                Ok(ident.clone())
2386            })
2387            .collect::<Result<Vec<_>>>()?;
2388
2389        // Get insert fields and target table's value indices
2390        //
2391        // If value_indices[i] = Some(j), it means that the value of the i-th target table's column is
2392        // derived from the j-th output of the source.
2393        //
2394        // If value_indices[i] = None, it means that the value of the i-th target table's column is
2395        // not provided, and should be filled with a default value later.
2396        let (fields, value_indices) = if columns.is_empty() {
2397            // Empty means we're inserting into all columns of the table
2398            (
2399                table_schema.fields().clone(),
2400                (0..table_schema.fields().len())
2401                    .map(Some)
2402                    .collect::<Vec<_>>(),
2403            )
2404        } else {
2405            let mut value_indices = vec![None; table_schema.fields().len()];
2406            let fields = columns
2407                .into_iter()
2408                .enumerate()
2409                .map(|(i, c)| {
2410                    let c = self.ident_normalizer.normalize(c);
2411                    let column_index = table_schema
2412                        .index_of_column_by_name(None, &c)
2413                        .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
2414
2415                    if value_indices[column_index].is_some() {
2416                        return schema_err!(SchemaError::DuplicateUnqualifiedField {
2417                            name: c,
2418                        });
2419                    } else {
2420                        value_indices[column_index] = Some(i);
2421                    }
2422                    Ok(Arc::clone(table_schema.field(column_index)))
2423                })
2424                .collect::<Result<Vec<_>>>()?;
2425            (Fields::from(fields), value_indices)
2426        };
2427
2428        // infer types for Values clause... other types should be resolvable the regular way
2429        let mut prepare_param_data_types = BTreeMap::new();
2430        if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
2431            for row in rows.iter() {
2432                for (idx, val) in row.content.iter().enumerate() {
2433                    if let SQLExpr::Value(ValueWithSpan {
2434                        value: Value::Placeholder(name),
2435                        span: _,
2436                    }) = val
2437                    {
2438                        let name =
2439                            name.replace('$', "").parse::<usize>().map_err(|_| {
2440                                plan_datafusion_err!("Can't parse placeholder: {name}")
2441                            })? - 1;
2442                        let field = fields.get(idx).ok_or_else(|| {
2443                            plan_datafusion_err!(
2444                                "Placeholder ${} refers to a non existent column",
2445                                idx + 1
2446                            )
2447                        })?;
2448                        let _ = prepare_param_data_types.insert(name, Arc::clone(field));
2449                    }
2450                }
2451            }
2452        }
2453        let prepare_param_data_types = {
2454            let len = prepare_param_data_types.keys().last().map_or(0, |&k| k + 1);
2455            (0..len)
2456                .map(|i| prepare_param_data_types.remove(&i))
2457                .collect()
2458        };
2459
2460        // Projection
2461        let mut planner_context =
2462            PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
2463        planner_context.set_table_schema(Some(DFSchemaRef::new(
2464            DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
2465        )));
2466        let source = self.query_to_plan(*source, &mut planner_context)?;
2467        if fields.len() != source.schema().fields().len() {
2468            plan_err!("Column count doesn't match insert query!")?;
2469        }
2470
2471        let exprs = value_indices
2472            .into_iter()
2473            .enumerate()
2474            .map(|(i, value_index)| {
2475                let target_field = table_schema.field(i);
2476                let expr = match value_index {
2477                    Some(v) => {
2478                        Expr::Column(Column::from(source.schema().qualified_field(v)))
2479                            .cast_to(target_field.data_type(), source.schema())?
2480                    }
2481                    // The value is not specified. Fill in the default value for the column.
2482                    None => table_source
2483                        .get_column_default(target_field.name())
2484                        .cloned()
2485                        .unwrap_or_else(|| {
2486                            // If there is no default for the column, then the default is NULL
2487                            Expr::Literal(ScalarValue::Null, None)
2488                        })
2489                        .cast_to(target_field.data_type(), &DFSchema::empty())?,
2490                };
2491                Ok(expr.alias(target_field.name()))
2492            })
2493            .collect::<Result<Vec<Expr>>>()?;
2494        let source = project(source, exprs)?;
2495
2496        let insert_op = match (overwrite, replace_into) {
2497            (false, false) => InsertOp::Append,
2498            (true, false) => InsertOp::Overwrite,
2499            (false, true) => InsertOp::Replace,
2500            (true, true) => plan_err!(
2501                "Conflicting insert operations: `overwrite` and `replace_into` cannot both be true"
2502            )?,
2503        };
2504
2505        let plan = LogicalPlan::Dml(DmlStatement::new(
2506            table_name,
2507            Arc::clone(&table_source),
2508            WriteOp::Insert(insert_op),
2509            Arc::new(source),
2510        ));
2511        Ok(plan)
2512    }
2513
2514    fn show_columns_to_plan(
2515        &self,
2516        extended: bool,
2517        full: bool,
2518        sql_table_name: ObjectName,
2519    ) -> Result<LogicalPlan> {
2520        // Figure out the where clause
2521        let where_clause = object_name_to_qualifier(
2522            &sql_table_name,
2523            self.options.enable_ident_normalization,
2524        )?;
2525
2526        if !self.has_table("information_schema", "columns") {
2527            return plan_err!(
2528                "SHOW COLUMNS is not supported unless information_schema is enabled"
2529            );
2530        }
2531
2532        // Do a table lookup to verify the table exists
2533        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2534        let _ = self.context_provider.get_table_source(table_ref)?;
2535
2536        // Treat both FULL and EXTENDED as the same
2537        let select_list = if full || extended {
2538            "*"
2539        } else {
2540            "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
2541        };
2542
2543        let query = format!(
2544            "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
2545        );
2546
2547        let mut rewrite = DFParser::parse_sql(&query)?;
2548        assert_eq!(rewrite.len(), 1);
2549        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2550    }
2551
2552    /// Rewrite `SHOW FUNCTIONS` to another SQL query
2553    /// The query is based on the `information_schema.routines` and `information_schema.parameters` tables
2554    ///
2555    /// The output columns:
2556    /// - function_name: The name of function
2557    /// - return_type: The return type of the function
2558    /// - parameters: The name of parameters (ordered by the ordinal position)
2559    /// - parameter_types: The type of parameters (ordered by the ordinal position)
2560    /// - description: The description of the function (the description defined in the document)
2561    /// - syntax_example: The syntax_example of the function (the syntax_example defined in the document)
2562    fn show_functions_to_plan(
2563        &self,
2564        filter: Option<ShowStatementFilter>,
2565    ) -> Result<LogicalPlan> {
2566        let where_clause = if let Some(filter) = filter {
2567            match filter {
2568                ShowStatementFilter::Like(like) => {
2569                    format!("WHERE p.function_name like '{like}'")
2570                }
2571                _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
2572            }
2573        } else {
2574            "".to_string()
2575        };
2576
2577        let query = format!(
2578            r#"
2579SELECT DISTINCT
2580    p.*,
2581    r.function_type function_type,
2582    r.description description,
2583    r.syntax_example syntax_example
2584FROM
2585    (
2586        SELECT
2587            i.specific_name function_name,
2588            o.data_type return_type,
2589            array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
2590            array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
2591        FROM (
2592                 SELECT
2593                     specific_catalog,
2594                     specific_schema,
2595                     specific_name,
2596                     ordinal_position,
2597                     parameter_name,
2598                     data_type,
2599                     rid
2600                 FROM
2601                     information_schema.parameters
2602                 WHERE
2603                     parameter_mode = 'IN'
2604             ) i
2605                 JOIN
2606             (
2607                 SELECT
2608                     specific_catalog,
2609                     specific_schema,
2610                     specific_name,
2611                     ordinal_position,
2612                     parameter_name,
2613                     data_type,
2614                     rid
2615                 FROM
2616                     information_schema.parameters
2617                 WHERE
2618                     parameter_mode = 'OUT'
2619             ) o
2620             ON i.specific_catalog = o.specific_catalog
2621                 AND i.specific_schema = o.specific_schema
2622                 AND i.specific_name = o.specific_name
2623                 AND i.rid = o.rid
2624        GROUP BY 1, 2, i.rid
2625    ) as p
2626JOIN information_schema.routines r
2627ON p.function_name = r.routine_name
2628{where_clause}
2629            "#
2630        );
2631        let mut rewrite = DFParser::parse_sql(&query)?;
2632        assert_eq!(rewrite.len(), 1);
2633        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2634    }
2635
2636    fn show_create_table_to_plan(
2637        &self,
2638        sql_table_name: ObjectName,
2639    ) -> Result<LogicalPlan> {
2640        if !self.has_table("information_schema", "tables") {
2641            return plan_err!(
2642                "SHOW CREATE TABLE is not supported unless information_schema is enabled"
2643            );
2644        }
2645        // Figure out the where clause
2646        let where_clause = object_name_to_qualifier(
2647            &sql_table_name,
2648            self.options.enable_ident_normalization,
2649        )?;
2650
2651        // Do a table lookup to verify the table exists
2652        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2653        let _ = self.context_provider.get_table_source(table_ref)?;
2654
2655        let query = format!(
2656            "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
2657        );
2658
2659        let mut rewrite = DFParser::parse_sql(&query)?;
2660        assert_eq!(rewrite.len(), 1);
2661        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2662    }
2663
2664    /// Return true if there is a table provider available for "schema.table"
2665    fn has_table(&self, schema: &str, table: &str) -> bool {
2666        let tables_reference = TableReference::Partial {
2667            schema: schema.into(),
2668            table: table.into(),
2669        };
2670        self.context_provider
2671            .get_table_source(tables_reference)
2672            .is_ok()
2673    }
2674
2675    fn validate_transaction_kind(
2676        &self,
2677        kind: Option<&BeginTransactionKind>,
2678    ) -> Result<()> {
2679        match kind {
2680            // BEGIN
2681            None => Ok(()),
2682            // BEGIN TRANSACTION
2683            Some(BeginTransactionKind::Transaction) => Ok(()),
2684            Some(BeginTransactionKind::Work) | Some(BeginTransactionKind::Tran) => {
2685                not_impl_err!("Transaction kind not supported: {kind:?}")
2686            }
2687        }
2688    }
2689}