datafusion_sql/
statement.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{BTreeMap, HashMap, HashSet};
19use std::path::Path;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use crate::parser::{
24    CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
25    LexOrdering, ResetStatement, Statement as DFStatement,
26};
27use crate::planner::{
28    ContextProvider, PlannerContext, SqlToRel, object_name_to_qualifier,
29};
30use crate::utils::normalize_ident;
31
32use arrow::datatypes::{Field, FieldRef, Fields};
33use datafusion_common::error::_plan_err;
34use datafusion_common::parsers::CompressionTypeVariant;
35use datafusion_common::{
36    Column, Constraint, Constraints, DFSchema, DFSchemaRef, DataFusionError, Result,
37    ScalarValue, SchemaError, SchemaReference, TableReference, ToDFSchema, exec_err,
38    internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
39    unqualified_field_not_found,
40};
41use datafusion_expr::dml::{CopyTo, InsertOp};
42use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
43use datafusion_expr::logical_plan::DdlStatement;
44use datafusion_expr::logical_plan::builder::project;
45use datafusion_expr::utils::expr_to_columns;
46use datafusion_expr::{
47    Analyze, CreateCatalog, CreateCatalogSchema,
48    CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
49    CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
50    DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
51    EmptyRelation, Execute, Explain, ExplainFormat, Expr, ExprSchemable, Filter,
52    LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare,
53    ResetVariable, SetVariable, SortExpr, Statement as PlanStatement, ToStringifiedPlan,
54    TransactionAccessMode, TransactionConclusion, TransactionEnd,
55    TransactionIsolationLevel, TransactionStart, Volatility, WriteOp, cast, col,
56};
57use sqlparser::ast::{
58    self, BeginTransactionKind, IndexColumn, IndexType, NullsDistinctOption, OrderByExpr,
59    OrderByOptions, Set, ShowStatementIn, ShowStatementOptions, SqliteOnConflict,
60    TableObject, UpdateTableFromKind, ValueWithSpan,
61};
62use sqlparser::ast::{
63    Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
64    CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
65    ObjectName, ObjectType, Query, SchemaName, SetExpr, ShowCreateObject,
66    ShowStatementFilter, Statement, TableConstraint, TableFactor, TableWithJoins,
67    TransactionMode, UnaryOperator, Value,
68};
69use sqlparser::parser::ParserError::ParserError;
70
71fn ident_to_string(ident: &Ident) -> String {
72    normalize_ident(ident.to_owned())
73}
74
75fn object_name_to_string(object_name: &ObjectName) -> String {
76    object_name
77        .0
78        .iter()
79        .map(|object_name_part| {
80            object_name_part
81                .as_ident()
82                // TODO: It might be better to return an error
83                // than to silently use a default value.
84                .map_or_else(String::new, ident_to_string)
85        })
86        .collect::<Vec<String>>()
87        .join(".")
88}
89
90fn get_schema_name(schema_name: &SchemaName) -> String {
91    match schema_name {
92        SchemaName::Simple(schema_name) => object_name_to_string(schema_name),
93        SchemaName::UnnamedAuthorization(auth) => ident_to_string(auth),
94        SchemaName::NamedAuthorization(schema_name, auth) => format!(
95            "{}.{}",
96            object_name_to_string(schema_name),
97            ident_to_string(auth)
98        ),
99    }
100}
101
102/// Construct `TableConstraint`(s) for the given columns by iterating over
103/// `columns` and extracting individual inline constraint definitions.
104fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
105    let mut constraints = vec![];
106    for column in columns {
107        for ast::ColumnOptionDef { name, option } in &column.options {
108            match option {
109                ast::ColumnOption::Unique {
110                    is_primary: false,
111                    characteristics,
112                } => constraints.push(TableConstraint::Unique {
113                    name: name.clone(),
114                    columns: vec![IndexColumn {
115                        column: OrderByExpr {
116                            expr: SQLExpr::Identifier(column.name.clone()),
117                            options: OrderByOptions {
118                                asc: None,
119                                nulls_first: None,
120                            },
121                            with_fill: None,
122                        },
123                        operator_class: None,
124                    }],
125                    characteristics: *characteristics,
126                    index_name: None,
127                    index_type_display: ast::KeyOrIndexDisplay::None,
128                    index_type: None,
129                    index_options: vec![],
130                    nulls_distinct: NullsDistinctOption::None,
131                }),
132                ast::ColumnOption::Unique {
133                    is_primary: true,
134                    characteristics,
135                } => constraints.push(TableConstraint::PrimaryKey {
136                    name: name.clone(),
137                    columns: vec![IndexColumn {
138                        column: OrderByExpr {
139                            expr: SQLExpr::Identifier(column.name.clone()),
140                            options: OrderByOptions {
141                                asc: None,
142                                nulls_first: None,
143                            },
144                            with_fill: None,
145                        },
146                        operator_class: None,
147                    }],
148                    characteristics: *characteristics,
149                    index_name: None,
150                    index_type: None,
151                    index_options: vec![],
152                }),
153                ast::ColumnOption::ForeignKey {
154                    foreign_table,
155                    referred_columns,
156                    on_delete,
157                    on_update,
158                    characteristics,
159                } => constraints.push(TableConstraint::ForeignKey {
160                    name: name.clone(),
161                    columns: vec![],
162                    foreign_table: foreign_table.clone(),
163                    referred_columns: referred_columns.to_vec(),
164                    on_delete: *on_delete,
165                    on_update: *on_update,
166                    characteristics: *characteristics,
167                    index_name: None,
168                }),
169                ast::ColumnOption::Check(expr) => {
170                    constraints.push(TableConstraint::Check {
171                        name: name.clone(),
172                        expr: Box::new(expr.clone()),
173                        enforced: None,
174                    })
175                }
176                // Other options are not constraint related.
177                ast::ColumnOption::Default(_)
178                | ast::ColumnOption::Null
179                | ast::ColumnOption::NotNull
180                | ast::ColumnOption::DialectSpecific(_)
181                | ast::ColumnOption::CharacterSet(_)
182                | ast::ColumnOption::Generated { .. }
183                | ast::ColumnOption::Comment(_)
184                | ast::ColumnOption::Options(_)
185                | ast::ColumnOption::OnUpdate(_)
186                | ast::ColumnOption::Materialized(_)
187                | ast::ColumnOption::Ephemeral(_)
188                | ast::ColumnOption::Identity(_)
189                | ast::ColumnOption::OnConflict(_)
190                | ast::ColumnOption::Policy(_)
191                | ast::ColumnOption::Tags(_)
192                | ast::ColumnOption::Alias(_)
193                | ast::ColumnOption::Srid(_)
194                | ast::ColumnOption::Collation(_) => {}
195            }
196        }
197    }
198    constraints
199}
200
201impl<S: ContextProvider> SqlToRel<'_, S> {
202    /// Generate a logical plan from an DataFusion SQL statement
203    pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
204        match statement {
205            DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
206            DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
207            DFStatement::CopyTo(s) => self.copy_to_plan(s),
208            DFStatement::Explain(ExplainStatement {
209                verbose,
210                analyze,
211                format,
212                statement,
213            }) => self.explain_to_plan(verbose, analyze, format, *statement),
214            DFStatement::Reset(statement) => self.reset_statement_to_plan(statement),
215        }
216    }
217
218    /// Generate a logical plan from an SQL statement
219    pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
220        self.sql_statement_to_plan_with_context_impl(
221            statement,
222            &mut PlannerContext::new(),
223        )
224    }
225
226    /// Generate a logical plan from an SQL statement
227    pub fn sql_statement_to_plan_with_context(
228        &self,
229        statement: Statement,
230        planner_context: &mut PlannerContext,
231    ) -> Result<LogicalPlan> {
232        self.sql_statement_to_plan_with_context_impl(statement, planner_context)
233    }
234
235    fn sql_statement_to_plan_with_context_impl(
236        &self,
237        statement: Statement,
238        planner_context: &mut PlannerContext,
239    ) -> Result<LogicalPlan> {
240        match statement {
241            Statement::ExplainTable {
242                describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, // only parse 'DESCRIBE table_name' or 'DESC table_name' and not 'EXPLAIN table_name'
243                table_name,
244                ..
245            } => self.describe_table_to_plan(table_name),
246            Statement::Explain {
247                describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, // only parse 'DESCRIBE statement' or 'DESC statement' and not 'EXPLAIN statement'
248                statement,
249                ..
250            } => match *statement {
251                Statement::Query(query) => self.describe_query_to_plan(*query),
252                _ => {
253                    not_impl_err!("Describing statements other than SELECT not supported")
254                }
255            },
256            Statement::Explain {
257                verbose,
258                statement,
259                analyze,
260                format,
261                describe_alias: _,
262                ..
263            } => {
264                let format = format.map(|format| format.to_string());
265                let statement = DFStatement::Statement(statement);
266                self.explain_to_plan(verbose, analyze, format, statement)
267            }
268            Statement::Query(query) => self.query_to_plan(*query, planner_context),
269            Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
270            Statement::Set(statement) => self.set_statement_to_plan(statement),
271            Statement::CreateTable(CreateTable {
272                temporary,
273                external,
274                global,
275                transient,
276                volatile,
277                hive_distribution,
278                hive_formats,
279                file_format,
280                location,
281                query,
282                name,
283                columns,
284                constraints,
285                if_not_exists,
286                or_replace,
287                without_rowid,
288                like,
289                clone,
290                comment,
291                on_commit,
292                on_cluster,
293                primary_key,
294                order_by,
295                partition_by,
296                cluster_by,
297                clustered_by,
298                strict,
299                copy_grants,
300                enable_schema_evolution,
301                change_tracking,
302                data_retention_time_in_days,
303                max_data_extension_time_in_days,
304                default_ddl_collation,
305                with_aggregation_policy,
306                with_row_access_policy,
307                with_tags,
308                iceberg,
309                external_volume,
310                base_location,
311                catalog,
312                catalog_sync,
313                storage_serialization_policy,
314                inherits,
315                table_options: CreateTableOptions::None,
316                dynamic,
317                version,
318                target_lag,
319                warehouse,
320                refresh_mode,
321                initialize,
322                require_user,
323            }) => {
324                if temporary {
325                    return not_impl_err!("Temporary tables not supported")?;
326                }
327                if external {
328                    return not_impl_err!("External tables not supported")?;
329                }
330                if global.is_some() {
331                    return not_impl_err!("Global tables not supported")?;
332                }
333                if transient {
334                    return not_impl_err!("Transient tables not supported")?;
335                }
336                if volatile {
337                    return not_impl_err!("Volatile tables not supported")?;
338                }
339                if hive_distribution != ast::HiveDistributionStyle::NONE {
340                    return not_impl_err!(
341                        "Hive distribution not supported: {hive_distribution:?}"
342                    )?;
343                }
344                if !matches!(
345                    hive_formats,
346                    Some(ast::HiveFormat {
347                        row_format: None,
348                        serde_properties: None,
349                        storage: None,
350                        location: None,
351                    })
352                ) {
353                    return not_impl_err!(
354                        "Hive formats not supported: {hive_formats:?}"
355                    )?;
356                }
357                if file_format.is_some() {
358                    return not_impl_err!("File format not supported")?;
359                }
360                if location.is_some() {
361                    return not_impl_err!("Location not supported")?;
362                }
363                if without_rowid {
364                    return not_impl_err!("Without rowid not supported")?;
365                }
366                if like.is_some() {
367                    return not_impl_err!("Like not supported")?;
368                }
369                if clone.is_some() {
370                    return not_impl_err!("Clone not supported")?;
371                }
372                if comment.is_some() {
373                    return not_impl_err!("Comment not supported")?;
374                }
375                if on_commit.is_some() {
376                    return not_impl_err!("On commit not supported")?;
377                }
378                if on_cluster.is_some() {
379                    return not_impl_err!("On cluster not supported")?;
380                }
381                if primary_key.is_some() {
382                    return not_impl_err!("Primary key not supported")?;
383                }
384                if order_by.is_some() {
385                    return not_impl_err!("Order by not supported")?;
386                }
387                if partition_by.is_some() {
388                    return not_impl_err!("Partition by not supported")?;
389                }
390                if cluster_by.is_some() {
391                    return not_impl_err!("Cluster by not supported")?;
392                }
393                if clustered_by.is_some() {
394                    return not_impl_err!("Clustered by not supported")?;
395                }
396                if strict {
397                    return not_impl_err!("Strict not supported")?;
398                }
399                if copy_grants {
400                    return not_impl_err!("Copy grants not supported")?;
401                }
402                if enable_schema_evolution.is_some() {
403                    return not_impl_err!("Enable schema evolution not supported")?;
404                }
405                if change_tracking.is_some() {
406                    return not_impl_err!("Change tracking not supported")?;
407                }
408                if data_retention_time_in_days.is_some() {
409                    return not_impl_err!("Data retention time in days not supported")?;
410                }
411                if max_data_extension_time_in_days.is_some() {
412                    return not_impl_err!(
413                        "Max data extension time in days not supported"
414                    )?;
415                }
416                if default_ddl_collation.is_some() {
417                    return not_impl_err!("Default DDL collation not supported")?;
418                }
419                if with_aggregation_policy.is_some() {
420                    return not_impl_err!("With aggregation policy not supported")?;
421                }
422                if with_row_access_policy.is_some() {
423                    return not_impl_err!("With row access policy not supported")?;
424                }
425                if with_tags.is_some() {
426                    return not_impl_err!("With tags not supported")?;
427                }
428                if iceberg {
429                    return not_impl_err!("Iceberg not supported")?;
430                }
431                if external_volume.is_some() {
432                    return not_impl_err!("External volume not supported")?;
433                }
434                if base_location.is_some() {
435                    return not_impl_err!("Base location not supported")?;
436                }
437                if catalog.is_some() {
438                    return not_impl_err!("Catalog not supported")?;
439                }
440                if catalog_sync.is_some() {
441                    return not_impl_err!("Catalog sync not supported")?;
442                }
443                if storage_serialization_policy.is_some() {
444                    return not_impl_err!("Storage serialization policy not supported")?;
445                }
446                if inherits.is_some() {
447                    return not_impl_err!("Table inheritance not supported")?;
448                }
449                if dynamic {
450                    return not_impl_err!("Dynamic tables not supported")?;
451                }
452                if version.is_some() {
453                    return not_impl_err!("Version not supported")?;
454                }
455                if target_lag.is_some() {
456                    return not_impl_err!("Target lag not supported")?;
457                }
458                if warehouse.is_some() {
459                    return not_impl_err!("Warehouse not supported")?;
460                }
461                if refresh_mode.is_some() {
462                    return not_impl_err!("Refresh mode not supported")?;
463                }
464                if initialize.is_some() {
465                    return not_impl_err!("Initialize not supported")?;
466                }
467                if require_user {
468                    return not_impl_err!("Require user not supported")?;
469                }
470                // Merge inline constraints and existing constraints
471                let mut all_constraints = constraints;
472                let inline_constraints = calc_inline_constraints_from_columns(&columns);
473                all_constraints.extend(inline_constraints);
474                // Build column default values
475                let column_defaults =
476                    self.build_column_defaults(&columns, planner_context)?;
477
478                let has_columns = !columns.is_empty();
479                let schema = self.build_schema(columns)?.to_dfschema_ref()?;
480                if has_columns {
481                    planner_context.set_table_schema(Some(Arc::clone(&schema)));
482                }
483
484                match query {
485                    Some(query) => {
486                        let plan = self.query_to_plan(*query, planner_context)?;
487                        let input_schema = plan.schema();
488
489                        let plan = if has_columns {
490                            if schema.fields().len() != input_schema.fields().len() {
491                                return plan_err!(
492                                    "Mismatch: {} columns specified, but result has {} columns",
493                                    schema.fields().len(),
494                                    input_schema.fields().len()
495                                );
496                            }
497                            let input_fields = input_schema.fields();
498                            let project_exprs = schema
499                                .fields()
500                                .iter()
501                                .zip(input_fields)
502                                .map(|(field, input_field)| {
503                                    cast(
504                                        col(input_field.name()),
505                                        field.data_type().clone(),
506                                    )
507                                    .alias(field.name())
508                                })
509                                .collect::<Vec<_>>();
510
511                            LogicalPlanBuilder::from(plan.clone())
512                                .project(project_exprs)?
513                                .build()?
514                        } else {
515                            plan
516                        };
517
518                        let constraints = self.new_constraint_from_table_constraints(
519                            &all_constraints,
520                            plan.schema(),
521                        )?;
522
523                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
524                            CreateMemoryTable {
525                                name: self.object_name_to_table_reference(name)?,
526                                constraints,
527                                input: Arc::new(plan),
528                                if_not_exists,
529                                or_replace,
530                                column_defaults,
531                                temporary,
532                            },
533                        )))
534                    }
535
536                    None => {
537                        let plan = EmptyRelation {
538                            produce_one_row: false,
539                            schema,
540                        };
541                        let plan = LogicalPlan::EmptyRelation(plan);
542                        let constraints = self.new_constraint_from_table_constraints(
543                            &all_constraints,
544                            plan.schema(),
545                        )?;
546                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
547                            CreateMemoryTable {
548                                name: self.object_name_to_table_reference(name)?,
549                                constraints,
550                                input: Arc::new(plan),
551                                if_not_exists,
552                                or_replace,
553                                column_defaults,
554                                temporary,
555                            },
556                        )))
557                    }
558                }
559            }
560            Statement::CreateView {
561                or_replace,
562                materialized,
563                name,
564                columns,
565                query,
566                options: CreateTableOptions::None,
567                cluster_by,
568                comment,
569                with_no_schema_binding,
570                if_not_exists,
571                temporary,
572                to,
573                params,
574                or_alter,
575                secure,
576                name_before_not_exists,
577            } => {
578                if materialized {
579                    return not_impl_err!("Materialized views not supported")?;
580                }
581                if !cluster_by.is_empty() {
582                    return not_impl_err!("Cluster by not supported")?;
583                }
584                if comment.is_some() {
585                    return not_impl_err!("Comment not supported")?;
586                }
587                if with_no_schema_binding {
588                    return not_impl_err!("With no schema binding not supported")?;
589                }
590                if if_not_exists {
591                    return not_impl_err!("If not exists not supported")?;
592                }
593                if to.is_some() {
594                    return not_impl_err!("To not supported")?;
595                }
596
597                // put the statement back together temporarily to get the SQL
598                // string representation
599                let stmt = Statement::CreateView {
600                    or_replace,
601                    materialized,
602                    name,
603                    columns,
604                    query,
605                    options: CreateTableOptions::None,
606                    cluster_by,
607                    comment,
608                    with_no_schema_binding,
609                    if_not_exists,
610                    temporary,
611                    to,
612                    params,
613                    or_alter,
614                    secure,
615                    name_before_not_exists,
616                };
617                let sql = stmt.to_string();
618                let Statement::CreateView {
619                    name,
620                    columns,
621                    query,
622                    or_replace,
623                    temporary,
624                    ..
625                } = stmt
626                else {
627                    return internal_err!("Unreachable code in create view");
628                };
629
630                let columns = columns
631                    .into_iter()
632                    .map(|view_column_def| {
633                        if let Some(options) = view_column_def.options {
634                            plan_err!(
635                                "Options not supported for view columns: {options:?}"
636                            )
637                        } else {
638                            Ok(view_column_def.name)
639                        }
640                    })
641                    .collect::<Result<Vec<_>>>()?;
642
643                let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
644                plan = self.apply_expr_alias(plan, columns)?;
645
646                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
647                    name: self.object_name_to_table_reference(name)?,
648                    input: Arc::new(plan),
649                    or_replace,
650                    definition: Some(sql),
651                    temporary,
652                })))
653            }
654            Statement::ShowCreate { obj_type, obj_name } => match obj_type {
655                ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
656                _ => {
657                    not_impl_err!("Only `SHOW CREATE TABLE  ...` statement is supported")
658                }
659            },
660            Statement::CreateSchema {
661                schema_name,
662                if_not_exists,
663                ..
664            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
665                CreateCatalogSchema {
666                    schema_name: get_schema_name(&schema_name),
667                    if_not_exists,
668                    schema: Arc::new(DFSchema::empty()),
669                },
670            ))),
671            Statement::CreateDatabase {
672                db_name,
673                if_not_exists,
674                ..
675            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
676                CreateCatalog {
677                    catalog_name: object_name_to_string(&db_name),
678                    if_not_exists,
679                    schema: Arc::new(DFSchema::empty()),
680                },
681            ))),
682            Statement::Drop {
683                object_type,
684                if_exists,
685                mut names,
686                cascade,
687                restrict: _,
688                purge: _,
689                temporary: _,
690                table: _,
691            } => {
692                // We don't support cascade and purge for now.
693                // nor do we support multiple object names
694                let name = match names.len() {
695                    0 => Err(ParserError("Missing table name.".to_string()).into()),
696                    1 => self.object_name_to_table_reference(names.pop().unwrap()),
697                    _ => {
698                        Err(ParserError("Multiple objects not supported".to_string())
699                            .into())
700                    }
701                }?;
702
703                match object_type {
704                    ObjectType::Table => {
705                        Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
706                            name,
707                            if_exists,
708                            schema: DFSchemaRef::new(DFSchema::empty()),
709                        })))
710                    }
711                    ObjectType::View => {
712                        Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
713                            name,
714                            if_exists,
715                            schema: DFSchemaRef::new(DFSchema::empty()),
716                        })))
717                    }
718                    ObjectType::Schema => {
719                        let name = match name {
720                            TableReference::Bare { table } => {
721                                Ok(SchemaReference::Bare { schema: table })
722                            }
723                            TableReference::Partial { schema, table } => {
724                                Ok(SchemaReference::Full {
725                                    schema: table,
726                                    catalog: schema,
727                                })
728                            }
729                            TableReference::Full {
730                                catalog: _,
731                                schema: _,
732                                table: _,
733                            } => Err(ParserError(
734                                "Invalid schema specifier (has 3 parts)".to_string(),
735                            )),
736                        }?;
737                        Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(
738                            DropCatalogSchema {
739                                name,
740                                if_exists,
741                                cascade,
742                                schema: DFSchemaRef::new(DFSchema::empty()),
743                            },
744                        )))
745                    }
746                    _ => not_impl_err!(
747                        "Only `DROP TABLE/VIEW/SCHEMA  ...` statement is supported currently"
748                    ),
749                }
750            }
751            Statement::Prepare {
752                name,
753                data_types,
754                statement,
755            } => {
756                // Convert parser data types to DataFusion data types
757                let mut fields: Vec<FieldRef> = data_types
758                    .into_iter()
759                    .map(|t| self.convert_data_type_to_field(&t))
760                    .collect::<Result<_>>()?;
761
762                // Create planner context with parameters
763                let mut planner_context =
764                    PlannerContext::new().with_prepare_param_data_types(fields.clone());
765
766                // Build logical plan for inner statement of the prepare statement
767                let plan = self.sql_statement_to_plan_with_context_impl(
768                    *statement,
769                    &mut planner_context,
770                )?;
771
772                if fields.is_empty() {
773                    let map_types = plan.get_parameter_fields()?;
774                    let param_types: Vec<_> = (1..=map_types.len())
775                        .filter_map(|i| {
776                            let key = format!("${i}");
777                            map_types.get(&key).and_then(|opt| opt.clone())
778                        })
779                        .collect();
780                    fields.extend(param_types.iter().cloned());
781                    planner_context.with_prepare_param_data_types(param_types);
782                }
783
784                Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
785                    name: ident_to_string(&name),
786                    fields,
787                    input: Arc::new(plan),
788                })))
789            }
790            Statement::Execute {
791                name,
792                parameters,
793                using,
794                // has_parentheses specifies the syntax, but the plan is the
795                // same no matter the syntax used, so ignore it
796                has_parentheses: _,
797                immediate,
798                into,
799                output,
800                default,
801            } => {
802                // `USING` is a MySQL-specific syntax and currently not supported.
803                if !using.is_empty() {
804                    return not_impl_err!(
805                        "Execute statement with USING is not supported"
806                    );
807                }
808                if immediate {
809                    return not_impl_err!(
810                        "Execute statement with IMMEDIATE is not supported"
811                    );
812                }
813                if !into.is_empty() {
814                    return not_impl_err!("Execute statement with INTO is not supported");
815                }
816                if output {
817                    return not_impl_err!(
818                        "Execute statement with OUTPUT is not supported"
819                    );
820                }
821                if default {
822                    return not_impl_err!(
823                        "Execute statement with DEFAULT is not supported"
824                    );
825                }
826                let empty_schema = DFSchema::empty();
827                let parameters = parameters
828                    .into_iter()
829                    .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
830                    .collect::<Result<Vec<Expr>>>()?;
831
832                Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
833                    name: object_name_to_string(&name.unwrap()),
834                    parameters,
835                })))
836            }
837            Statement::Deallocate {
838                name,
839                // Similar to PostgreSQL, the PREPARE keyword is ignored
840                prepare: _,
841            } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
842                Deallocate {
843                    name: ident_to_string(&name),
844                },
845            ))),
846
847            Statement::ShowTables {
848                extended,
849                full,
850                terse,
851                history,
852                external,
853                show_options,
854            } => {
855                // We only support the basic "SHOW TABLES"
856                // https://github.com/apache/datafusion/issues/3188
857                if extended {
858                    return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
859                }
860                if full {
861                    return not_impl_err!("SHOW FULL TABLES not supported")?;
862                }
863                if terse {
864                    return not_impl_err!("SHOW TERSE TABLES not supported")?;
865                }
866                if history {
867                    return not_impl_err!("SHOW TABLES HISTORY not supported")?;
868                }
869                if external {
870                    return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
871                }
872                let ShowStatementOptions {
873                    show_in,
874                    starts_with,
875                    limit,
876                    limit_from,
877                    filter_position,
878                } = show_options;
879                if show_in.is_some() {
880                    return not_impl_err!("SHOW TABLES IN not supported")?;
881                }
882                if starts_with.is_some() {
883                    return not_impl_err!("SHOW TABLES LIKE not supported")?;
884                }
885                if limit.is_some() {
886                    return not_impl_err!("SHOW TABLES LIMIT not supported")?;
887                }
888                if limit_from.is_some() {
889                    return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
890                }
891                if filter_position.is_some() {
892                    return not_impl_err!("SHOW TABLES FILTER not supported")?;
893                }
894                self.show_tables_to_plan()
895            }
896
897            Statement::ShowColumns {
898                extended,
899                full,
900                show_options,
901            } => {
902                let ShowStatementOptions {
903                    show_in,
904                    starts_with,
905                    limit,
906                    limit_from,
907                    filter_position,
908                } = show_options;
909                if starts_with.is_some() {
910                    return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
911                }
912                if limit.is_some() {
913                    return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
914                }
915                if limit_from.is_some() {
916                    return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
917                }
918                if filter_position.is_some() {
919                    return not_impl_err!(
920                        "SHOW COLUMNS with WHERE or LIKE is not supported"
921                    )?;
922                }
923                let Some(ShowStatementIn {
924                    // specifies if the syntax was `SHOW COLUMNS IN` or `SHOW
925                    // COLUMNS FROM` which is not different in DataFusion
926                    clause: _,
927                    parent_type,
928                    parent_name,
929                }) = show_in
930                else {
931                    return plan_err!("SHOW COLUMNS requires a table name");
932                };
933
934                if let Some(parent_type) = parent_type {
935                    return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
936                }
937                let Some(table_name) = parent_name else {
938                    return plan_err!("SHOW COLUMNS requires a table name");
939                };
940
941                self.show_columns_to_plan(extended, full, table_name)
942            }
943
944            Statement::ShowFunctions { filter, .. } => {
945                self.show_functions_to_plan(filter)
946            }
947
948            Statement::Insert(Insert {
949                or,
950                into,
951                columns,
952                overwrite,
953                source,
954                partitioned,
955                after_columns,
956                table,
957                on,
958                returning,
959                ignore,
960                table_alias,
961                mut replace_into,
962                priority,
963                insert_alias,
964                assignments,
965                has_table_keyword,
966                settings,
967                format_clause,
968            }) => {
969                let table_name = match table {
970                    TableObject::TableName(table_name) => table_name,
971                    TableObject::TableFunction(_) => {
972                        return not_impl_err!(
973                            "INSERT INTO Table functions not supported"
974                        );
975                    }
976                };
977                if let Some(or) = or {
978                    match or {
979                        SqliteOnConflict::Replace => replace_into = true,
980                        _ => plan_err!("Inserts with {or} clause is not supported")?,
981                    }
982                }
983                if partitioned.is_some() {
984                    plan_err!("Partitioned inserts not yet supported")?;
985                }
986                if !after_columns.is_empty() {
987                    plan_err!("After-columns clause not supported")?;
988                }
989                if on.is_some() {
990                    plan_err!("Insert-on clause not supported")?;
991                }
992                if returning.is_some() {
993                    plan_err!("Insert-returning clause not supported")?;
994                }
995                if ignore {
996                    plan_err!("Insert-ignore clause not supported")?;
997                }
998                let Some(source) = source else {
999                    plan_err!("Inserts without a source not supported")?
1000                };
1001                if let Some(table_alias) = table_alias {
1002                    plan_err!(
1003                        "Inserts with a table alias not supported: {table_alias:?}"
1004                    )?
1005                };
1006                if let Some(priority) = priority {
1007                    plan_err!(
1008                        "Inserts with a `PRIORITY` clause not supported: {priority:?}"
1009                    )?
1010                };
1011                if insert_alias.is_some() {
1012                    plan_err!("Inserts with an alias not supported")?;
1013                }
1014                if !assignments.is_empty() {
1015                    plan_err!("Inserts with assignments not supported")?;
1016                }
1017                if settings.is_some() {
1018                    plan_err!("Inserts with settings not supported")?;
1019                }
1020                if format_clause.is_some() {
1021                    plan_err!("Inserts with format clause not supported")?;
1022                }
1023                // optional keywords don't change behavior
1024                let _ = into;
1025                let _ = has_table_keyword;
1026                self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
1027            }
1028            Statement::Update {
1029                table,
1030                assignments,
1031                from,
1032                selection,
1033                returning,
1034                or,
1035                limit,
1036            } => {
1037                let from_clauses =
1038                    from.map(|update_table_from_kind| match update_table_from_kind {
1039                        UpdateTableFromKind::BeforeSet(from_clauses) => from_clauses,
1040                        UpdateTableFromKind::AfterSet(from_clauses) => from_clauses,
1041                    });
1042                // TODO: support multiple tables in UPDATE SET FROM
1043                if from_clauses.as_ref().is_some_and(|f| f.len() > 1) {
1044                    plan_err!("Multiple tables in UPDATE SET FROM not yet supported")?;
1045                }
1046                let update_from = from_clauses.and_then(|mut f| f.pop());
1047                if returning.is_some() {
1048                    plan_err!("Update-returning clause not yet supported")?;
1049                }
1050                if or.is_some() {
1051                    plan_err!("ON conflict not supported")?;
1052                }
1053                if limit.is_some() {
1054                    return not_impl_err!("Update-limit clause not supported")?;
1055                }
1056                self.update_to_plan(table, &assignments, update_from, selection)
1057            }
1058
1059            Statement::Delete(Delete {
1060                tables,
1061                using,
1062                selection,
1063                returning,
1064                from,
1065                order_by,
1066                limit,
1067            }) => {
1068                if !tables.is_empty() {
1069                    plan_err!("DELETE <TABLE> not supported")?;
1070                }
1071
1072                if using.is_some() {
1073                    plan_err!("Using clause not supported")?;
1074                }
1075
1076                if returning.is_some() {
1077                    plan_err!("Delete-returning clause not yet supported")?;
1078                }
1079
1080                if !order_by.is_empty() {
1081                    plan_err!("Delete-order-by clause not yet supported")?;
1082                }
1083
1084                if limit.is_some() {
1085                    plan_err!("Delete-limit clause not yet supported")?;
1086                }
1087
1088                let table_name = self.get_delete_target(from)?;
1089                self.delete_to_plan(&table_name, selection)
1090            }
1091
1092            Statement::StartTransaction {
1093                modes,
1094                begin: false,
1095                modifier,
1096                transaction,
1097                statements,
1098                has_end_keyword,
1099                exception,
1100            } => {
1101                if let Some(modifier) = modifier {
1102                    return not_impl_err!(
1103                        "Transaction modifier not supported: {modifier}"
1104                    );
1105                }
1106                if !statements.is_empty() {
1107                    return not_impl_err!(
1108                        "Transaction with multiple statements not supported"
1109                    );
1110                }
1111                if exception.is_some() {
1112                    return not_impl_err!(
1113                        "Transaction with exception statements not supported"
1114                    );
1115                }
1116                if has_end_keyword {
1117                    return not_impl_err!("Transaction with END keyword not supported");
1118                }
1119                self.validate_transaction_kind(transaction.as_ref())?;
1120                let isolation_level: ast::TransactionIsolationLevel = modes
1121                    .iter()
1122                    .filter_map(|m: &TransactionMode| match m {
1123                        TransactionMode::AccessMode(_) => None,
1124                        TransactionMode::IsolationLevel(level) => Some(level),
1125                    })
1126                    .next_back()
1127                    .copied()
1128                    .unwrap_or(ast::TransactionIsolationLevel::Serializable);
1129                let access_mode: ast::TransactionAccessMode = modes
1130                    .iter()
1131                    .filter_map(|m: &TransactionMode| match m {
1132                        TransactionMode::AccessMode(mode) => Some(mode),
1133                        TransactionMode::IsolationLevel(_) => None,
1134                    })
1135                    .next_back()
1136                    .copied()
1137                    .unwrap_or(ast::TransactionAccessMode::ReadWrite);
1138                let isolation_level = match isolation_level {
1139                    ast::TransactionIsolationLevel::ReadUncommitted => {
1140                        TransactionIsolationLevel::ReadUncommitted
1141                    }
1142                    ast::TransactionIsolationLevel::ReadCommitted => {
1143                        TransactionIsolationLevel::ReadCommitted
1144                    }
1145                    ast::TransactionIsolationLevel::RepeatableRead => {
1146                        TransactionIsolationLevel::RepeatableRead
1147                    }
1148                    ast::TransactionIsolationLevel::Serializable => {
1149                        TransactionIsolationLevel::Serializable
1150                    }
1151                    ast::TransactionIsolationLevel::Snapshot => {
1152                        TransactionIsolationLevel::Snapshot
1153                    }
1154                };
1155                let access_mode = match access_mode {
1156                    ast::TransactionAccessMode::ReadOnly => {
1157                        TransactionAccessMode::ReadOnly
1158                    }
1159                    ast::TransactionAccessMode::ReadWrite => {
1160                        TransactionAccessMode::ReadWrite
1161                    }
1162                };
1163                let statement = PlanStatement::TransactionStart(TransactionStart {
1164                    access_mode,
1165                    isolation_level,
1166                });
1167                Ok(LogicalPlan::Statement(statement))
1168            }
1169            Statement::Commit {
1170                chain,
1171                end,
1172                modifier,
1173            } => {
1174                if end {
1175                    return not_impl_err!("COMMIT AND END not supported");
1176                };
1177                if let Some(modifier) = modifier {
1178                    return not_impl_err!("COMMIT {modifier} not supported");
1179                };
1180                let statement = PlanStatement::TransactionEnd(TransactionEnd {
1181                    conclusion: TransactionConclusion::Commit,
1182                    chain,
1183                });
1184                Ok(LogicalPlan::Statement(statement))
1185            }
1186            Statement::Rollback { chain, savepoint } => {
1187                if savepoint.is_some() {
1188                    plan_err!("Savepoints not supported")?;
1189                }
1190                let statement = PlanStatement::TransactionEnd(TransactionEnd {
1191                    conclusion: TransactionConclusion::Rollback,
1192                    chain,
1193                });
1194                Ok(LogicalPlan::Statement(statement))
1195            }
1196            Statement::CreateFunction(ast::CreateFunction {
1197                or_replace,
1198                temporary,
1199                name,
1200                args,
1201                return_type,
1202                function_body,
1203                behavior,
1204                language,
1205                ..
1206            }) => {
1207                let return_type = match return_type {
1208                    Some(t) => Some(self.convert_data_type_to_field(&t)?),
1209                    None => None,
1210                };
1211                let mut planner_context = PlannerContext::new();
1212                let empty_schema = &DFSchema::empty();
1213
1214                let args = match args {
1215                    Some(function_args) => {
1216                        let function_args = function_args
1217                            .into_iter()
1218                            .map(|arg| {
1219                                let data_type =
1220                                    self.convert_data_type_to_field(&arg.data_type)?;
1221
1222                                let default_expr = match arg.default_expr {
1223                                    Some(expr) => Some(self.sql_to_expr(
1224                                        expr,
1225                                        empty_schema,
1226                                        &mut planner_context,
1227                                    )?),
1228                                    None => None,
1229                                };
1230                                Ok(OperateFunctionArg {
1231                                    name: arg.name,
1232                                    default_expr,
1233                                    data_type: data_type.data_type().clone(),
1234                                })
1235                            })
1236                            .collect::<Result<Vec<OperateFunctionArg>>>();
1237                        Some(function_args?)
1238                    }
1239                    None => None,
1240                };
1241                // Validate default arguments
1242                let first_default = match args.as_ref() {
1243                    Some(arg) => arg.iter().position(|t| t.default_expr.is_some()),
1244                    None => None,
1245                };
1246                let last_non_default = match args.as_ref() {
1247                    Some(arg) => arg
1248                        .iter()
1249                        .rev()
1250                        .position(|t| t.default_expr.is_none())
1251                        .map(|reverse_pos| arg.len() - reverse_pos - 1),
1252                    None => None,
1253                };
1254                if let (Some(pos_default), Some(pos_non_default)) =
1255                    (first_default, last_non_default)
1256                    && pos_non_default > pos_default
1257                {
1258                    return plan_err!(
1259                        "Non-default arguments cannot follow default arguments."
1260                    );
1261                }
1262                // At the moment functions can't be qualified `schema.name`
1263                let name = match &name.0[..] {
1264                    [] => exec_err!("Function should have name")?,
1265                    [n] => n.as_ident().unwrap().value.clone(),
1266                    [..] => not_impl_err!("Qualified functions are not supported")?,
1267                };
1268                //
1269                // Convert resulting expression to data fusion expression
1270                //
1271                let arg_types = args.as_ref().map(|arg| {
1272                    arg.iter()
1273                        .map(|t| {
1274                            let name = match t.name.clone() {
1275                                Some(name) => name.value,
1276                                None => "".to_string(),
1277                            };
1278                            Arc::new(Field::new(name, t.data_type.clone(), true))
1279                        })
1280                        .collect::<Vec<_>>()
1281                });
1282                // Validate parameter style
1283                if let Some(ref fields) = arg_types {
1284                    let count_positional =
1285                        fields.iter().filter(|f| f.name() == "").count();
1286                    if !(count_positional == 0 || count_positional == fields.len()) {
1287                        return plan_err!(
1288                            "All function arguments must use either named or positional style."
1289                        );
1290                    }
1291                }
1292                let mut planner_context = PlannerContext::new()
1293                    .with_prepare_param_data_types(arg_types.unwrap_or_default());
1294
1295                let function_body = match function_body {
1296                    Some(r) => Some(self.sql_to_expr(
1297                        match r {
1298                            ast::CreateFunctionBody::AsBeforeOptions(expr) => expr,
1299                            ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
1300                            ast::CreateFunctionBody::Return(expr) => expr,
1301                            ast::CreateFunctionBody::AsBeginEnd(_) => {
1302                                return not_impl_err!(
1303                                    "BEGIN/END enclosed function body syntax is not supported"
1304                                )?;
1305                            }
1306                            ast::CreateFunctionBody::AsReturnExpr(_)
1307                            | ast::CreateFunctionBody::AsReturnSelect(_) => {
1308                                return not_impl_err!(
1309                                    "AS RETURN function syntax is not supported"
1310                                )?
1311                            }
1312                        },
1313                        &DFSchema::empty(),
1314                        &mut planner_context,
1315                    )?),
1316                    None => None,
1317                };
1318
1319                let params = CreateFunctionBody {
1320                    language,
1321                    behavior: behavior.map(|b| match b {
1322                        ast::FunctionBehavior::Immutable => Volatility::Immutable,
1323                        ast::FunctionBehavior::Stable => Volatility::Stable,
1324                        ast::FunctionBehavior::Volatile => Volatility::Volatile,
1325                    }),
1326                    function_body,
1327                };
1328
1329                let statement = DdlStatement::CreateFunction(CreateFunction {
1330                    or_replace,
1331                    temporary,
1332                    name,
1333                    return_type: return_type.map(|f| f.data_type().clone()),
1334                    args,
1335                    params,
1336                    schema: DFSchemaRef::new(DFSchema::empty()),
1337                });
1338
1339                Ok(LogicalPlan::Ddl(statement))
1340            }
1341            Statement::DropFunction {
1342                if_exists,
1343                func_desc,
1344                ..
1345            } => {
1346                // According to postgresql documentation it can be only one function
1347                // specified in drop statement
1348                if let Some(desc) = func_desc.first() {
1349                    // At the moment functions can't be qualified `schema.name`
1350                    let name = match &desc.name.0[..] {
1351                        [] => exec_err!("Function should have name")?,
1352                        [n] => n.as_ident().unwrap().value.clone(),
1353                        [..] => not_impl_err!("Qualified functions are not supported")?,
1354                    };
1355                    let statement = DdlStatement::DropFunction(DropFunction {
1356                        if_exists,
1357                        name,
1358                        schema: DFSchemaRef::new(DFSchema::empty()),
1359                    });
1360                    Ok(LogicalPlan::Ddl(statement))
1361                } else {
1362                    exec_err!("Function name not provided")
1363                }
1364            }
1365            Statement::CreateIndex(CreateIndex {
1366                name,
1367                table_name,
1368                using,
1369                columns,
1370                unique,
1371                if_not_exists,
1372                ..
1373            }) => {
1374                let name: Option<String> = name.as_ref().map(object_name_to_string);
1375                let table = self.object_name_to_table_reference(table_name)?;
1376                let table_schema = self
1377                    .context_provider
1378                    .get_table_source(table.clone())?
1379                    .schema()
1380                    .to_dfschema_ref()?;
1381                let using: Option<String> =
1382                    using.as_ref().map(|index_type| match index_type {
1383                        IndexType::Custom(ident) => ident_to_string(ident),
1384                        _ => index_type.to_string().to_ascii_lowercase(),
1385                    });
1386                let order_by_exprs: Vec<OrderByExpr> =
1387                    columns.into_iter().map(|col| col.column).collect();
1388                let columns = self.order_by_to_sort_expr(
1389                    order_by_exprs,
1390                    &table_schema,
1391                    planner_context,
1392                    false,
1393                    None,
1394                )?;
1395                Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
1396                    PlanCreateIndex {
1397                        name,
1398                        table,
1399                        using,
1400                        columns,
1401                        unique,
1402                        if_not_exists,
1403                        schema: DFSchemaRef::new(DFSchema::empty()),
1404                    },
1405                )))
1406            }
1407            stmt => {
1408                not_impl_err!("Unsupported SQL statement: {stmt}")
1409            }
1410        }
1411    }
1412
1413    fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
1414        let mut from = match from {
1415            FromTable::WithFromKeyword(v) => v,
1416            FromTable::WithoutKeyword(v) => v,
1417        };
1418
1419        if from.len() != 1 {
1420            return not_impl_err!(
1421                "DELETE FROM only supports single table, got {}: {from:?}",
1422                from.len()
1423            );
1424        }
1425        let table_factor = from.pop().unwrap();
1426        if !table_factor.joins.is_empty() {
1427            return not_impl_err!("DELETE FROM only supports single table, got: joins");
1428        }
1429        let TableFactor::Table { name, .. } = table_factor.relation else {
1430            return not_impl_err!(
1431                "DELETE FROM only supports single table, got: {table_factor:?}"
1432            );
1433        };
1434
1435        Ok(name)
1436    }
1437
1438    /// Generate a logical plan from a "SHOW TABLES" query
1439    fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
1440        if self.has_table("information_schema", "tables") {
1441            let query = "SELECT * FROM information_schema.tables;";
1442            let mut rewrite = DFParser::parse_sql(query)?;
1443            assert_eq!(rewrite.len(), 1);
1444            self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
1445        } else {
1446            plan_err!("SHOW TABLES is not supported unless information_schema is enabled")
1447        }
1448    }
1449
1450    fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
1451        let table_ref = self.object_name_to_table_reference(table_name)?;
1452
1453        let table_source = self.context_provider.get_table_source(table_ref)?;
1454
1455        let schema = table_source.schema();
1456
1457        let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1458
1459        Ok(LogicalPlan::DescribeTable(DescribeTable {
1460            schema,
1461            output_schema: Arc::new(output_schema),
1462        }))
1463    }
1464
1465    fn describe_query_to_plan(&self, query: Query) -> Result<LogicalPlan> {
1466        let plan = self.query_to_plan(query, &mut PlannerContext::new())?;
1467
1468        let schema = Arc::new(plan.schema().as_arrow().clone());
1469
1470        let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1471
1472        Ok(LogicalPlan::DescribeTable(DescribeTable {
1473            schema,
1474            output_schema: Arc::new(output_schema),
1475        }))
1476    }
1477
1478    fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
1479        // Determine if source is table or query and handle accordingly
1480        let copy_source = statement.source;
1481        let (input, input_schema, table_ref) = match copy_source {
1482            CopyToSource::Relation(object_name) => {
1483                let table_name = object_name_to_string(&object_name);
1484                let table_ref = self.object_name_to_table_reference(object_name)?;
1485                let table_source =
1486                    self.context_provider.get_table_source(table_ref.clone())?;
1487                let plan =
1488                    LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?;
1489                let input_schema = Arc::clone(plan.schema());
1490                (plan, input_schema, Some(table_ref))
1491            }
1492            CopyToSource::Query(query) => {
1493                let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
1494                let input_schema = Arc::clone(plan.schema());
1495                (plan, input_schema, None)
1496            }
1497        };
1498
1499        let options_map = self.parse_options_map(statement.options, true)?;
1500
1501        let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
1502            self.context_provider.get_file_type(stored_as).ok()
1503        } else {
1504            None
1505        };
1506
1507        let file_type = match maybe_file_type {
1508            Some(ft) => ft,
1509            None => {
1510                let e = || {
1511                    DataFusionError::Configuration(
1512                        "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
1513                            .to_string(),
1514                    )
1515                };
1516                // Try to infer file format from file extension
1517                let extension: &str = &Path::new(&statement.target)
1518                    .extension()
1519                    .ok_or_else(e)?
1520                    .to_str()
1521                    .ok_or_else(e)?
1522                    .to_lowercase();
1523
1524                self.context_provider.get_file_type(extension)?
1525            }
1526        };
1527
1528        let partition_by = statement
1529            .partitioned_by
1530            .iter()
1531            .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
1532            .collect::<Result<Vec<_>>>()?
1533            .into_iter()
1534            .map(|f| f.name().to_owned())
1535            .collect();
1536
1537        Ok(LogicalPlan::Copy(CopyTo::new(
1538            Arc::new(input),
1539            statement.target,
1540            partition_by,
1541            file_type,
1542            options_map,
1543        )))
1544    }
1545
1546    fn build_order_by(
1547        &self,
1548        order_exprs: Vec<LexOrdering>,
1549        schema: &DFSchemaRef,
1550        planner_context: &mut PlannerContext,
1551    ) -> Result<Vec<Vec<SortExpr>>> {
1552        if !order_exprs.is_empty() && schema.fields().is_empty() {
1553            let results = order_exprs
1554                .iter()
1555                .map(|lex_order| {
1556                    let result = lex_order
1557                        .iter()
1558                        .map(|order_by_expr| {
1559                            let ordered_expr = &order_by_expr.expr;
1560                            let ordered_expr = ordered_expr.to_owned();
1561                            let ordered_expr = self.sql_expr_to_logical_expr(
1562                                ordered_expr,
1563                                schema,
1564                                planner_context,
1565                            )?;
1566                            let asc = order_by_expr.options.asc.unwrap_or(true);
1567                            let nulls_first =
1568                                order_by_expr.options.nulls_first.unwrap_or_else(|| {
1569                                    self.options.default_null_ordering.nulls_first(asc)
1570                                });
1571
1572                            Ok(SortExpr::new(ordered_expr, asc, nulls_first))
1573                        })
1574                        .collect::<Result<Vec<SortExpr>>>()?;
1575                    Ok(result)
1576                })
1577                .collect::<Result<Vec<Vec<SortExpr>>>>()?;
1578
1579            return Ok(results);
1580        }
1581
1582        let mut all_results = vec![];
1583        for expr in order_exprs {
1584            // Convert each OrderByExpr to a SortExpr:
1585            let expr_vec =
1586                self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?;
1587            // Verify that columns of all SortExprs exist in the schema:
1588            for sort in expr_vec.iter() {
1589                for column in sort.expr.column_refs().iter() {
1590                    if !schema.has_column(column) {
1591                        // Return an error if any column is not in the schema:
1592                        return plan_err!("Column {column} is not in schema");
1593                    }
1594                }
1595            }
1596            // If all SortExprs are valid, return them as an expression vector
1597            all_results.push(expr_vec)
1598        }
1599        Ok(all_results)
1600    }
1601
1602    /// Generate a logical plan from a CREATE EXTERNAL TABLE statement
1603    fn external_table_to_plan(
1604        &self,
1605        statement: CreateExternalTable,
1606    ) -> Result<LogicalPlan> {
1607        let definition = Some(statement.to_string());
1608        let CreateExternalTable {
1609            name,
1610            columns,
1611            file_type,
1612            location,
1613            table_partition_cols,
1614            if_not_exists,
1615            temporary,
1616            order_exprs,
1617            unbounded,
1618            options,
1619            constraints,
1620            or_replace,
1621        } = statement;
1622
1623        // Merge inline constraints and existing constraints
1624        let mut all_constraints = constraints;
1625        let inline_constraints = calc_inline_constraints_from_columns(&columns);
1626        all_constraints.extend(inline_constraints);
1627
1628        let options_map = self.parse_options_map(options, false)?;
1629
1630        let compression = options_map
1631            .get("format.compression")
1632            .map(|c| CompressionTypeVariant::from_str(c))
1633            .transpose()?;
1634        if (file_type == "PARQUET" || file_type == "AVRO" || file_type == "ARROW")
1635            && compression
1636                .map(|c| c != CompressionTypeVariant::UNCOMPRESSED)
1637                .unwrap_or(false)
1638        {
1639            plan_err!(
1640                "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
1641            )?;
1642        }
1643
1644        let mut planner_context = PlannerContext::new();
1645
1646        let column_defaults = self
1647            .build_column_defaults(&columns, &mut planner_context)?
1648            .into_iter()
1649            .collect();
1650
1651        let schema = self.build_schema(columns)?;
1652        let df_schema = schema.to_dfschema_ref()?;
1653        df_schema.check_names()?;
1654
1655        let ordered_exprs =
1656            self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;
1657
1658        let name = self.object_name_to_table_reference(name)?;
1659        let constraints =
1660            self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
1661        Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1662            PlanCreateExternalTable::builder(name, location, file_type, df_schema)
1663                .with_partition_cols(table_partition_cols)
1664                .with_if_not_exists(if_not_exists)
1665                .with_or_replace(or_replace)
1666                .with_temporary(temporary)
1667                .with_definition(definition)
1668                .with_order_exprs(ordered_exprs)
1669                .with_unbounded(unbounded)
1670                .with_options(options_map)
1671                .with_constraints(constraints)
1672                .with_column_defaults(column_defaults)
1673                .build(),
1674        )))
1675    }
1676
1677    /// Get the indices of the constraint columns in the schema.
1678    /// If any column is not found, return an error.
1679    fn get_constraint_column_indices(
1680        &self,
1681        df_schema: &DFSchemaRef,
1682        columns: &[IndexColumn],
1683        constraint_name: &str,
1684    ) -> Result<Vec<usize>> {
1685        let field_names = df_schema.field_names();
1686        columns
1687            .iter()
1688            .map(|index_column| {
1689                let expr = &index_column.column.expr;
1690                let ident = if let SQLExpr::Identifier(ident) = expr {
1691                    ident
1692                } else {
1693                    return Err(plan_datafusion_err!(
1694                        "Column name for {constraint_name} must be an identifier: {expr}"
1695                    ));
1696                };
1697                let column = self.ident_normalizer.normalize(ident.clone());
1698                field_names
1699                    .iter()
1700                    .position(|item| *item == column)
1701                    .ok_or_else(|| {
1702                        plan_datafusion_err!(
1703                            "Column for {constraint_name} not found in schema: {column}"
1704                        )
1705                    })
1706            })
1707            .collect::<Result<Vec<_>>>()
1708    }
1709
1710    /// Convert each [TableConstraint] to corresponding [Constraint]
1711    pub fn new_constraint_from_table_constraints(
1712        &self,
1713        constraints: &[TableConstraint],
1714        df_schema: &DFSchemaRef,
1715    ) -> Result<Constraints> {
1716        let constraints = constraints
1717            .iter()
1718            .map(|c: &TableConstraint| match c {
1719                TableConstraint::Unique { name, columns, .. } => {
1720                    let constraint_name = match name {
1721                        Some(name) => &format!("unique constraint with name '{name}'"),
1722                        None => "unique constraint",
1723                    };
1724                    // Get unique constraint indices in the schema
1725                    let indices = self.get_constraint_column_indices(
1726                        df_schema,
1727                        columns,
1728                        constraint_name,
1729                    )?;
1730                    Ok(Constraint::Unique(indices))
1731                }
1732                TableConstraint::PrimaryKey { columns, .. } => {
1733                    // Get primary key indices in the schema
1734                    let indices = self.get_constraint_column_indices(
1735                        df_schema,
1736                        columns,
1737                        "primary key",
1738                    )?;
1739                    Ok(Constraint::PrimaryKey(indices))
1740                }
1741                TableConstraint::ForeignKey { .. } => {
1742                    _plan_err!("Foreign key constraints are not currently supported")
1743                }
1744                TableConstraint::Check { .. } => {
1745                    _plan_err!("Check constraints are not currently supported")
1746                }
1747                TableConstraint::Index { .. } => {
1748                    _plan_err!("Indexes are not currently supported")
1749                }
1750                TableConstraint::FulltextOrSpatial { .. } => {
1751                    _plan_err!("Indexes are not currently supported")
1752                }
1753            })
1754            .collect::<Result<Vec<_>>>()?;
1755        Ok(Constraints::new_unverified(constraints))
1756    }
1757
1758    fn parse_options_map(
1759        &self,
1760        options: Vec<(String, Value)>,
1761        allow_duplicates: bool,
1762    ) -> Result<HashMap<String, String>> {
1763        let mut options_map = HashMap::new();
1764        for (key, value) in options {
1765            if !allow_duplicates && options_map.contains_key(&key) {
1766                return plan_err!("Option {key} is specified multiple times");
1767            }
1768
1769            let Some(value_string) = crate::utils::value_to_string(&value) else {
1770                return plan_err!("Unsupported Value {}", value);
1771            };
1772
1773            if !(&key.contains('.')) {
1774                // If config does not belong to any namespace, assume it is
1775                // a format option and apply the format prefix for backwards
1776                // compatibility.
1777                let renamed_key = format!("format.{key}");
1778                options_map.insert(renamed_key.to_lowercase(), value_string);
1779            } else {
1780                options_map.insert(key.to_lowercase(), value_string);
1781            }
1782        }
1783
1784        Ok(options_map)
1785    }
1786
1787    /// Generate a plan for EXPLAIN ... that will print out a plan
1788    ///
1789    /// Note this is the sqlparser explain statement, not the
1790    /// datafusion `EXPLAIN` statement.
1791    fn explain_to_plan(
1792        &self,
1793        verbose: bool,
1794        analyze: bool,
1795        format: Option<String>,
1796        statement: DFStatement,
1797    ) -> Result<LogicalPlan> {
1798        let plan = self.statement_to_plan(statement)?;
1799        if matches!(plan, LogicalPlan::Explain(_)) {
1800            return plan_err!("Nested EXPLAINs are not supported");
1801        }
1802
1803        let plan = Arc::new(plan);
1804        let schema = LogicalPlan::explain_schema();
1805        let schema = schema.to_dfschema_ref()?;
1806
1807        if verbose && format.is_some() {
1808            return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
1809        }
1810
1811        if analyze {
1812            if format.is_some() {
1813                return plan_err!("EXPLAIN ANALYZE with FORMAT is not supported");
1814            }
1815            Ok(LogicalPlan::Analyze(Analyze {
1816                verbose,
1817                input: plan,
1818                schema,
1819            }))
1820        } else {
1821            let stringified_plans =
1822                vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
1823
1824            // default to configuration value
1825            // verbose mode only supports indent format
1826            let options = self.context_provider.options();
1827            let format = if verbose {
1828                ExplainFormat::Indent
1829            } else if let Some(format) = format {
1830                ExplainFormat::from_str(&format)?
1831            } else {
1832                options.explain.format.clone()
1833            };
1834
1835            Ok(LogicalPlan::Explain(Explain {
1836                verbose,
1837                explain_format: format,
1838                plan,
1839                stringified_plans,
1840                schema,
1841                logical_optimization_succeeded: false,
1842            }))
1843        }
1844    }
1845
1846    fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
1847        if !self.has_table("information_schema", "df_settings") {
1848            return plan_err!(
1849                "SHOW [VARIABLE] is not supported unless information_schema is enabled"
1850            );
1851        }
1852
1853        let verbose = variable
1854            .last()
1855            .map(|s| ident_to_string(s) == "verbose")
1856            .unwrap_or(false);
1857        let mut variable_vec = variable.to_vec();
1858        let mut columns: String = "name, value".to_owned();
1859
1860        if verbose {
1861            columns = format!("{columns}, description");
1862            variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
1863        }
1864
1865        let variable = object_name_to_string(&ObjectName::from(variable_vec));
1866        let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
1867        let query = if variable == "all" {
1868            // Add an ORDER BY so the output comes out in a consistent order
1869            format!("{base_query} ORDER BY name")
1870        } else if variable == "timezone" || variable == "time.zone" {
1871            // we could introduce alias in OptionDefinition if this string matching thing grows
1872            format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
1873        } else {
1874            // These values are what are used to make the information_schema table, so we just
1875            // check here, before actually planning or executing the query, if it would produce no
1876            // results, and error preemptively if it would (for a better UX)
1877            let is_valid_variable = self
1878                .context_provider
1879                .options()
1880                .entries()
1881                .iter()
1882                .any(|opt| opt.key == variable);
1883
1884            // Check if it's a runtime variable
1885            let is_runtime_variable = variable.starts_with("datafusion.runtime.");
1886
1887            if !is_valid_variable && !is_runtime_variable {
1888                return plan_err!(
1889                    "'{variable}' is not a variable which can be viewed with 'SHOW'"
1890                );
1891            }
1892
1893            format!("{base_query} WHERE name = '{variable}'")
1894        };
1895
1896        let mut rewrite = DFParser::parse_sql(&query)?;
1897        assert_eq!(rewrite.len(), 1);
1898
1899        self.statement_to_plan(rewrite.pop_front().unwrap())
1900    }
1901
1902    fn set_statement_to_plan(&self, statement: Set) -> Result<LogicalPlan> {
1903        match statement {
1904            Set::SingleAssignment {
1905                scope,
1906                hivevar,
1907                variable,
1908                values,
1909            } => {
1910                if scope.is_some() {
1911                    return not_impl_err!("SET with scope modifiers is not supported");
1912                }
1913
1914                if hivevar {
1915                    return not_impl_err!("SET HIVEVAR is not supported");
1916                }
1917
1918                let variable = object_name_to_string(&variable);
1919                let mut variable_lower = variable.to_lowercase();
1920
1921                // Map PostgreSQL "timezone" and MySQL "time.zone" aliases to DataFusion's canonical name
1922                if variable_lower == "timezone" || variable_lower == "time.zone" {
1923                    variable_lower = "datafusion.execution.time_zone".to_string();
1924                }
1925
1926                if values.len() != 1 {
1927                    return plan_err!("SET only supports single value assignment");
1928                }
1929
1930                let value_string = match &values[0] {
1931                    SQLExpr::Identifier(i) => ident_to_string(i),
1932                    SQLExpr::Value(v) => match crate::utils::value_to_string(&v.value) {
1933                        None => {
1934                            return plan_err!("Unsupported value {:?}", v.value);
1935                        }
1936                        Some(s) => s,
1937                    },
1938                    SQLExpr::UnaryOp { op, expr } => match op {
1939                        UnaryOperator::Plus => format!("+{expr}"),
1940                        UnaryOperator::Minus => format!("-{expr}"),
1941                        _ => return plan_err!("Unsupported unary op {:?}", op),
1942                    },
1943                    _ => return plan_err!("Unsupported expr {:?}", values[0]),
1944                };
1945
1946                Ok(LogicalPlan::Statement(PlanStatement::SetVariable(
1947                    SetVariable {
1948                        variable: variable_lower,
1949                        value: value_string,
1950                    },
1951                )))
1952            }
1953            other => not_impl_err!("SET variant not implemented yet: {other:?}"),
1954        }
1955    }
1956
1957    fn reset_statement_to_plan(&self, statement: ResetStatement) -> Result<LogicalPlan> {
1958        match statement {
1959            ResetStatement::Variable(variable) => {
1960                let variable = object_name_to_string(&variable);
1961                let mut variable_lower = variable.to_lowercase();
1962
1963                // Map PostgreSQL "timezone" and MySQL "time.zone" aliases to DataFusion's canonical name
1964                if variable_lower == "timezone" || variable_lower == "time.zone" {
1965                    variable_lower = "datafusion.execution.time_zone".to_string();
1966                }
1967
1968                Ok(LogicalPlan::Statement(PlanStatement::ResetVariable(
1969                    ResetVariable {
1970                        variable: variable_lower,
1971                    },
1972                )))
1973            }
1974        }
1975    }
1976
1977    fn delete_to_plan(
1978        &self,
1979        table_name: &ObjectName,
1980        predicate_expr: Option<SQLExpr>,
1981    ) -> Result<LogicalPlan> {
1982        // Do a table lookup to verify the table exists
1983        let table_ref = self.object_name_to_table_reference(table_name.clone())?;
1984        let table_source = self.context_provider.get_table_source(table_ref.clone())?;
1985        let schema = DFSchema::try_from_qualified_schema(
1986            table_ref.clone(),
1987            &table_source.schema(),
1988        )?;
1989        let scan =
1990            LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
1991                .build()?;
1992        let mut planner_context = PlannerContext::new();
1993
1994        let source = match predicate_expr {
1995            None => scan,
1996            Some(predicate_expr) => {
1997                let filter_expr =
1998                    self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
1999                let schema = Arc::new(schema);
2000                let mut using_columns = HashSet::new();
2001                expr_to_columns(&filter_expr, &mut using_columns)?;
2002                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
2003                    filter_expr,
2004                    &[&[&schema]],
2005                    &[using_columns],
2006                )?;
2007                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
2008            }
2009        };
2010
2011        let plan = LogicalPlan::Dml(DmlStatement::new(
2012            table_ref,
2013            table_source,
2014            WriteOp::Delete,
2015            Arc::new(source),
2016        ));
2017        Ok(plan)
2018    }
2019
2020    fn update_to_plan(
2021        &self,
2022        table: TableWithJoins,
2023        assignments: &[Assignment],
2024        from: Option<TableWithJoins>,
2025        predicate_expr: Option<SQLExpr>,
2026    ) -> Result<LogicalPlan> {
2027        let (table_name, table_alias) = match &table.relation {
2028            TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
2029            _ => plan_err!("Cannot update non-table relation!")?,
2030        };
2031
2032        // Do a table lookup to verify the table exists
2033        let table_name = self.object_name_to_table_reference(table_name)?;
2034        let table_source = self.context_provider.get_table_source(table_name.clone())?;
2035        let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
2036            table_name.clone(),
2037            &table_source.schema(),
2038        )?);
2039
2040        // Overwrite with assignment expressions
2041        let mut planner_context = PlannerContext::new();
2042        let mut assign_map = assignments
2043            .iter()
2044            .map(|assign| {
2045                let cols = match &assign.target {
2046                    AssignmentTarget::ColumnName(cols) => cols,
2047                    _ => plan_err!("Tuples are not supported")?,
2048                };
2049                let col_name: &Ident = cols
2050                    .0
2051                    .iter()
2052                    .last()
2053                    .ok_or_else(|| plan_datafusion_err!("Empty column id"))?
2054                    .as_ident()
2055                    .unwrap();
2056                // Validate that the assignment target column exists
2057                table_schema.field_with_unqualified_name(&col_name.value)?;
2058                Ok((col_name.value.clone(), assign.value.clone()))
2059            })
2060            .collect::<Result<HashMap<String, SQLExpr>>>()?;
2061
2062        // Build scan, join with from table if it exists.
2063        let mut input_tables = vec![table];
2064        input_tables.extend(from);
2065        let scan = self.plan_from_tables(input_tables, &mut planner_context)?;
2066
2067        // Filter
2068        let source = match predicate_expr {
2069            None => scan,
2070            Some(predicate_expr) => {
2071                let filter_expr = self.sql_to_expr(
2072                    predicate_expr,
2073                    scan.schema(),
2074                    &mut planner_context,
2075                )?;
2076                let mut using_columns = HashSet::new();
2077                expr_to_columns(&filter_expr, &mut using_columns)?;
2078                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
2079                    filter_expr,
2080                    &[&[scan.schema()]],
2081                    &[using_columns],
2082                )?;
2083                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
2084            }
2085        };
2086
2087        // Build updated values for each column, using the previous value if not modified
2088        let exprs = table_schema
2089            .iter()
2090            .map(|(qualifier, field)| {
2091                let expr = match assign_map.remove(field.name()) {
2092                    Some(new_value) => {
2093                        let mut expr = self.sql_to_expr(
2094                            new_value,
2095                            source.schema(),
2096                            &mut planner_context,
2097                        )?;
2098                        // Update placeholder's datatype to the type of the target column
2099                        if let Expr::Placeholder(placeholder) = &mut expr {
2100                            placeholder.field = placeholder
2101                                .field
2102                                .take()
2103                                .or_else(|| Some(Arc::clone(field)));
2104                        }
2105                        // Cast to target column type, if necessary
2106                        expr.cast_to(field.data_type(), source.schema())?
2107                    }
2108                    None => {
2109                        // If the target table has an alias, use it to qualify the column name
2110                        if let Some(alias) = &table_alias {
2111                            Expr::Column(Column::new(
2112                                Some(self.ident_normalizer.normalize(alias.name.clone())),
2113                                field.name(),
2114                            ))
2115                        } else {
2116                            Expr::Column(Column::from((qualifier, field)))
2117                        }
2118                    }
2119                };
2120                Ok(expr.alias(field.name()))
2121            })
2122            .collect::<Result<Vec<_>>>()?;
2123
2124        let source = project(source, exprs)?;
2125
2126        let plan = LogicalPlan::Dml(DmlStatement::new(
2127            table_name,
2128            table_source,
2129            WriteOp::Update,
2130            Arc::new(source),
2131        ));
2132        Ok(plan)
2133    }
2134
2135    fn insert_to_plan(
2136        &self,
2137        table_name: ObjectName,
2138        columns: Vec<Ident>,
2139        source: Box<Query>,
2140        overwrite: bool,
2141        replace_into: bool,
2142    ) -> Result<LogicalPlan> {
2143        // Do a table lookup to verify the table exists
2144        let table_name = self.object_name_to_table_reference(table_name)?;
2145        let table_source = self.context_provider.get_table_source(table_name.clone())?;
2146        let table_schema = DFSchema::try_from(table_source.schema())?;
2147
2148        // Get insert fields and target table's value indices
2149        //
2150        // If value_indices[i] = Some(j), it means that the value of the i-th target table's column is
2151        // derived from the j-th output of the source.
2152        //
2153        // If value_indices[i] = None, it means that the value of the i-th target table's column is
2154        // not provided, and should be filled with a default value later.
2155        let (fields, value_indices) = if columns.is_empty() {
2156            // Empty means we're inserting into all columns of the table
2157            (
2158                table_schema.fields().clone(),
2159                (0..table_schema.fields().len())
2160                    .map(Some)
2161                    .collect::<Vec<_>>(),
2162            )
2163        } else {
2164            let mut value_indices = vec![None; table_schema.fields().len()];
2165            let fields = columns
2166                .into_iter()
2167                .enumerate()
2168                .map(|(i, c)| {
2169                    let c = self.ident_normalizer.normalize(c);
2170                    let column_index = table_schema
2171                        .index_of_column_by_name(None, &c)
2172                        .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
2173
2174                    if value_indices[column_index].is_some() {
2175                        return schema_err!(SchemaError::DuplicateUnqualifiedField {
2176                            name: c,
2177                        });
2178                    } else {
2179                        value_indices[column_index] = Some(i);
2180                    }
2181                    Ok(Arc::clone(table_schema.field(column_index)))
2182                })
2183                .collect::<Result<Vec<_>>>()?;
2184            (Fields::from(fields), value_indices)
2185        };
2186
2187        // infer types for Values clause... other types should be resolvable the regular way
2188        let mut prepare_param_data_types = BTreeMap::new();
2189        if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
2190            for row in rows.iter() {
2191                for (idx, val) in row.iter().enumerate() {
2192                    if let SQLExpr::Value(ValueWithSpan {
2193                        value: Value::Placeholder(name),
2194                        span: _,
2195                    }) = val
2196                    {
2197                        let name =
2198                            name.replace('$', "").parse::<usize>().map_err(|_| {
2199                                plan_datafusion_err!("Can't parse placeholder: {name}")
2200                            })? - 1;
2201                        let field = fields.get(idx).ok_or_else(|| {
2202                            plan_datafusion_err!(
2203                                "Placeholder ${} refers to a non existent column",
2204                                idx + 1
2205                            )
2206                        })?;
2207                        let _ = prepare_param_data_types.insert(name, Arc::clone(field));
2208                    }
2209                }
2210            }
2211        }
2212        let prepare_param_data_types = prepare_param_data_types.into_values().collect();
2213
2214        // Projection
2215        let mut planner_context =
2216            PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
2217        planner_context.set_table_schema(Some(DFSchemaRef::new(
2218            DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
2219        )));
2220        let source = self.query_to_plan(*source, &mut planner_context)?;
2221        if fields.len() != source.schema().fields().len() {
2222            plan_err!("Column count doesn't match insert query!")?;
2223        }
2224
2225        let exprs = value_indices
2226            .into_iter()
2227            .enumerate()
2228            .map(|(i, value_index)| {
2229                let target_field = table_schema.field(i);
2230                let expr = match value_index {
2231                    Some(v) => {
2232                        Expr::Column(Column::from(source.schema().qualified_field(v)))
2233                            .cast_to(target_field.data_type(), source.schema())?
2234                    }
2235                    // The value is not specified. Fill in the default value for the column.
2236                    None => table_source
2237                        .get_column_default(target_field.name())
2238                        .cloned()
2239                        .unwrap_or_else(|| {
2240                            // If there is no default for the column, then the default is NULL
2241                            Expr::Literal(ScalarValue::Null, None)
2242                        })
2243                        .cast_to(target_field.data_type(), &DFSchema::empty())?,
2244                };
2245                Ok(expr.alias(target_field.name()))
2246            })
2247            .collect::<Result<Vec<Expr>>>()?;
2248        let source = project(source, exprs)?;
2249
2250        let insert_op = match (overwrite, replace_into) {
2251            (false, false) => InsertOp::Append,
2252            (true, false) => InsertOp::Overwrite,
2253            (false, true) => InsertOp::Replace,
2254            (true, true) => plan_err!(
2255                "Conflicting insert operations: `overwrite` and `replace_into` cannot both be true"
2256            )?,
2257        };
2258
2259        let plan = LogicalPlan::Dml(DmlStatement::new(
2260            table_name,
2261            Arc::clone(&table_source),
2262            WriteOp::Insert(insert_op),
2263            Arc::new(source),
2264        ));
2265        Ok(plan)
2266    }
2267
2268    fn show_columns_to_plan(
2269        &self,
2270        extended: bool,
2271        full: bool,
2272        sql_table_name: ObjectName,
2273    ) -> Result<LogicalPlan> {
2274        // Figure out the where clause
2275        let where_clause = object_name_to_qualifier(
2276            &sql_table_name,
2277            self.options.enable_ident_normalization,
2278        )?;
2279
2280        if !self.has_table("information_schema", "columns") {
2281            return plan_err!(
2282                "SHOW COLUMNS is not supported unless information_schema is enabled"
2283            );
2284        }
2285
2286        // Do a table lookup to verify the table exists
2287        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2288        let _ = self.context_provider.get_table_source(table_ref)?;
2289
2290        // Treat both FULL and EXTENDED as the same
2291        let select_list = if full || extended {
2292            "*"
2293        } else {
2294            "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
2295        };
2296
2297        let query = format!(
2298            "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
2299        );
2300
2301        let mut rewrite = DFParser::parse_sql(&query)?;
2302        assert_eq!(rewrite.len(), 1);
2303        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2304    }
2305
2306    /// Rewrite `SHOW FUNCTIONS` to another SQL query
2307    /// The query is based on the `information_schema.routines` and `information_schema.parameters` tables
2308    ///
2309    /// The output columns:
2310    /// - function_name: The name of function
2311    /// - return_type: The return type of the function
2312    /// - parameters: The name of parameters (ordered by the ordinal position)
2313    /// - parameter_types: The type of parameters (ordered by the ordinal position)
2314    /// - description: The description of the function (the description defined in the document)
2315    /// - syntax_example: The syntax_example of the function (the syntax_example defined in the document)
2316    fn show_functions_to_plan(
2317        &self,
2318        filter: Option<ShowStatementFilter>,
2319    ) -> Result<LogicalPlan> {
2320        let where_clause = if let Some(filter) = filter {
2321            match filter {
2322                ShowStatementFilter::Like(like) => {
2323                    format!("WHERE p.function_name like '{like}'")
2324                }
2325                _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
2326            }
2327        } else {
2328            "".to_string()
2329        };
2330
2331        let query = format!(
2332            r#"
2333SELECT DISTINCT
2334    p.*,
2335    r.function_type function_type,
2336    r.description description,
2337    r.syntax_example syntax_example
2338FROM
2339    (
2340        SELECT
2341            i.specific_name function_name,
2342            o.data_type return_type,
2343            array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
2344            array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
2345        FROM (
2346                 SELECT
2347                     specific_catalog,
2348                     specific_schema,
2349                     specific_name,
2350                     ordinal_position,
2351                     parameter_name,
2352                     data_type,
2353                     rid
2354                 FROM
2355                     information_schema.parameters
2356                 WHERE
2357                     parameter_mode = 'IN'
2358             ) i
2359                 JOIN
2360             (
2361                 SELECT
2362                     specific_catalog,
2363                     specific_schema,
2364                     specific_name,
2365                     ordinal_position,
2366                     parameter_name,
2367                     data_type,
2368                     rid
2369                 FROM
2370                     information_schema.parameters
2371                 WHERE
2372                     parameter_mode = 'OUT'
2373             ) o
2374             ON i.specific_catalog = o.specific_catalog
2375                 AND i.specific_schema = o.specific_schema
2376                 AND i.specific_name = o.specific_name
2377                 AND i.rid = o.rid
2378        GROUP BY 1, 2, i.rid
2379    ) as p
2380JOIN information_schema.routines r
2381ON p.function_name = r.routine_name
2382{where_clause}
2383            "#
2384        );
2385        let mut rewrite = DFParser::parse_sql(&query)?;
2386        assert_eq!(rewrite.len(), 1);
2387        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2388    }
2389
2390    fn show_create_table_to_plan(
2391        &self,
2392        sql_table_name: ObjectName,
2393    ) -> Result<LogicalPlan> {
2394        if !self.has_table("information_schema", "tables") {
2395            return plan_err!(
2396                "SHOW CREATE TABLE is not supported unless information_schema is enabled"
2397            );
2398        }
2399        // Figure out the where clause
2400        let where_clause = object_name_to_qualifier(
2401            &sql_table_name,
2402            self.options.enable_ident_normalization,
2403        )?;
2404
2405        // Do a table lookup to verify the table exists
2406        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2407        let _ = self.context_provider.get_table_source(table_ref)?;
2408
2409        let query = format!(
2410            "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
2411        );
2412
2413        let mut rewrite = DFParser::parse_sql(&query)?;
2414        assert_eq!(rewrite.len(), 1);
2415        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2416    }
2417
2418    /// Return true if there is a table provider available for "schema.table"
2419    fn has_table(&self, schema: &str, table: &str) -> bool {
2420        let tables_reference = TableReference::Partial {
2421            schema: schema.into(),
2422            table: table.into(),
2423        };
2424        self.context_provider
2425            .get_table_source(tables_reference)
2426            .is_ok()
2427    }
2428
2429    fn validate_transaction_kind(
2430        &self,
2431        kind: Option<&BeginTransactionKind>,
2432    ) -> Result<()> {
2433        match kind {
2434            // BEGIN
2435            None => Ok(()),
2436            // BEGIN TRANSACTION
2437            Some(BeginTransactionKind::Transaction) => Ok(()),
2438            Some(BeginTransactionKind::Work) => {
2439                not_impl_err!("Transaction kind not supported: {kind:?}")
2440            }
2441        }
2442    }
2443}