Skip to main content

datafusion_sql/
statement.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{BTreeMap, HashMap, HashSet};
19use std::path::Path;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use crate::parser::{
24    CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
25    LexOrdering, ResetStatement, Statement as DFStatement,
26};
27use crate::planner::{
28    ContextProvider, PlannerContext, SqlToRel, object_name_to_qualifier,
29};
30use crate::utils::normalize_ident;
31
32use arrow::datatypes::{Field, FieldRef, Fields};
33use datafusion_common::error::_plan_err;
34use datafusion_common::parsers::CompressionTypeVariant;
35use datafusion_common::{
36    Column, Constraint, Constraints, DFSchema, DFSchemaRef, DataFusionError, Result,
37    ScalarValue, SchemaError, SchemaReference, TableReference, ToDFSchema, exec_err,
38    internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
39    unqualified_field_not_found,
40};
41use datafusion_expr::dml::{CopyTo, InsertOp};
42use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
43use datafusion_expr::logical_plan::DdlStatement;
44use datafusion_expr::logical_plan::builder::project;
45use datafusion_expr::utils::expr_to_columns;
46use datafusion_expr::{
47    Analyze, CreateCatalog, CreateCatalogSchema,
48    CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
49    CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
50    DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
51    EmptyRelation, Execute, Explain, ExplainFormat, Expr, ExprSchemable, Filter,
52    LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare,
53    ResetVariable, SetVariable, SortExpr, Statement as PlanStatement, ToStringifiedPlan,
54    TransactionAccessMode, TransactionConclusion, TransactionEnd,
55    TransactionIsolationLevel, TransactionStart, Volatility, WriteOp, cast, col,
56};
57use sqlparser::ast::{
58    self, BeginTransactionKind, CheckConstraint, ForeignKeyConstraint, IndexColumn,
59    IndexType, NullsDistinctOption, OrderByExpr, OrderByOptions, PrimaryKeyConstraint,
60    Set, ShowStatementIn, ShowStatementOptions, SqliteOnConflict, TableObject,
61    UniqueConstraint, Update, UpdateTableFromKind, ValueWithSpan,
62};
63use sqlparser::ast::{
64    Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
65    CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
66    ObjectName, ObjectType, Query, SchemaName, SetExpr, ShowCreateObject,
67    ShowStatementFilter, Statement, TableConstraint, TableFactor, TableWithJoins,
68    TransactionMode, UnaryOperator, Value,
69};
70use sqlparser::parser::ParserError::ParserError;
71
72fn ident_to_string(ident: &Ident) -> String {
73    normalize_ident(ident.to_owned())
74}
75
76fn object_name_to_string(object_name: &ObjectName) -> String {
77    object_name
78        .0
79        .iter()
80        .map(|object_name_part| {
81            object_name_part
82                .as_ident()
83                // TODO: It might be better to return an error
84                // than to silently use a default value.
85                .map_or_else(String::new, ident_to_string)
86        })
87        .collect::<Vec<String>>()
88        .join(".")
89}
90
91fn get_schema_name(schema_name: &SchemaName) -> String {
92    match schema_name {
93        SchemaName::Simple(schema_name) => object_name_to_string(schema_name),
94        SchemaName::UnnamedAuthorization(auth) => ident_to_string(auth),
95        SchemaName::NamedAuthorization(schema_name, auth) => format!(
96            "{}.{}",
97            object_name_to_string(schema_name),
98            ident_to_string(auth)
99        ),
100    }
101}
102
103/// Construct `TableConstraint`(s) for the given columns by iterating over
104/// `columns` and extracting individual inline constraint definitions.
105fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
106    let mut constraints: Vec<TableConstraint> = vec![];
107    for column in columns {
108        for ast::ColumnOptionDef { name, option } in &column.options {
109            match option {
110                ast::ColumnOption::Unique(UniqueConstraint {
111                    characteristics,
112                    name,
113                    index_name: _index_name,
114                    index_type_display: _index_type_display,
115                    index_type: _index_type,
116                    columns: _column,
117                    index_options: _index_options,
118                    nulls_distinct: _nulls_distinct,
119                }) => constraints.push(TableConstraint::Unique(UniqueConstraint {
120                    name: name.clone(),
121                    index_name: None,
122                    index_type_display: ast::KeyOrIndexDisplay::None,
123                    index_type: None,
124                    columns: vec![IndexColumn {
125                        column: OrderByExpr {
126                            expr: SQLExpr::Identifier(column.name.clone()),
127                            options: OrderByOptions {
128                                asc: None,
129                                nulls_first: None,
130                            },
131                            with_fill: None,
132                        },
133                        operator_class: None,
134                    }],
135                    index_options: vec![],
136                    characteristics: *characteristics,
137                    nulls_distinct: NullsDistinctOption::None,
138                })),
139                ast::ColumnOption::PrimaryKey(PrimaryKeyConstraint {
140                    characteristics,
141                    name: _name,
142                    index_name: _index_name,
143                    index_type: _index_type,
144                    columns: _columns,
145                    index_options: _index_options,
146                }) => {
147                    constraints.push(TableConstraint::PrimaryKey(PrimaryKeyConstraint {
148                        name: name.clone(),
149                        index_name: None,
150                        index_type: None,
151                        columns: vec![IndexColumn {
152                            column: OrderByExpr {
153                                expr: SQLExpr::Identifier(column.name.clone()),
154                                options: OrderByOptions {
155                                    asc: None,
156                                    nulls_first: None,
157                                },
158                                with_fill: None,
159                            },
160                            operator_class: None,
161                        }],
162                        index_options: vec![],
163                        characteristics: *characteristics,
164                    }))
165                }
166                ast::ColumnOption::ForeignKey(ForeignKeyConstraint {
167                    foreign_table,
168                    referred_columns,
169                    on_delete,
170                    on_update,
171                    characteristics,
172                    name: _name,
173                    index_name: _index_name,
174                    columns: _columns,
175                    match_kind: _match_kind,
176                }) => {
177                    constraints.push(TableConstraint::ForeignKey(ForeignKeyConstraint {
178                        name: name.clone(),
179                        index_name: None,
180                        columns: vec![],
181                        foreign_table: foreign_table.clone(),
182                        referred_columns: referred_columns.clone(),
183                        on_delete: *on_delete,
184                        on_update: *on_update,
185                        match_kind: None,
186                        characteristics: *characteristics,
187                    }))
188                }
189                ast::ColumnOption::Check(CheckConstraint {
190                    name,
191                    expr,
192                    enforced: _enforced,
193                }) => constraints.push(TableConstraint::Check(CheckConstraint {
194                    name: name.clone(),
195                    expr: expr.clone(),
196                    enforced: None,
197                })),
198                ast::ColumnOption::Default(_)
199                | ast::ColumnOption::Null
200                | ast::ColumnOption::NotNull
201                | ast::ColumnOption::DialectSpecific(_)
202                | ast::ColumnOption::CharacterSet(_)
203                | ast::ColumnOption::Generated { .. }
204                | ast::ColumnOption::Comment(_)
205                | ast::ColumnOption::Options(_)
206                | ast::ColumnOption::OnUpdate(_)
207                | ast::ColumnOption::Materialized(_)
208                | ast::ColumnOption::Ephemeral(_)
209                | ast::ColumnOption::Identity(_)
210                | ast::ColumnOption::OnConflict(_)
211                | ast::ColumnOption::Policy(_)
212                | ast::ColumnOption::Tags(_)
213                | ast::ColumnOption::Alias(_)
214                | ast::ColumnOption::Srid(_)
215                | ast::ColumnOption::Collation(_)
216                | ast::ColumnOption::Invisible => {}
217            }
218        }
219    }
220    constraints
221}
222
223impl<S: ContextProvider> SqlToRel<'_, S> {
224    /// Generate a logical plan from an DataFusion SQL statement
225    pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
226        match statement {
227            DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
228            DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
229            DFStatement::CopyTo(s) => self.copy_to_plan(s),
230            DFStatement::Explain(ExplainStatement {
231                verbose,
232                analyze,
233                format,
234                statement,
235            }) => self.explain_to_plan(verbose, analyze, format, *statement),
236            DFStatement::Reset(statement) => self.reset_statement_to_plan(statement),
237        }
238    }
239
240    /// Generate a logical plan from an SQL statement
241    pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
242        self.sql_statement_to_plan_with_context_impl(
243            statement,
244            &mut PlannerContext::new(),
245        )
246    }
247
248    /// Generate a logical plan from an SQL statement
249    pub fn sql_statement_to_plan_with_context(
250        &self,
251        statement: Statement,
252        planner_context: &mut PlannerContext,
253    ) -> Result<LogicalPlan> {
254        self.sql_statement_to_plan_with_context_impl(statement, planner_context)
255    }
256
257    fn sql_statement_to_plan_with_context_impl(
258        &self,
259        statement: Statement,
260        planner_context: &mut PlannerContext,
261    ) -> Result<LogicalPlan> {
262        match statement {
263            Statement::ExplainTable {
264                describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, // only parse 'DESCRIBE table_name' or 'DESC table_name' and not 'EXPLAIN table_name'
265                table_name,
266                ..
267            } => self.describe_table_to_plan(table_name),
268            Statement::Explain {
269                describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, // only parse 'DESCRIBE statement' or 'DESC statement' and not 'EXPLAIN statement'
270                statement,
271                ..
272            } => match *statement {
273                Statement::Query(query) => self.describe_query_to_plan(*query),
274                _ => {
275                    not_impl_err!("Describing statements other than SELECT not supported")
276                }
277            },
278            Statement::Explain {
279                verbose,
280                statement,
281                analyze,
282                format,
283                describe_alias: _,
284                ..
285            } => {
286                let format = format.map(|format| format.to_string());
287                let statement = DFStatement::Statement(statement);
288                self.explain_to_plan(verbose, analyze, format, statement)
289            }
290            Statement::Query(query) => self.query_to_plan(*query, planner_context),
291            Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
292            Statement::Set(statement) => self.set_statement_to_plan(statement),
293            Statement::CreateTable(CreateTable {
294                temporary,
295                external,
296                global,
297                transient,
298                volatile,
299                hive_distribution,
300                hive_formats,
301                file_format,
302                location,
303                query,
304                name,
305                columns,
306                constraints,
307                if_not_exists,
308                or_replace,
309                without_rowid,
310                like,
311                clone,
312                comment,
313                on_commit,
314                on_cluster,
315                primary_key,
316                order_by,
317                partition_by,
318                cluster_by,
319                clustered_by,
320                strict,
321                copy_grants,
322                enable_schema_evolution,
323                change_tracking,
324                data_retention_time_in_days,
325                max_data_extension_time_in_days,
326                default_ddl_collation,
327                with_aggregation_policy,
328                with_row_access_policy,
329                with_tags,
330                iceberg,
331                external_volume,
332                base_location,
333                catalog,
334                catalog_sync,
335                storage_serialization_policy,
336                inherits,
337                table_options: CreateTableOptions::None,
338                dynamic,
339                version,
340                target_lag,
341                warehouse,
342                refresh_mode,
343                initialize,
344                require_user,
345                partition_of,
346                for_values,
347            }) => {
348                if temporary {
349                    return not_impl_err!("Temporary tables not supported");
350                }
351                if external {
352                    return not_impl_err!("External tables not supported");
353                }
354                if global.is_some() {
355                    return not_impl_err!("Global tables not supported");
356                }
357                if transient {
358                    return not_impl_err!("Transient tables not supported");
359                }
360                if volatile {
361                    return not_impl_err!("Volatile tables not supported");
362                }
363                if hive_distribution != ast::HiveDistributionStyle::NONE {
364                    return not_impl_err!(
365                        "Hive distribution not supported: {hive_distribution:?}"
366                    );
367                }
368                if hive_formats.is_some()
369                    && !matches!(
370                        hive_formats,
371                        Some(ast::HiveFormat {
372                            row_format: None,
373                            serde_properties: None,
374                            storage: None,
375                            location: None,
376                        })
377                    )
378                {
379                    return not_impl_err!("Hive formats not supported: {hive_formats:?}");
380                }
381                if file_format.is_some() {
382                    return not_impl_err!("File format not supported");
383                }
384                if location.is_some() {
385                    return not_impl_err!("Location not supported");
386                }
387                if without_rowid {
388                    return not_impl_err!("Without rowid not supported");
389                }
390                if like.is_some() {
391                    return not_impl_err!("Like not supported");
392                }
393                if clone.is_some() {
394                    return not_impl_err!("Clone not supported");
395                }
396                if comment.is_some() {
397                    return not_impl_err!("Comment not supported");
398                }
399                if on_commit.is_some() {
400                    return not_impl_err!("On commit not supported");
401                }
402                if on_cluster.is_some() {
403                    return not_impl_err!("On cluster not supported");
404                }
405                if primary_key.is_some() {
406                    return not_impl_err!("Primary key not supported");
407                }
408                if order_by.is_some() {
409                    return not_impl_err!("Order by not supported");
410                }
411                if partition_by.is_some() {
412                    return not_impl_err!("Partition by not supported");
413                }
414                if cluster_by.is_some() {
415                    return not_impl_err!("Cluster by not supported");
416                }
417                if clustered_by.is_some() {
418                    return not_impl_err!("Clustered by not supported");
419                }
420                if strict {
421                    return not_impl_err!("Strict not supported");
422                }
423                if copy_grants {
424                    return not_impl_err!("Copy grants not supported");
425                }
426                if enable_schema_evolution.is_some() {
427                    return not_impl_err!("Enable schema evolution not supported");
428                }
429                if change_tracking.is_some() {
430                    return not_impl_err!("Change tracking not supported");
431                }
432                if data_retention_time_in_days.is_some() {
433                    return not_impl_err!("Data retention time in days not supported");
434                }
435                if max_data_extension_time_in_days.is_some() {
436                    return not_impl_err!(
437                        "Max data extension time in days not supported"
438                    );
439                }
440                if default_ddl_collation.is_some() {
441                    return not_impl_err!("Default DDL collation not supported");
442                }
443                if with_aggregation_policy.is_some() {
444                    return not_impl_err!("With aggregation policy not supported");
445                }
446                if with_row_access_policy.is_some() {
447                    return not_impl_err!("With row access policy not supported");
448                }
449                if with_tags.is_some() {
450                    return not_impl_err!("With tags not supported");
451                }
452                if iceberg {
453                    return not_impl_err!("Iceberg not supported");
454                }
455                if external_volume.is_some() {
456                    return not_impl_err!("External volume not supported");
457                }
458                if base_location.is_some() {
459                    return not_impl_err!("Base location not supported");
460                }
461                if catalog.is_some() {
462                    return not_impl_err!("Catalog not supported");
463                }
464                if catalog_sync.is_some() {
465                    return not_impl_err!("Catalog sync not supported");
466                }
467                if storage_serialization_policy.is_some() {
468                    return not_impl_err!("Storage serialization policy not supported");
469                }
470                if inherits.is_some() {
471                    return not_impl_err!("Table inheritance not supported");
472                }
473                if dynamic {
474                    return not_impl_err!("Dynamic tables not supported");
475                }
476                if version.is_some() {
477                    return not_impl_err!("Version not supported");
478                }
479                if target_lag.is_some() {
480                    return not_impl_err!("Target lag not supported");
481                }
482                if warehouse.is_some() {
483                    return not_impl_err!("Warehouse not supported");
484                }
485                if refresh_mode.is_some() {
486                    return not_impl_err!("Refresh mode not supported");
487                }
488                if initialize.is_some() {
489                    return not_impl_err!("Initialize not supported");
490                }
491                if require_user {
492                    return not_impl_err!("Require user not supported");
493                }
494                if partition_of.is_some() {
495                    return not_impl_err!("PARTITION OF not supported");
496                }
497                if for_values.is_some() {
498                    return not_impl_err!("PARTITION OF .. FOR VALUES .. not supported");
499                }
500                // Merge inline constraints and existing constraints
501                let mut all_constraints = constraints;
502                let inline_constraints = calc_inline_constraints_from_columns(&columns);
503                all_constraints.extend(inline_constraints);
504                // Build column default values
505                let column_defaults =
506                    self.build_column_defaults(&columns, planner_context)?;
507
508                let has_columns = !columns.is_empty();
509                let schema = self.build_schema(columns)?.to_dfschema_ref()?;
510                if has_columns {
511                    planner_context.set_table_schema(Some(Arc::clone(&schema)));
512                }
513
514                match query {
515                    Some(query) => {
516                        let plan = self.query_to_plan(*query, planner_context)?;
517                        let input_schema = plan.schema();
518
519                        let plan = if has_columns {
520                            if schema.fields().len() != input_schema.fields().len() {
521                                return plan_err!(
522                                    "Mismatch: {} columns specified, but result has {} columns",
523                                    schema.fields().len(),
524                                    input_schema.fields().len()
525                                );
526                            }
527                            let input_fields = input_schema.fields();
528                            let project_exprs = schema
529                                .fields()
530                                .iter()
531                                .zip(input_fields)
532                                .map(|(field, input_field)| {
533                                    cast(
534                                        col(input_field.name()),
535                                        field.data_type().clone(),
536                                    )
537                                    .alias(field.name())
538                                })
539                                .collect::<Vec<_>>();
540
541                            LogicalPlanBuilder::from(plan.clone())
542                                .project(project_exprs)?
543                                .build()?
544                        } else {
545                            plan
546                        };
547
548                        let constraints = self.new_constraint_from_table_constraints(
549                            &all_constraints,
550                            plan.schema(),
551                        )?;
552
553                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
554                            CreateMemoryTable {
555                                name: self.object_name_to_table_reference(name)?,
556                                constraints,
557                                input: Arc::new(plan),
558                                if_not_exists,
559                                or_replace,
560                                column_defaults,
561                                temporary,
562                            },
563                        )))
564                    }
565
566                    None => {
567                        let plan = EmptyRelation {
568                            produce_one_row: false,
569                            schema,
570                        };
571                        let plan = LogicalPlan::EmptyRelation(plan);
572                        let constraints = self.new_constraint_from_table_constraints(
573                            &all_constraints,
574                            plan.schema(),
575                        )?;
576                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
577                            CreateMemoryTable {
578                                name: self.object_name_to_table_reference(name)?,
579                                constraints,
580                                input: Arc::new(plan),
581                                if_not_exists,
582                                or_replace,
583                                column_defaults,
584                                temporary,
585                            },
586                        )))
587                    }
588                }
589            }
590            Statement::CreateView(ast::CreateView {
591                or_replace,
592                materialized,
593                name,
594                columns,
595                query,
596                options: CreateTableOptions::None,
597                cluster_by,
598                comment,
599                with_no_schema_binding,
600                if_not_exists,
601                temporary,
602                to,
603                params,
604                or_alter,
605                secure,
606                name_before_not_exists,
607            }) => {
608                if materialized {
609                    return not_impl_err!("Materialized views not supported")?;
610                }
611                if !cluster_by.is_empty() {
612                    return not_impl_err!("Cluster by not supported")?;
613                }
614                if comment.is_some() {
615                    return not_impl_err!("Comment not supported")?;
616                }
617                if with_no_schema_binding {
618                    return not_impl_err!("With no schema binding not supported")?;
619                }
620                if if_not_exists {
621                    return not_impl_err!("If not exists not supported")?;
622                }
623                if to.is_some() {
624                    return not_impl_err!("To not supported")?;
625                }
626
627                // put the statement back together temporarily to get the SQL
628                // string representation
629                let stmt = Statement::CreateView(ast::CreateView {
630                    or_replace,
631                    materialized,
632                    name,
633                    columns,
634                    query,
635                    options: CreateTableOptions::None,
636                    cluster_by,
637                    comment,
638                    with_no_schema_binding,
639                    if_not_exists,
640                    temporary,
641                    to,
642                    params,
643                    or_alter,
644                    secure,
645                    name_before_not_exists,
646                });
647                let sql = stmt.to_string();
648                let Statement::CreateView(ast::CreateView {
649                    name,
650                    columns,
651                    query,
652                    or_replace,
653                    temporary,
654                    ..
655                }) = stmt
656                else {
657                    return internal_err!("Unreachable code in create view");
658                };
659
660                let columns = columns
661                    .into_iter()
662                    .map(|view_column_def| {
663                        if let Some(options) = view_column_def.options {
664                            plan_err!(
665                                "Options not supported for view columns: {options:?}"
666                            )
667                        } else {
668                            Ok(view_column_def.name)
669                        }
670                    })
671                    .collect::<Result<Vec<_>>>()?;
672
673                let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
674                plan = self.apply_expr_alias(plan, columns)?;
675
676                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
677                    name: self.object_name_to_table_reference(name)?,
678                    input: Arc::new(plan),
679                    or_replace,
680                    definition: Some(sql),
681                    temporary,
682                })))
683            }
684            Statement::ShowCreate { obj_type, obj_name } => match obj_type {
685                ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
686                _ => {
687                    not_impl_err!("Only `SHOW CREATE TABLE  ...` statement is supported")
688                }
689            },
690            Statement::CreateSchema {
691                schema_name,
692                if_not_exists,
693                ..
694            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
695                CreateCatalogSchema {
696                    schema_name: get_schema_name(&schema_name),
697                    if_not_exists,
698                    schema: Arc::new(DFSchema::empty()),
699                },
700            ))),
701            Statement::CreateDatabase {
702                db_name,
703                if_not_exists,
704                ..
705            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
706                CreateCatalog {
707                    catalog_name: object_name_to_string(&db_name),
708                    if_not_exists,
709                    schema: Arc::new(DFSchema::empty()),
710                },
711            ))),
712            Statement::Drop {
713                object_type,
714                if_exists,
715                mut names,
716                cascade,
717                restrict: _,
718                purge: _,
719                temporary: _,
720                table: _,
721            } => {
722                // `restrict`, `purge`, `temporary` and `table` are ignored for now; `cascade` is
723                // only forwarded for `DROP SCHEMA`. Multiple object names are not supported either.
724                let name = match names.len() {
725                    0 => Err(ParserError("Missing table name.".to_string()).into()),
726                    1 => self.object_name_to_table_reference(names.pop().unwrap()),
727                    _ => {
728                        Err(ParserError("Multiple objects not supported".to_string())
729                            .into())
730                    }
731                }?;
732
733                match object_type {
734                    ObjectType::Table => {
735                        Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
736                            name,
737                            if_exists,
738                            schema: DFSchemaRef::new(DFSchema::empty()),
739                        })))
740                    }
741                    ObjectType::View => {
742                        Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
743                            name,
744                            if_exists,
745                            schema: DFSchemaRef::new(DFSchema::empty()),
746                        })))
747                    }
748                    ObjectType::Schema => {
749                        let name = match name {
750                            TableReference::Bare { table } => {
751                                Ok(SchemaReference::Bare { schema: table })
752                            }
753                            TableReference::Partial { schema, table } => {
754                                Ok(SchemaReference::Full {
755                                    schema: table,
756                                    catalog: schema,
757                                })
758                            }
759                            TableReference::Full {
760                                catalog: _,
761                                schema: _,
762                                table: _,
763                            } => Err(ParserError(
764                                "Invalid schema specifier (has 3 parts)".to_string(),
765                            )),
766                        }?;
767                        Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(
768                            DropCatalogSchema {
769                                name,
770                                if_exists,
771                                cascade,
772                                schema: DFSchemaRef::new(DFSchema::empty()),
773                            },
774                        )))
775                    }
776                    _ => not_impl_err!(
777                        "Only `DROP TABLE/VIEW/SCHEMA  ...` statement is supported currently"
778                    ),
779                }
780            }
781            Statement::Prepare {
782                name,
783                data_types,
784                statement,
785            } => {
786                // Convert parser data types to DataFusion data types
787                let mut fields: Vec<FieldRef> = data_types
788                    .into_iter()
789                    .map(|t| self.convert_data_type_to_field(&t))
790                    .collect::<Result<_>>()?;
791
792                // Create planner context with parameters
793                let mut planner_context =
794                    PlannerContext::new().with_prepare_param_data_types(fields.clone());
795
796                // Build logical plan for inner statement of the prepare statement
797                let plan = self.sql_statement_to_plan_with_context_impl(
798                    *statement,
799                    &mut planner_context,
800                )?;
801
802                if fields.is_empty() {
803                    let map_types = plan.get_parameter_fields()?;
804                    let param_types: Vec<_> = (1..=map_types.len())
805                        .filter_map(|i| {
806                            let key = format!("${i}");
807                            map_types.get(&key).and_then(|opt| opt.clone())
808                        })
809                        .collect();
810                    fields.extend(param_types.iter().cloned());
811                    planner_context.with_prepare_param_data_types(param_types);
812                }
813
814                Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
815                    name: ident_to_string(&name),
816                    fields,
817                    input: Arc::new(plan),
818                })))
819            }
820            Statement::Execute {
821                name,
822                parameters,
823                using,
824                // has_parentheses specifies the syntax, but the plan is the
825                // same no matter the syntax used, so ignore it
826                has_parentheses: _,
827                immediate,
828                into,
829                output,
830                default,
831            } => {
832                // `USING` is a MySQL-specific syntax and currently not supported.
833                if !using.is_empty() {
834                    return not_impl_err!(
835                        "Execute statement with USING is not supported"
836                    );
837                }
838                if immediate {
839                    return not_impl_err!(
840                        "Execute statement with IMMEDIATE is not supported"
841                    );
842                }
843                if !into.is_empty() {
844                    return not_impl_err!("Execute statement with INTO is not supported");
845                }
846                if output {
847                    return not_impl_err!(
848                        "Execute statement with OUTPUT is not supported"
849                    );
850                }
851                if default {
852                    return not_impl_err!(
853                        "Execute statement with DEFAULT is not supported"
854                    );
855                }
856                let empty_schema = DFSchema::empty();
857                let parameters = parameters
858                    .into_iter()
859                    .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
860                    .collect::<Result<Vec<Expr>>>()?;
861
862                Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
863                    name: object_name_to_string(&name.unwrap()),
864                    parameters,
865                })))
866            }
867            Statement::Deallocate {
868                name,
869                // Similar to PostgreSQL, the PREPARE keyword is ignored
870                prepare: _,
871            } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
872                Deallocate {
873                    name: ident_to_string(&name),
874                },
875            ))),
876
877            Statement::ShowTables {
878                extended,
879                full,
880                terse,
881                history,
882                external,
883                show_options,
884            } => {
885                // We only support the basic "SHOW TABLES"
886                // https://github.com/apache/datafusion/issues/3188
887                if extended {
888                    return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
889                }
890                if full {
891                    return not_impl_err!("SHOW FULL TABLES not supported")?;
892                }
893                if terse {
894                    return not_impl_err!("SHOW TERSE TABLES not supported")?;
895                }
896                if history {
897                    return not_impl_err!("SHOW TABLES HISTORY not supported")?;
898                }
899                if external {
900                    return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
901                }
902                let ShowStatementOptions {
903                    show_in,
904                    starts_with,
905                    limit,
906                    limit_from,
907                    filter_position,
908                } = show_options;
909                if show_in.is_some() {
910                    return not_impl_err!("SHOW TABLES IN not supported")?;
911                }
912                if starts_with.is_some() {
913                    return not_impl_err!("SHOW TABLES LIKE not supported")?;
914                }
915                if limit.is_some() {
916                    return not_impl_err!("SHOW TABLES LIMIT not supported")?;
917                }
918                if limit_from.is_some() {
919                    return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
920                }
921                if filter_position.is_some() {
922                    return not_impl_err!("SHOW TABLES FILTER not supported")?;
923                }
924                self.show_tables_to_plan()
925            }
926
927            Statement::ShowColumns {
928                extended,
929                full,
930                show_options,
931            } => {
932                let ShowStatementOptions {
933                    show_in,
934                    starts_with,
935                    limit,
936                    limit_from,
937                    filter_position,
938                } = show_options;
939                if starts_with.is_some() {
940                    return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
941                }
942                if limit.is_some() {
943                    return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
944                }
945                if limit_from.is_some() {
946                    return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
947                }
948                if filter_position.is_some() {
949                    return not_impl_err!(
950                        "SHOW COLUMNS with WHERE or LIKE is not supported"
951                    )?;
952                }
953                let Some(ShowStatementIn {
954                    // specifies if the syntax was `SHOW COLUMNS IN` or `SHOW
955                    // COLUMNS FROM` which is not different in DataFusion
956                    clause: _,
957                    parent_type,
958                    parent_name,
959                }) = show_in
960                else {
961                    return plan_err!("SHOW COLUMNS requires a table name");
962                };
963
964                if let Some(parent_type) = parent_type {
965                    return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
966                }
967                let Some(table_name) = parent_name else {
968                    return plan_err!("SHOW COLUMNS requires a table name");
969                };
970
971                self.show_columns_to_plan(extended, full, table_name)
972            }
973
974            Statement::ShowFunctions { filter, .. } => {
975                self.show_functions_to_plan(filter)
976            }
977
978            Statement::Insert(Insert {
979                or,
980                into,
981                columns,
982                overwrite,
983                source,
984                partitioned,
985                after_columns,
986                table,
987                on,
988                returning,
989                ignore,
990                table_alias,
991                mut replace_into,
992                priority,
993                insert_alias,
994                assignments,
995                has_table_keyword,
996                settings,
997                format_clause,
998                insert_token: _, // record the location the `INSERT` token
999                optimizer_hint,
1000            }) => {
1001                let table_name = match table {
1002                    TableObject::TableName(table_name) => table_name,
1003                    TableObject::TableFunction(_) => {
1004                        return not_impl_err!(
1005                            "INSERT INTO Table functions not supported"
1006                        );
1007                    }
1008                };
1009                if let Some(or) = or {
1010                    match or {
1011                        SqliteOnConflict::Replace => replace_into = true,
1012                        _ => plan_err!("Inserts with {or} clause is not supported")?,
1013                    }
1014                }
1015                if partitioned.is_some() {
1016                    plan_err!("Partitioned inserts not yet supported")?;
1017                }
1018                if !after_columns.is_empty() {
1019                    plan_err!("After-columns clause not supported")?;
1020                }
1021                if on.is_some() {
1022                    plan_err!("Insert-on clause not supported")?;
1023                }
1024                if returning.is_some() {
1025                    plan_err!("Insert-returning clause not supported")?;
1026                }
1027                if ignore {
1028                    plan_err!("Insert-ignore clause not supported")?;
1029                }
1030                let Some(source) = source else {
1031                    plan_err!("Inserts without a source not supported")?
1032                };
1033                if let Some(table_alias) = table_alias {
1034                    plan_err!(
1035                        "Inserts with a table alias not supported: {table_alias:?}"
1036                    )?
1037                };
1038                if let Some(priority) = priority {
1039                    plan_err!(
1040                        "Inserts with a `PRIORITY` clause not supported: {priority:?}"
1041                    )?
1042                };
1043                if insert_alias.is_some() {
1044                    plan_err!("Inserts with an alias not supported")?;
1045                }
1046                if !assignments.is_empty() {
1047                    plan_err!("Inserts with assignments not supported")?;
1048                }
1049                if settings.is_some() {
1050                    plan_err!("Inserts with settings not supported")?;
1051                }
1052                if format_clause.is_some() {
1053                    plan_err!("Inserts with format clause not supported")?;
1054                }
1055                if optimizer_hint.is_some() {
1056                    plan_err!("Optimizer hints not supported")?;
1057                }
1058                // optional keywords don't change behavior
1059                let _ = into;
1060                let _ = has_table_keyword;
1061                self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
1062            }
1063            Statement::Update(Update {
1064                table,
1065                assignments,
1066                from,
1067                selection,
1068                returning,
1069                or,
1070                limit,
1071                update_token: _,
1072                optimizer_hint,
1073            }) => {
1074                let from_clauses =
1075                    from.map(|update_table_from_kind| match update_table_from_kind {
1076                        UpdateTableFromKind::BeforeSet(from_clauses) => from_clauses,
1077                        UpdateTableFromKind::AfterSet(from_clauses) => from_clauses,
1078                    });
1079                // TODO: support multiple tables in UPDATE SET FROM
1080                if from_clauses.as_ref().is_some_and(|f| f.len() > 1) {
1081                    not_impl_err!(
1082                        "Multiple tables in UPDATE SET FROM not yet supported"
1083                    )?;
1084                }
1085                let update_from = from_clauses.and_then(|mut f| f.pop());
1086
1087                // UPDATE ... FROM is currently not working
1088                // TODO fix https://github.com/apache/datafusion/issues/19950
1089                if update_from.is_some() {
1090                    return not_impl_err!("UPDATE ... FROM is not supported");
1091                }
1092
1093                if returning.is_some() {
1094                    plan_err!("Update-returning clause not yet supported")?;
1095                }
1096                if or.is_some() {
1097                    plan_err!("ON conflict not supported")?;
1098                }
1099                if limit.is_some() {
1100                    return not_impl_err!("Update-limit clause not supported")?;
1101                }
1102                if optimizer_hint.is_some() {
1103                    plan_err!("Optimizer hints not supported")?;
1104                }
1105                self.update_to_plan(table, &assignments, update_from, selection)
1106            }
1107
1108            Statement::Delete(Delete {
1109                tables,
1110                using,
1111                selection,
1112                returning,
1113                from,
1114                order_by,
1115                limit,
1116                delete_token: _,
1117                optimizer_hint,
1118            }) => {
1119                if !tables.is_empty() {
1120                    plan_err!("DELETE <TABLE> not supported")?;
1121                }
1122
1123                if using.is_some() {
1124                    plan_err!("Using clause not supported")?;
1125                }
1126
1127                if returning.is_some() {
1128                    plan_err!("Delete-returning clause not yet supported")?;
1129                }
1130
1131                if !order_by.is_empty() {
1132                    plan_err!("Delete-order-by clause not yet supported")?;
1133                }
1134
1135                if optimizer_hint.is_some() {
1136                    plan_err!("Optimizer hints not supported")?;
1137                }
1138
1139                let table_name = self.get_delete_target(from)?;
1140                self.delete_to_plan(&table_name, selection, limit)
1141            }
1142
1143            Statement::StartTransaction {
1144                modes,
1145                begin: false,
1146                modifier,
1147                transaction,
1148                statements,
1149                has_end_keyword,
1150                exception,
1151            } => {
1152                if let Some(modifier) = modifier {
1153                    return not_impl_err!(
1154                        "Transaction modifier not supported: {modifier}"
1155                    );
1156                }
1157                if !statements.is_empty() {
1158                    return not_impl_err!(
1159                        "Transaction with multiple statements not supported"
1160                    );
1161                }
1162                if exception.is_some() {
1163                    return not_impl_err!(
1164                        "Transaction with exception statements not supported"
1165                    );
1166                }
1167                if has_end_keyword {
1168                    return not_impl_err!("Transaction with END keyword not supported");
1169                }
1170                self.validate_transaction_kind(transaction.as_ref())?;
1171                let isolation_level: ast::TransactionIsolationLevel = modes
1172                    .iter()
1173                    .filter_map(|m: &TransactionMode| match m {
1174                        TransactionMode::AccessMode(_) => None,
1175                        TransactionMode::IsolationLevel(level) => Some(level),
1176                    })
1177                    .next_back()
1178                    .copied()
1179                    .unwrap_or(ast::TransactionIsolationLevel::Serializable);
1180                let access_mode: ast::TransactionAccessMode = modes
1181                    .iter()
1182                    .filter_map(|m: &TransactionMode| match m {
1183                        TransactionMode::AccessMode(mode) => Some(mode),
1184                        TransactionMode::IsolationLevel(_) => None,
1185                    })
1186                    .next_back()
1187                    .copied()
1188                    .unwrap_or(ast::TransactionAccessMode::ReadWrite);
1189                let isolation_level = match isolation_level {
1190                    ast::TransactionIsolationLevel::ReadUncommitted => {
1191                        TransactionIsolationLevel::ReadUncommitted
1192                    }
1193                    ast::TransactionIsolationLevel::ReadCommitted => {
1194                        TransactionIsolationLevel::ReadCommitted
1195                    }
1196                    ast::TransactionIsolationLevel::RepeatableRead => {
1197                        TransactionIsolationLevel::RepeatableRead
1198                    }
1199                    ast::TransactionIsolationLevel::Serializable => {
1200                        TransactionIsolationLevel::Serializable
1201                    }
1202                    ast::TransactionIsolationLevel::Snapshot => {
1203                        TransactionIsolationLevel::Snapshot
1204                    }
1205                };
1206                let access_mode = match access_mode {
1207                    ast::TransactionAccessMode::ReadOnly => {
1208                        TransactionAccessMode::ReadOnly
1209                    }
1210                    ast::TransactionAccessMode::ReadWrite => {
1211                        TransactionAccessMode::ReadWrite
1212                    }
1213                };
1214                let statement = PlanStatement::TransactionStart(TransactionStart {
1215                    access_mode,
1216                    isolation_level,
1217                });
1218                Ok(LogicalPlan::Statement(statement))
1219            }
1220            Statement::Commit {
1221                chain,
1222                end,
1223                modifier,
1224            } => {
1225                if end {
1226                    return not_impl_err!("COMMIT AND END not supported");
1227                };
1228                if let Some(modifier) = modifier {
1229                    return not_impl_err!("COMMIT {modifier} not supported");
1230                };
1231                let statement = PlanStatement::TransactionEnd(TransactionEnd {
1232                    conclusion: TransactionConclusion::Commit,
1233                    chain,
1234                });
1235                Ok(LogicalPlan::Statement(statement))
1236            }
1237            Statement::Rollback { chain, savepoint } => {
1238                if savepoint.is_some() {
1239                    plan_err!("Savepoints not supported")?;
1240                }
1241                let statement = PlanStatement::TransactionEnd(TransactionEnd {
1242                    conclusion: TransactionConclusion::Rollback,
1243                    chain,
1244                });
1245                Ok(LogicalPlan::Statement(statement))
1246            }
1247            Statement::CreateFunction(ast::CreateFunction {
1248                or_replace,
1249                temporary,
1250                name,
1251                args,
1252                return_type,
1253                function_body,
1254                behavior,
1255                language,
1256                ..
1257            }) => {
1258                let return_type = match return_type {
1259                    Some(t) => Some(self.convert_data_type_to_field(&t)?),
1260                    None => None,
1261                };
1262                let mut planner_context = PlannerContext::new();
1263                let empty_schema = &DFSchema::empty();
1264
1265                let args = match args {
1266                    Some(function_args) => {
1267                        let function_args = function_args
1268                            .into_iter()
1269                            .map(|arg| {
1270                                let data_type =
1271                                    self.convert_data_type_to_field(&arg.data_type)?;
1272
1273                                let default_expr = match arg.default_expr {
1274                                    Some(expr) => Some(self.sql_to_expr(
1275                                        expr,
1276                                        empty_schema,
1277                                        &mut planner_context,
1278                                    )?),
1279                                    None => None,
1280                                };
1281                                Ok(OperateFunctionArg {
1282                                    name: arg.name,
1283                                    default_expr,
1284                                    data_type: data_type.data_type().clone(),
1285                                })
1286                            })
1287                            .collect::<Result<Vec<OperateFunctionArg>>>();
1288                        Some(function_args?)
1289                    }
1290                    None => None,
1291                };
1292                // Validate default arguments
1293                let first_default = match args.as_ref() {
1294                    Some(arg) => arg.iter().position(|t| t.default_expr.is_some()),
1295                    None => None,
1296                };
1297                let last_non_default = match args.as_ref() {
1298                    Some(arg) => arg
1299                        .iter()
1300                        .rev()
1301                        .position(|t| t.default_expr.is_none())
1302                        .map(|reverse_pos| arg.len() - reverse_pos - 1),
1303                    None => None,
1304                };
1305                if let (Some(pos_default), Some(pos_non_default)) =
1306                    (first_default, last_non_default)
1307                    && pos_non_default > pos_default
1308                {
1309                    return plan_err!(
1310                        "Non-default arguments cannot follow default arguments."
1311                    );
1312                }
1313                // At the moment functions can't be qualified `schema.name`
1314                let name = match &name.0[..] {
1315                    [] => exec_err!("Function should have name")?,
1316                    [n] => n.as_ident().unwrap().value.clone(),
1317                    [..] => not_impl_err!("Qualified functions are not supported")?,
1318                };
1319                //
1320                // Convert resulting expression to data fusion expression
1321                //
1322                let arg_types = args.as_ref().map(|arg| {
1323                    arg.iter()
1324                        .map(|t| {
1325                            let name = match t.name.clone() {
1326                                Some(name) => name.value,
1327                                None => "".to_string(),
1328                            };
1329                            Arc::new(Field::new(name, t.data_type.clone(), true))
1330                        })
1331                        .collect::<Vec<_>>()
1332                });
1333                // Validate parameter style
1334                if let Some(ref fields) = arg_types {
1335                    let count_positional =
1336                        fields.iter().filter(|f| f.name() == "").count();
1337                    if !(count_positional == 0 || count_positional == fields.len()) {
1338                        return plan_err!(
1339                            "All function arguments must use either named or positional style."
1340                        );
1341                    }
1342                }
1343                let mut planner_context = PlannerContext::new()
1344                    .with_prepare_param_data_types(arg_types.unwrap_or_default());
1345
1346                let function_body = match function_body {
1347                    Some(r) => Some(self.sql_to_expr(
1348                        match r {
1349                            // `link_symbol` indicates if the primary expression contains the name of shared library file.
1350                            ast::CreateFunctionBody::AsBeforeOptions{body: expr, link_symbol: _link_symbol} => expr,
1351                            ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
1352                            ast::CreateFunctionBody::Return(expr) => expr,
1353                            ast::CreateFunctionBody::AsBeginEnd(_) => {
1354                                return not_impl_err!(
1355                                    "BEGIN/END enclosed function body syntax is not supported"
1356                                )?;
1357                            }
1358                            ast::CreateFunctionBody::AsReturnExpr(_)
1359                            | ast::CreateFunctionBody::AsReturnSelect(_) => {
1360                                return not_impl_err!(
1361                                    "AS RETURN function syntax is not supported"
1362                                )?
1363                            }
1364                        },
1365                        &DFSchema::empty(),
1366                        &mut planner_context,
1367                    )?),
1368                    None => None,
1369                };
1370
1371                let params = CreateFunctionBody {
1372                    language,
1373                    behavior: behavior.map(|b| match b {
1374                        ast::FunctionBehavior::Immutable => Volatility::Immutable,
1375                        ast::FunctionBehavior::Stable => Volatility::Stable,
1376                        ast::FunctionBehavior::Volatile => Volatility::Volatile,
1377                    }),
1378                    function_body,
1379                };
1380
1381                let statement = DdlStatement::CreateFunction(CreateFunction {
1382                    or_replace,
1383                    temporary,
1384                    name,
1385                    return_type: return_type.map(|f| f.data_type().clone()),
1386                    args,
1387                    params,
1388                    schema: DFSchemaRef::new(DFSchema::empty()),
1389                });
1390
1391                Ok(LogicalPlan::Ddl(statement))
1392            }
1393            Statement::DropFunction(ast::DropFunction {
1394                if_exists,
1395                func_desc,
1396                drop_behavior: _,
1397            }) => {
1398                // According to postgresql documentation it can be only one function
1399                // specified in drop statement
1400                if let Some(desc) = func_desc.first() {
1401                    // At the moment functions can't be qualified `schema.name`
1402                    let name = match &desc.name.0[..] {
1403                        [] => exec_err!("Function should have name")?,
1404                        [n] => n.as_ident().unwrap().value.clone(),
1405                        [..] => not_impl_err!("Qualified functions are not supported")?,
1406                    };
1407                    let statement = DdlStatement::DropFunction(DropFunction {
1408                        if_exists,
1409                        name,
1410                        schema: DFSchemaRef::new(DFSchema::empty()),
1411                    });
1412                    Ok(LogicalPlan::Ddl(statement))
1413                } else {
1414                    exec_err!("Function name not provided")
1415                }
1416            }
1417            Statement::Truncate(ast::Truncate {
1418                table_names,
1419                partitions,
1420                identity,
1421                cascade,
1422                on_cluster,
1423                table,
1424                if_exists,
1425            }) => {
1426                let _ = table; // Support TRUNCATE TABLE and TRUNCATE syntax
1427                if table_names.len() != 1 {
1428                    return not_impl_err!(
1429                        "TRUNCATE with multiple tables is not supported"
1430                    );
1431                }
1432
1433                let target = &table_names[0];
1434                if target.only {
1435                    return not_impl_err!("TRUNCATE with ONLY is not supported");
1436                }
1437                if partitions.is_some() {
1438                    return not_impl_err!("TRUNCATE with PARTITION is not supported");
1439                }
1440                if identity.is_some() {
1441                    return not_impl_err!(
1442                        "TRUNCATE with RESTART/CONTINUE IDENTITY is not supported"
1443                    );
1444                }
1445                if cascade.is_some() {
1446                    return not_impl_err!(
1447                        "TRUNCATE with CASCADE/RESTRICT is not supported"
1448                    );
1449                }
1450                if on_cluster.is_some() {
1451                    return not_impl_err!("TRUNCATE with ON CLUSTER is not supported");
1452                }
1453                if if_exists {
1454                    return not_impl_err!("TRUNCATE .. with IF EXISTS is not supported");
1455                }
1456                let table = self.object_name_to_table_reference(target.name.clone())?;
1457                let source = self.context_provider.get_table_source(table.clone())?;
1458
1459                // TRUNCATE does not operate on input rows. The EmptyRelation is a logical placeholder
1460                // since the real operation is executed directly by the TableProvider's truncate() hook.
1461                Ok(LogicalPlan::Dml(DmlStatement::new(
1462                    table.clone(),
1463                    source,
1464                    WriteOp::Truncate,
1465                    Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
1466                        produce_one_row: false,
1467                        schema: DFSchemaRef::new(DFSchema::empty()),
1468                    })),
1469                )))
1470            }
1471            Statement::CreateIndex(CreateIndex {
1472                name,
1473                table_name,
1474                using,
1475                columns,
1476                unique,
1477                if_not_exists,
1478                ..
1479            }) => {
1480                let name: Option<String> = name.as_ref().map(object_name_to_string);
1481                let table = self.object_name_to_table_reference(table_name)?;
1482                let table_schema = self
1483                    .context_provider
1484                    .get_table_source(table.clone())?
1485                    .schema()
1486                    .to_dfschema_ref()?;
1487                let using: Option<String> =
1488                    using.as_ref().map(|index_type| match index_type {
1489                        IndexType::Custom(ident) => ident_to_string(ident),
1490                        _ => index_type.to_string().to_ascii_lowercase(),
1491                    });
1492                let order_by_exprs: Vec<OrderByExpr> =
1493                    columns.into_iter().map(|col| col.column).collect();
1494                let columns = self.order_by_to_sort_expr(
1495                    order_by_exprs,
1496                    &table_schema,
1497                    planner_context,
1498                    false,
1499                    None,
1500                )?;
1501                Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
1502                    PlanCreateIndex {
1503                        name,
1504                        table,
1505                        using,
1506                        columns,
1507                        unique,
1508                        if_not_exists,
1509                        schema: DFSchemaRef::new(DFSchema::empty()),
1510                    },
1511                )))
1512            }
1513            stmt => {
1514                not_impl_err!("Unsupported SQL statement: {stmt}")
1515            }
1516        }
1517    }
1518
1519    fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
1520        let mut from = match from {
1521            FromTable::WithFromKeyword(v) => v,
1522            FromTable::WithoutKeyword(v) => v,
1523        };
1524
1525        if from.len() != 1 {
1526            return not_impl_err!(
1527                "DELETE FROM only supports single table, got {}: {from:?}",
1528                from.len()
1529            );
1530        }
1531        let table_factor = from.pop().unwrap();
1532        if !table_factor.joins.is_empty() {
1533            return not_impl_err!("DELETE FROM only supports single table, got: joins");
1534        }
1535        let TableFactor::Table { name, .. } = table_factor.relation else {
1536            return not_impl_err!(
1537                "DELETE FROM only supports single table, got: {table_factor:?}"
1538            );
1539        };
1540
1541        Ok(name)
1542    }
1543
1544    /// Generate a logical plan from a "SHOW TABLES" query
1545    fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
1546        if self.has_table("information_schema", "tables") {
1547            let query = "SELECT * FROM information_schema.tables;";
1548            let mut rewrite = DFParser::parse_sql(query)?;
1549            assert_eq!(rewrite.len(), 1);
1550            self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
1551        } else {
1552            plan_err!("SHOW TABLES is not supported unless information_schema is enabled")
1553        }
1554    }
1555
1556    fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
1557        let table_ref = self.object_name_to_table_reference(table_name)?;
1558
1559        let table_source = self.context_provider.get_table_source(table_ref)?;
1560
1561        let schema = table_source.schema();
1562
1563        let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1564
1565        Ok(LogicalPlan::DescribeTable(DescribeTable {
1566            schema,
1567            output_schema: Arc::new(output_schema),
1568        }))
1569    }
1570
1571    fn describe_query_to_plan(&self, query: Query) -> Result<LogicalPlan> {
1572        let plan = self.query_to_plan(query, &mut PlannerContext::new())?;
1573
1574        let schema = Arc::new(plan.schema().as_arrow().clone());
1575
1576        let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1577
1578        Ok(LogicalPlan::DescribeTable(DescribeTable {
1579            schema,
1580            output_schema: Arc::new(output_schema),
1581        }))
1582    }
1583
1584    fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
1585        // Determine if source is table or query and handle accordingly
1586        let copy_source = statement.source;
1587        let (input, input_schema, table_ref) = match copy_source {
1588            CopyToSource::Relation(object_name) => {
1589                let table_name = object_name_to_string(&object_name);
1590                let table_ref = self.object_name_to_table_reference(object_name)?;
1591                let table_source =
1592                    self.context_provider.get_table_source(table_ref.clone())?;
1593                let plan =
1594                    LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?;
1595                let input_schema = Arc::clone(plan.schema());
1596                (plan, input_schema, Some(table_ref))
1597            }
1598            CopyToSource::Query(query) => {
1599                let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
1600                let input_schema = Arc::clone(plan.schema());
1601                (plan, input_schema, None)
1602            }
1603        };
1604
1605        let options_map = self.parse_options_map(statement.options, true)?;
1606
1607        let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
1608            self.context_provider.get_file_type(stored_as).ok()
1609        } else {
1610            None
1611        };
1612
1613        let file_type = match maybe_file_type {
1614            Some(ft) => ft,
1615            None => {
1616                let e = || {
1617                    DataFusionError::Configuration(
1618                        "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
1619                            .to_string(),
1620                    )
1621                };
1622                // Try to infer file format from file extension
1623                let extension: &str = &Path::new(&statement.target)
1624                    .extension()
1625                    .ok_or_else(e)?
1626                    .to_str()
1627                    .ok_or_else(e)?
1628                    .to_lowercase();
1629
1630                self.context_provider.get_file_type(extension)?
1631            }
1632        };
1633
1634        let partition_by = statement
1635            .partitioned_by
1636            .iter()
1637            .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
1638            .collect::<Result<Vec<_>>>()?
1639            .into_iter()
1640            .map(|f| f.name().to_owned())
1641            .collect();
1642
1643        Ok(LogicalPlan::Copy(CopyTo::new(
1644            Arc::new(input),
1645            statement.target,
1646            partition_by,
1647            file_type,
1648            options_map,
1649        )))
1650    }
1651
1652    fn build_order_by(
1653        &self,
1654        order_exprs: Vec<LexOrdering>,
1655        schema: &DFSchemaRef,
1656        planner_context: &mut PlannerContext,
1657    ) -> Result<Vec<Vec<SortExpr>>> {
1658        if !order_exprs.is_empty() && schema.fields().is_empty() {
1659            let results = order_exprs
1660                .iter()
1661                .map(|lex_order| {
1662                    let result = lex_order
1663                        .iter()
1664                        .map(|order_by_expr| {
1665                            let ordered_expr = &order_by_expr.expr;
1666                            let ordered_expr = ordered_expr.to_owned();
1667                            let ordered_expr = self.sql_expr_to_logical_expr(
1668                                ordered_expr,
1669                                schema,
1670                                planner_context,
1671                            )?;
1672                            let asc = order_by_expr.options.asc.unwrap_or(true);
1673                            let nulls_first =
1674                                order_by_expr.options.nulls_first.unwrap_or_else(|| {
1675                                    self.options.default_null_ordering.nulls_first(asc)
1676                                });
1677
1678                            Ok(SortExpr::new(ordered_expr, asc, nulls_first))
1679                        })
1680                        .collect::<Result<Vec<SortExpr>>>()?;
1681                    Ok(result)
1682                })
1683                .collect::<Result<Vec<Vec<SortExpr>>>>()?;
1684
1685            return Ok(results);
1686        }
1687
1688        let mut all_results = vec![];
1689        for expr in order_exprs {
1690            // Convert each OrderByExpr to a SortExpr:
1691            let expr_vec =
1692                self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?;
1693            // Verify that columns of all SortExprs exist in the schema:
1694            for sort in expr_vec.iter() {
1695                for column in sort.expr.column_refs().iter() {
1696                    if !schema.has_column(column) {
1697                        // Return an error if any column is not in the schema:
1698                        return plan_err!("Column {column} is not in schema");
1699                    }
1700                }
1701            }
1702            // If all SortExprs are valid, return them as an expression vector
1703            all_results.push(expr_vec)
1704        }
1705        Ok(all_results)
1706    }
1707
1708    /// Generate a logical plan from a CREATE EXTERNAL TABLE statement
1709    fn external_table_to_plan(
1710        &self,
1711        statement: CreateExternalTable,
1712    ) -> Result<LogicalPlan> {
1713        let definition = Some(statement.to_string());
1714        let CreateExternalTable {
1715            name,
1716            columns,
1717            file_type,
1718            location,
1719            table_partition_cols,
1720            if_not_exists,
1721            temporary,
1722            order_exprs,
1723            unbounded,
1724            options,
1725            constraints,
1726            or_replace,
1727        } = statement;
1728
1729        // Merge inline constraints and existing constraints
1730        let mut all_constraints = constraints;
1731        let inline_constraints = calc_inline_constraints_from_columns(&columns);
1732        all_constraints.extend(inline_constraints);
1733
1734        let options_map = self.parse_options_map(options, false)?;
1735
1736        let compression = options_map
1737            .get("format.compression")
1738            .map(|c| CompressionTypeVariant::from_str(c))
1739            .transpose()?;
1740        if (file_type == "PARQUET" || file_type == "AVRO" || file_type == "ARROW")
1741            && compression
1742                .map(|c| c != CompressionTypeVariant::UNCOMPRESSED)
1743                .unwrap_or(false)
1744        {
1745            plan_err!(
1746                "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
1747            )?;
1748        }
1749
1750        let mut planner_context = PlannerContext::new();
1751
1752        let column_defaults = self
1753            .build_column_defaults(&columns, &mut planner_context)?
1754            .into_iter()
1755            .collect();
1756
1757        let schema = self.build_schema(columns)?;
1758        let df_schema = schema.to_dfschema_ref()?;
1759        df_schema.check_names()?;
1760
1761        let ordered_exprs =
1762            self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;
1763
1764        let name = self.object_name_to_table_reference(name)?;
1765        let constraints =
1766            self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
1767        Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1768            PlanCreateExternalTable::builder(name, location, file_type, df_schema)
1769                .with_partition_cols(table_partition_cols)
1770                .with_if_not_exists(if_not_exists)
1771                .with_or_replace(or_replace)
1772                .with_temporary(temporary)
1773                .with_definition(definition)
1774                .with_order_exprs(ordered_exprs)
1775                .with_unbounded(unbounded)
1776                .with_options(options_map)
1777                .with_constraints(constraints)
1778                .with_column_defaults(column_defaults)
1779                .build(),
1780        )))
1781    }
1782
1783    /// Get the indices of the constraint columns in the schema.
1784    /// If any column is not found, return an error.
1785    fn get_constraint_column_indices(
1786        &self,
1787        df_schema: &DFSchemaRef,
1788        columns: &[IndexColumn],
1789        constraint_name: &str,
1790    ) -> Result<Vec<usize>> {
1791        let field_names = df_schema.field_names();
1792        columns
1793            .iter()
1794            .map(|index_column| {
1795                let expr = &index_column.column.expr;
1796                let ident = if let SQLExpr::Identifier(ident) = expr {
1797                    ident
1798                } else {
1799                    return Err(plan_datafusion_err!(
1800                        "Column name for {constraint_name} must be an identifier: {expr}"
1801                    ));
1802                };
1803                let column = self.ident_normalizer.normalize(ident.clone());
1804                field_names
1805                    .iter()
1806                    .position(|item| *item == column)
1807                    .ok_or_else(|| {
1808                        plan_datafusion_err!(
1809                            "Column for {constraint_name} not found in schema: {column}"
1810                        )
1811                    })
1812            })
1813            .collect::<Result<Vec<_>>>()
1814    }
1815
1816    /// Convert each [TableConstraint] to corresponding [Constraint]
1817    pub fn new_constraint_from_table_constraints(
1818        &self,
1819        constraints: &[TableConstraint],
1820        df_schema: &DFSchemaRef,
1821    ) -> Result<Constraints> {
1822        let constraints = constraints
1823            .iter()
1824            .map(|c: &TableConstraint| match c {
1825                TableConstraint::Unique(UniqueConstraint {
1826                    name,
1827                    index_name: _,
1828                    index_type_display: _,
1829                    index_type: _,
1830                    columns,
1831                    index_options: _,
1832                    characteristics: _,
1833                    nulls_distinct: _,
1834                }) => {
1835                    let constraint_name = match &name {
1836                        Some(name) => &format!("unique constraint with name '{name}'"),
1837                        None => "unique constraint",
1838                    };
1839                    // Get unique constraint indices in the schema
1840                    let indices = self.get_constraint_column_indices(
1841                        df_schema,
1842                        columns,
1843                        constraint_name,
1844                    )?;
1845                    Ok(Constraint::Unique(indices))
1846                }
1847                TableConstraint::PrimaryKey(PrimaryKeyConstraint {
1848                    name: _,
1849                    index_name: _,
1850                    index_type: _,
1851                    columns,
1852                    index_options: _,
1853                    characteristics: _,
1854                }) => {
1855                    // Get primary key indices in the schema
1856                    let indices = self.get_constraint_column_indices(
1857                        df_schema,
1858                        columns,
1859                        "primary key",
1860                    )?;
1861                    Ok(Constraint::PrimaryKey(indices))
1862                }
1863                TableConstraint::ForeignKey { .. } => {
1864                    _plan_err!("Foreign key constraints are not currently supported")
1865                }
1866                TableConstraint::Check { .. } => {
1867                    _plan_err!("Check constraints are not currently supported")
1868                }
1869                TableConstraint::Index { .. } => {
1870                    _plan_err!("Indexes are not currently supported")
1871                }
1872                TableConstraint::FulltextOrSpatial { .. } => {
1873                    _plan_err!("Indexes are not currently supported")
1874                }
1875            })
1876            .collect::<Result<Vec<_>>>()?;
1877        Ok(Constraints::new_unverified(constraints))
1878    }
1879
1880    fn parse_options_map(
1881        &self,
1882        options: Vec<(String, Value)>,
1883        allow_duplicates: bool,
1884    ) -> Result<HashMap<String, String>> {
1885        let mut options_map = HashMap::new();
1886        for (key, value) in options {
1887            if !allow_duplicates && options_map.contains_key(&key) {
1888                return plan_err!("Option {key} is specified multiple times");
1889            }
1890
1891            let Some(value_string) = crate::utils::value_to_string(&value) else {
1892                return plan_err!("Unsupported Value {}", value);
1893            };
1894
1895            if !(&key.contains('.')) {
1896                // If config does not belong to any namespace, assume it is
1897                // a format option and apply the format prefix for backwards
1898                // compatibility.
1899                let renamed_key = format!("format.{key}");
1900                options_map.insert(renamed_key.to_lowercase(), value_string);
1901            } else {
1902                options_map.insert(key.to_lowercase(), value_string);
1903            }
1904        }
1905
1906        Ok(options_map)
1907    }
1908
1909    /// Generate a plan for EXPLAIN ... that will print out a plan
1910    ///
1911    /// Note this is the sqlparser explain statement, not the
1912    /// datafusion `EXPLAIN` statement.
1913    fn explain_to_plan(
1914        &self,
1915        verbose: bool,
1916        analyze: bool,
1917        format: Option<String>,
1918        statement: DFStatement,
1919    ) -> Result<LogicalPlan> {
1920        let plan = self.statement_to_plan(statement)?;
1921        if matches!(plan, LogicalPlan::Explain(_)) {
1922            return plan_err!("Nested EXPLAINs are not supported");
1923        }
1924
1925        let plan = Arc::new(plan);
1926        let schema = LogicalPlan::explain_schema();
1927        let schema = schema.to_dfschema_ref()?;
1928
1929        if verbose && format.is_some() {
1930            return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
1931        }
1932
1933        if analyze {
1934            if format.is_some() {
1935                return plan_err!("EXPLAIN ANALYZE with FORMAT is not supported");
1936            }
1937            Ok(LogicalPlan::Analyze(Analyze {
1938                verbose,
1939                input: plan,
1940                schema,
1941            }))
1942        } else {
1943            let stringified_plans =
1944                vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
1945
1946            // default to configuration value
1947            // verbose mode only supports indent format
1948            let options = self.context_provider.options();
1949            let format = if verbose {
1950                ExplainFormat::Indent
1951            } else if let Some(format) = format {
1952                ExplainFormat::from_str(&format)?
1953            } else {
1954                options.explain.format.clone()
1955            };
1956
1957            Ok(LogicalPlan::Explain(Explain {
1958                verbose,
1959                explain_format: format,
1960                plan,
1961                stringified_plans,
1962                schema,
1963                logical_optimization_succeeded: false,
1964            }))
1965        }
1966    }
1967
1968    fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
1969        if !self.has_table("information_schema", "df_settings") {
1970            return plan_err!(
1971                "SHOW [VARIABLE] is not supported unless information_schema is enabled"
1972            );
1973        }
1974
1975        let verbose = variable
1976            .last()
1977            .map(|s| ident_to_string(s) == "verbose")
1978            .unwrap_or(false);
1979        let mut variable_vec = variable.to_vec();
1980        let mut columns: String = "name, value".to_owned();
1981
1982        if verbose {
1983            columns = format!("{columns}, description");
1984            variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
1985        }
1986
1987        let variable = object_name_to_string(&ObjectName::from(variable_vec));
1988        let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
1989        let query = if variable == "all" {
1990            // Add an ORDER BY so the output comes out in a consistent order
1991            format!("{base_query} ORDER BY name")
1992        } else if variable == "timezone" || variable == "time.zone" {
1993            // we could introduce alias in OptionDefinition if this string matching thing grows
1994            format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
1995        } else {
1996            // These values are what are used to make the information_schema table, so we just
1997            // check here, before actually planning or executing the query, if it would produce no
1998            // results, and error preemptively if it would (for a better UX)
1999            let is_valid_variable = self
2000                .context_provider
2001                .options()
2002                .entries()
2003                .iter()
2004                .any(|opt| opt.key == variable);
2005
2006            // Check if it's a runtime variable
2007            let is_runtime_variable = variable.starts_with("datafusion.runtime.");
2008
2009            if !is_valid_variable && !is_runtime_variable {
2010                return plan_err!(
2011                    "'{variable}' is not a variable which can be viewed with 'SHOW'"
2012                );
2013            }
2014
2015            format!("{base_query} WHERE name = '{variable}'")
2016        };
2017
2018        let mut rewrite = DFParser::parse_sql(&query)?;
2019        assert_eq!(rewrite.len(), 1);
2020
2021        self.statement_to_plan(rewrite.pop_front().unwrap())
2022    }
2023
2024    fn set_statement_to_plan(&self, statement: Set) -> Result<LogicalPlan> {
2025        match statement {
2026            Set::SingleAssignment {
2027                scope,
2028                hivevar,
2029                variable,
2030                values,
2031            } => {
2032                if scope.is_some() {
2033                    return not_impl_err!("SET with scope modifiers is not supported");
2034                }
2035
2036                if hivevar {
2037                    return not_impl_err!("SET HIVEVAR is not supported");
2038                }
2039
2040                let variable = object_name_to_string(&variable);
2041                let mut variable_lower = variable.to_lowercase();
2042
2043                // Map PostgreSQL "timezone" and MySQL "time.zone" aliases to DataFusion's canonical name
2044                if variable_lower == "timezone" || variable_lower == "time.zone" {
2045                    variable_lower = "datafusion.execution.time_zone".to_string();
2046                }
2047
2048                if values.len() != 1 {
2049                    return plan_err!("SET only supports single value assignment");
2050                }
2051
2052                let value_string = match &values[0] {
2053                    SQLExpr::Identifier(i) => ident_to_string(i),
2054                    SQLExpr::Value(v) => match crate::utils::value_to_string(&v.value) {
2055                        None => {
2056                            return plan_err!("Unsupported value {:?}", v.value);
2057                        }
2058                        Some(s) => s,
2059                    },
2060                    SQLExpr::UnaryOp { op, expr } => match op {
2061                        UnaryOperator::Plus => format!("+{expr}"),
2062                        UnaryOperator::Minus => format!("-{expr}"),
2063                        _ => return plan_err!("Unsupported unary op {:?}", op),
2064                    },
2065                    _ => return plan_err!("Unsupported expr {:?}", values[0]),
2066                };
2067
2068                Ok(LogicalPlan::Statement(PlanStatement::SetVariable(
2069                    SetVariable {
2070                        variable: variable_lower,
2071                        value: value_string,
2072                    },
2073                )))
2074            }
2075            other => not_impl_err!("SET variant not implemented yet: {other:?}"),
2076        }
2077    }
2078
2079    fn reset_statement_to_plan(&self, statement: ResetStatement) -> Result<LogicalPlan> {
2080        match statement {
2081            ResetStatement::Variable(variable) => {
2082                let variable = object_name_to_string(&variable);
2083                let mut variable_lower = variable.to_lowercase();
2084
2085                // Map PostgreSQL "timezone" and MySQL "time.zone" aliases to DataFusion's canonical name
2086                if variable_lower == "timezone" || variable_lower == "time.zone" {
2087                    variable_lower = "datafusion.execution.time_zone".to_string();
2088                }
2089
2090                Ok(LogicalPlan::Statement(PlanStatement::ResetVariable(
2091                    ResetVariable {
2092                        variable: variable_lower,
2093                    },
2094                )))
2095            }
2096        }
2097    }
2098
2099    fn delete_to_plan(
2100        &self,
2101        table_name: &ObjectName,
2102        predicate_expr: Option<SQLExpr>,
2103        limit: Option<SQLExpr>,
2104    ) -> Result<LogicalPlan> {
2105        // Do a table lookup to verify the table exists
2106        let table_ref = self.object_name_to_table_reference(table_name.clone())?;
2107        let table_source = self.context_provider.get_table_source(table_ref.clone())?;
2108        let schema = DFSchema::try_from_qualified_schema(
2109            table_ref.clone(),
2110            &table_source.schema(),
2111        )?;
2112        let scan =
2113            LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
2114                .build()?;
2115        let mut planner_context = PlannerContext::new();
2116
2117        let mut source = match predicate_expr {
2118            None => scan,
2119            Some(predicate_expr) => {
2120                let filter_expr =
2121                    self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
2122                let schema = Arc::new(schema);
2123                let mut using_columns = HashSet::new();
2124                expr_to_columns(&filter_expr, &mut using_columns)?;
2125                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
2126                    filter_expr,
2127                    &[&[&schema]],
2128                    &[using_columns],
2129                )?;
2130                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
2131            }
2132        };
2133
2134        if let Some(limit) = limit {
2135            let empty_schema = DFSchema::empty();
2136            let limit = self.sql_to_expr(limit, &empty_schema, &mut planner_context)?;
2137            source = LogicalPlanBuilder::from(source)
2138                .limit_by_expr(None, Some(limit))?
2139                .build()?
2140        }
2141
2142        let plan = LogicalPlan::Dml(DmlStatement::new(
2143            table_ref,
2144            table_source,
2145            WriteOp::Delete,
2146            Arc::new(source),
2147        ));
2148        Ok(plan)
2149    }
2150
2151    fn update_to_plan(
2152        &self,
2153        table: TableWithJoins,
2154        assignments: &[Assignment],
2155        from: Option<TableWithJoins>,
2156        predicate_expr: Option<SQLExpr>,
2157    ) -> Result<LogicalPlan> {
2158        let (table_name, table_alias) = match &table.relation {
2159            TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
2160            _ => plan_err!("Cannot update non-table relation!")?,
2161        };
2162
2163        // Do a table lookup to verify the table exists
2164        let table_name = self.object_name_to_table_reference(table_name)?;
2165        let table_source = self.context_provider.get_table_source(table_name.clone())?;
2166        let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
2167            table_name.clone(),
2168            &table_source.schema(),
2169        )?);
2170
2171        // Overwrite with assignment expressions
2172        let mut planner_context = PlannerContext::new();
2173        let mut assign_map = assignments
2174            .iter()
2175            .map(|assign| {
2176                let cols = match &assign.target {
2177                    AssignmentTarget::ColumnName(cols) => cols,
2178                    _ => plan_err!("Tuples are not supported")?,
2179                };
2180                let col_name: &Ident = cols
2181                    .0
2182                    .iter()
2183                    .last()
2184                    .ok_or_else(|| plan_datafusion_err!("Empty column id"))?
2185                    .as_ident()
2186                    .unwrap();
2187                // Validate that the assignment target column exists
2188                table_schema.field_with_unqualified_name(&col_name.value)?;
2189                Ok((col_name.value.clone(), assign.value.clone()))
2190            })
2191            .collect::<Result<HashMap<String, SQLExpr>>>()?;
2192
2193        // Build scan, join with from table if it exists.
2194        let mut input_tables = vec![table];
2195        input_tables.extend(from);
2196        let scan = self.plan_from_tables(input_tables, &mut planner_context)?;
2197
2198        // Filter
2199        let source = match predicate_expr {
2200            None => scan,
2201            Some(predicate_expr) => {
2202                let filter_expr = self.sql_to_expr(
2203                    predicate_expr,
2204                    scan.schema(),
2205                    &mut planner_context,
2206                )?;
2207                let mut using_columns = HashSet::new();
2208                expr_to_columns(&filter_expr, &mut using_columns)?;
2209                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
2210                    filter_expr,
2211                    &[&[scan.schema()]],
2212                    &[using_columns],
2213                )?;
2214                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
2215            }
2216        };
2217
2218        // Build updated values for each column, using the previous value if not modified
2219        let exprs = table_schema
2220            .iter()
2221            .map(|(qualifier, field)| {
2222                let expr = match assign_map.remove(field.name()) {
2223                    Some(new_value) => {
2224                        let mut expr = self.sql_to_expr(
2225                            new_value,
2226                            source.schema(),
2227                            &mut planner_context,
2228                        )?;
2229                        // Update placeholder's datatype to the type of the target column
2230                        if let Expr::Placeholder(placeholder) = &mut expr {
2231                            placeholder.field = placeholder
2232                                .field
2233                                .take()
2234                                .or_else(|| Some(Arc::clone(field)));
2235                        }
2236                        // Cast to target column type, if necessary
2237                        expr.cast_to(field.data_type(), source.schema())?
2238                    }
2239                    None => {
2240                        // If the target table has an alias, use it to qualify the column name
2241                        if let Some(alias) = &table_alias {
2242                            Expr::Column(Column::new(
2243                                Some(self.ident_normalizer.normalize(alias.name.clone())),
2244                                field.name(),
2245                            ))
2246                        } else {
2247                            Expr::Column(Column::from((qualifier, field)))
2248                        }
2249                    }
2250                };
2251                Ok(expr.alias(field.name()))
2252            })
2253            .collect::<Result<Vec<_>>>()?;
2254
2255        let source = project(source, exprs)?;
2256
2257        let plan = LogicalPlan::Dml(DmlStatement::new(
2258            table_name,
2259            table_source,
2260            WriteOp::Update,
2261            Arc::new(source),
2262        ));
2263        Ok(plan)
2264    }
2265
2266    fn insert_to_plan(
2267        &self,
2268        table_name: ObjectName,
2269        columns: Vec<Ident>,
2270        source: Box<Query>,
2271        overwrite: bool,
2272        replace_into: bool,
2273    ) -> Result<LogicalPlan> {
2274        // Do a table lookup to verify the table exists
2275        let table_name = self.object_name_to_table_reference(table_name)?;
2276        let table_source = self.context_provider.get_table_source(table_name.clone())?;
2277        let table_schema = DFSchema::try_from(table_source.schema())?;
2278
2279        // Get insert fields and target table's value indices
2280        //
2281        // If value_indices[i] = Some(j), it means that the value of the i-th target table's column is
2282        // derived from the j-th output of the source.
2283        //
2284        // If value_indices[i] = None, it means that the value of the i-th target table's column is
2285        // not provided, and should be filled with a default value later.
2286        let (fields, value_indices) = if columns.is_empty() {
2287            // Empty means we're inserting into all columns of the table
2288            (
2289                table_schema.fields().clone(),
2290                (0..table_schema.fields().len())
2291                    .map(Some)
2292                    .collect::<Vec<_>>(),
2293            )
2294        } else {
2295            let mut value_indices = vec![None; table_schema.fields().len()];
2296            let fields = columns
2297                .into_iter()
2298                .enumerate()
2299                .map(|(i, c)| {
2300                    let c = self.ident_normalizer.normalize(c);
2301                    let column_index = table_schema
2302                        .index_of_column_by_name(None, &c)
2303                        .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
2304
2305                    if value_indices[column_index].is_some() {
2306                        return schema_err!(SchemaError::DuplicateUnqualifiedField {
2307                            name: c,
2308                        });
2309                    } else {
2310                        value_indices[column_index] = Some(i);
2311                    }
2312                    Ok(Arc::clone(table_schema.field(column_index)))
2313                })
2314                .collect::<Result<Vec<_>>>()?;
2315            (Fields::from(fields), value_indices)
2316        };
2317
2318        // infer types for Values clause... other types should be resolvable the regular way
2319        let mut prepare_param_data_types = BTreeMap::new();
2320        if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
2321            for row in rows.iter() {
2322                for (idx, val) in row.iter().enumerate() {
2323                    if let SQLExpr::Value(ValueWithSpan {
2324                        value: Value::Placeholder(name),
2325                        span: _,
2326                    }) = val
2327                    {
2328                        let name =
2329                            name.replace('$', "").parse::<usize>().map_err(|_| {
2330                                plan_datafusion_err!("Can't parse placeholder: {name}")
2331                            })? - 1;
2332                        let field = fields.get(idx).ok_or_else(|| {
2333                            plan_datafusion_err!(
2334                                "Placeholder ${} refers to a non existent column",
2335                                idx + 1
2336                            )
2337                        })?;
2338                        let _ = prepare_param_data_types.insert(name, Arc::clone(field));
2339                    }
2340                }
2341            }
2342        }
2343        let prepare_param_data_types = prepare_param_data_types.into_values().collect();
2344
2345        // Projection
2346        let mut planner_context =
2347            PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
2348        planner_context.set_table_schema(Some(DFSchemaRef::new(
2349            DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
2350        )));
2351        let source = self.query_to_plan(*source, &mut planner_context)?;
2352        if fields.len() != source.schema().fields().len() {
2353            plan_err!("Column count doesn't match insert query!")?;
2354        }
2355
2356        let exprs = value_indices
2357            .into_iter()
2358            .enumerate()
2359            .map(|(i, value_index)| {
2360                let target_field = table_schema.field(i);
2361                let expr = match value_index {
2362                    Some(v) => {
2363                        Expr::Column(Column::from(source.schema().qualified_field(v)))
2364                            .cast_to(target_field.data_type(), source.schema())?
2365                    }
2366                    // The value is not specified. Fill in the default value for the column.
2367                    None => table_source
2368                        .get_column_default(target_field.name())
2369                        .cloned()
2370                        .unwrap_or_else(|| {
2371                            // If there is no default for the column, then the default is NULL
2372                            Expr::Literal(ScalarValue::Null, None)
2373                        })
2374                        .cast_to(target_field.data_type(), &DFSchema::empty())?,
2375                };
2376                Ok(expr.alias(target_field.name()))
2377            })
2378            .collect::<Result<Vec<Expr>>>()?;
2379        let source = project(source, exprs)?;
2380
2381        let insert_op = match (overwrite, replace_into) {
2382            (false, false) => InsertOp::Append,
2383            (true, false) => InsertOp::Overwrite,
2384            (false, true) => InsertOp::Replace,
2385            (true, true) => plan_err!(
2386                "Conflicting insert operations: `overwrite` and `replace_into` cannot both be true"
2387            )?,
2388        };
2389
2390        let plan = LogicalPlan::Dml(DmlStatement::new(
2391            table_name,
2392            Arc::clone(&table_source),
2393            WriteOp::Insert(insert_op),
2394            Arc::new(source),
2395        ));
2396        Ok(plan)
2397    }
2398
2399    fn show_columns_to_plan(
2400        &self,
2401        extended: bool,
2402        full: bool,
2403        sql_table_name: ObjectName,
2404    ) -> Result<LogicalPlan> {
2405        // Figure out the where clause
2406        let where_clause = object_name_to_qualifier(
2407            &sql_table_name,
2408            self.options.enable_ident_normalization,
2409        )?;
2410
2411        if !self.has_table("information_schema", "columns") {
2412            return plan_err!(
2413                "SHOW COLUMNS is not supported unless information_schema is enabled"
2414            );
2415        }
2416
2417        // Do a table lookup to verify the table exists
2418        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2419        let _ = self.context_provider.get_table_source(table_ref)?;
2420
2421        // Treat both FULL and EXTENDED as the same
2422        let select_list = if full || extended {
2423            "*"
2424        } else {
2425            "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
2426        };
2427
2428        let query = format!(
2429            "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
2430        );
2431
2432        let mut rewrite = DFParser::parse_sql(&query)?;
2433        assert_eq!(rewrite.len(), 1);
2434        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2435    }
2436
2437    /// Rewrite `SHOW FUNCTIONS` to another SQL query
2438    /// The query is based on the `information_schema.routines` and `information_schema.parameters` tables
2439    ///
2440    /// The output columns:
2441    /// - function_name: The name of function
2442    /// - return_type: The return type of the function
2443    /// - parameters: The name of parameters (ordered by the ordinal position)
2444    /// - parameter_types: The type of parameters (ordered by the ordinal position)
2445    /// - description: The description of the function (the description defined in the document)
2446    /// - syntax_example: The syntax_example of the function (the syntax_example defined in the document)
2447    fn show_functions_to_plan(
2448        &self,
2449        filter: Option<ShowStatementFilter>,
2450    ) -> Result<LogicalPlan> {
2451        let where_clause = if let Some(filter) = filter {
2452            match filter {
2453                ShowStatementFilter::Like(like) => {
2454                    format!("WHERE p.function_name like '{like}'")
2455                }
2456                _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
2457            }
2458        } else {
2459            "".to_string()
2460        };
2461
2462        let query = format!(
2463            r#"
2464SELECT DISTINCT
2465    p.*,
2466    r.function_type function_type,
2467    r.description description,
2468    r.syntax_example syntax_example
2469FROM
2470    (
2471        SELECT
2472            i.specific_name function_name,
2473            o.data_type return_type,
2474            array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
2475            array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
2476        FROM (
2477                 SELECT
2478                     specific_catalog,
2479                     specific_schema,
2480                     specific_name,
2481                     ordinal_position,
2482                     parameter_name,
2483                     data_type,
2484                     rid
2485                 FROM
2486                     information_schema.parameters
2487                 WHERE
2488                     parameter_mode = 'IN'
2489             ) i
2490                 JOIN
2491             (
2492                 SELECT
2493                     specific_catalog,
2494                     specific_schema,
2495                     specific_name,
2496                     ordinal_position,
2497                     parameter_name,
2498                     data_type,
2499                     rid
2500                 FROM
2501                     information_schema.parameters
2502                 WHERE
2503                     parameter_mode = 'OUT'
2504             ) o
2505             ON i.specific_catalog = o.specific_catalog
2506                 AND i.specific_schema = o.specific_schema
2507                 AND i.specific_name = o.specific_name
2508                 AND i.rid = o.rid
2509        GROUP BY 1, 2, i.rid
2510    ) as p
2511JOIN information_schema.routines r
2512ON p.function_name = r.routine_name
2513{where_clause}
2514            "#
2515        );
2516        let mut rewrite = DFParser::parse_sql(&query)?;
2517        assert_eq!(rewrite.len(), 1);
2518        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2519    }
2520
2521    fn show_create_table_to_plan(
2522        &self,
2523        sql_table_name: ObjectName,
2524    ) -> Result<LogicalPlan> {
2525        if !self.has_table("information_schema", "tables") {
2526            return plan_err!(
2527                "SHOW CREATE TABLE is not supported unless information_schema is enabled"
2528            );
2529        }
2530        // Figure out the where clause
2531        let where_clause = object_name_to_qualifier(
2532            &sql_table_name,
2533            self.options.enable_ident_normalization,
2534        )?;
2535
2536        // Do a table lookup to verify the table exists
2537        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2538        let _ = self.context_provider.get_table_source(table_ref)?;
2539
2540        let query = format!(
2541            "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
2542        );
2543
2544        let mut rewrite = DFParser::parse_sql(&query)?;
2545        assert_eq!(rewrite.len(), 1);
2546        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2547    }
2548
2549    /// Return true if there is a table provider available for "schema.table"
2550    fn has_table(&self, schema: &str, table: &str) -> bool {
2551        let tables_reference = TableReference::Partial {
2552            schema: schema.into(),
2553            table: table.into(),
2554        };
2555        self.context_provider
2556            .get_table_source(tables_reference)
2557            .is_ok()
2558    }
2559
2560    fn validate_transaction_kind(
2561        &self,
2562        kind: Option<&BeginTransactionKind>,
2563    ) -> Result<()> {
2564        match kind {
2565            // BEGIN
2566            None => Ok(()),
2567            // BEGIN TRANSACTION
2568            Some(BeginTransactionKind::Transaction) => Ok(()),
2569            Some(BeginTransactionKind::Work) => {
2570                not_impl_err!("Transaction kind not supported: {kind:?}")
2571            }
2572        }
2573    }
2574}