datafusion_sql/
statement.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{BTreeMap, HashMap, HashSet};
19use std::path::Path;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use crate::parser::{
24    CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
25    LexOrdering, Statement as DFStatement,
26};
27use crate::planner::{
28    object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
29};
30use crate::utils::normalize_ident;
31
32use arrow::datatypes::{DataType, Fields};
33use datafusion_common::error::_plan_err;
34use datafusion_common::parsers::CompressionTypeVariant;
35use datafusion_common::{
36    exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
37    unqualified_field_not_found, Column, Constraint, Constraints, DFSchema, DFSchemaRef,
38    DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, TableReference,
39    ToDFSchema,
40};
41use datafusion_expr::dml::{CopyTo, InsertOp};
42use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
43use datafusion_expr::logical_plan::builder::project;
44use datafusion_expr::logical_plan::DdlStatement;
45use datafusion_expr::utils::expr_to_columns;
46use datafusion_expr::{
47    cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
48    CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
49    CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
50    DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
51    EmptyRelation, Execute, Explain, Expr, ExprSchemable, Filter, LogicalPlan,
52    LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, SetVariable, SortExpr,
53    Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
54    TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
55    Volatility, WriteOp,
56};
57use sqlparser::ast::{
58    self, BeginTransactionKind, NullsDistinctOption, ShowStatementIn,
59    ShowStatementOptions, SqliteOnConflict, TableObject, UpdateTableFromKind,
60};
61use sqlparser::ast::{
62    Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
63    CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
64    ObjectName, ObjectType, OneOrManyWithParens, Query, SchemaName, SetExpr,
65    ShowCreateObject, ShowStatementFilter, Statement, TableConstraint, TableFactor,
66    TableWithJoins, TransactionMode, UnaryOperator, Value,
67};
68use sqlparser::parser::ParserError::ParserError;
69
70fn ident_to_string(ident: &Ident) -> String {
71    normalize_ident(ident.to_owned())
72}
73
74fn object_name_to_string(object_name: &ObjectName) -> String {
75    object_name
76        .0
77        .iter()
78        .map(ident_to_string)
79        .collect::<Vec<String>>()
80        .join(".")
81}
82
83fn get_schema_name(schema_name: &SchemaName) -> String {
84    match schema_name {
85        SchemaName::Simple(schema_name) => object_name_to_string(schema_name),
86        SchemaName::UnnamedAuthorization(auth) => ident_to_string(auth),
87        SchemaName::NamedAuthorization(schema_name, auth) => format!(
88            "{}.{}",
89            object_name_to_string(schema_name),
90            ident_to_string(auth)
91        ),
92    }
93}
94
95/// Construct `TableConstraint`(s) for the given columns by iterating over
96/// `columns` and extracting individual inline constraint definitions.
97fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
98    let mut constraints = vec![];
99    for column in columns {
100        for ast::ColumnOptionDef { name, option } in &column.options {
101            match option {
102                ast::ColumnOption::Unique {
103                    is_primary: false,
104                    characteristics,
105                } => constraints.push(TableConstraint::Unique {
106                    name: name.clone(),
107                    columns: vec![column.name.clone()],
108                    characteristics: *characteristics,
109                    index_name: None,
110                    index_type_display: ast::KeyOrIndexDisplay::None,
111                    index_type: None,
112                    index_options: vec![],
113                    nulls_distinct: NullsDistinctOption::None,
114                }),
115                ast::ColumnOption::Unique {
116                    is_primary: true,
117                    characteristics,
118                } => constraints.push(TableConstraint::PrimaryKey {
119                    name: name.clone(),
120                    columns: vec![column.name.clone()],
121                    characteristics: *characteristics,
122                    index_name: None,
123                    index_type: None,
124                    index_options: vec![],
125                }),
126                ast::ColumnOption::ForeignKey {
127                    foreign_table,
128                    referred_columns,
129                    on_delete,
130                    on_update,
131                    characteristics,
132                } => constraints.push(TableConstraint::ForeignKey {
133                    name: name.clone(),
134                    columns: vec![],
135                    foreign_table: foreign_table.clone(),
136                    referred_columns: referred_columns.to_vec(),
137                    on_delete: *on_delete,
138                    on_update: *on_update,
139                    characteristics: *characteristics,
140                }),
141                ast::ColumnOption::Check(expr) => {
142                    constraints.push(TableConstraint::Check {
143                        name: name.clone(),
144                        expr: Box::new(expr.clone()),
145                    })
146                }
147                // Other options are not constraint related.
148                ast::ColumnOption::Default(_)
149                | ast::ColumnOption::Null
150                | ast::ColumnOption::NotNull
151                | ast::ColumnOption::DialectSpecific(_)
152                | ast::ColumnOption::CharacterSet(_)
153                | ast::ColumnOption::Generated { .. }
154                | ast::ColumnOption::Comment(_)
155                | ast::ColumnOption::Options(_)
156                | ast::ColumnOption::OnUpdate(_)
157                | ast::ColumnOption::Materialized(_)
158                | ast::ColumnOption::Ephemeral(_)
159                | ast::ColumnOption::Identity(_)
160                | ast::ColumnOption::OnConflict(_)
161                | ast::ColumnOption::Policy(_)
162                | ast::ColumnOption::Tags(_)
163                | ast::ColumnOption::Alias(_) => {}
164            }
165        }
166    }
167    constraints
168}
169
170impl<S: ContextProvider> SqlToRel<'_, S> {
171    /// Generate a logical plan from an DataFusion SQL statement
172    pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
173        match statement {
174            DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
175            DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
176            DFStatement::CopyTo(s) => self.copy_to_plan(s),
177            DFStatement::Explain(ExplainStatement {
178                verbose,
179                analyze,
180                statement,
181            }) => self.explain_to_plan(verbose, analyze, *statement),
182        }
183    }
184
185    /// Generate a logical plan from an SQL statement
186    pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
187        self.sql_statement_to_plan_with_context_impl(
188            statement,
189            &mut PlannerContext::new(),
190        )
191    }
192
193    /// Generate a logical plan from an SQL statement
194    pub fn sql_statement_to_plan_with_context(
195        &self,
196        statement: Statement,
197        planner_context: &mut PlannerContext,
198    ) -> Result<LogicalPlan> {
199        self.sql_statement_to_plan_with_context_impl(statement, planner_context)
200    }
201
202    fn sql_statement_to_plan_with_context_impl(
203        &self,
204        statement: Statement,
205        planner_context: &mut PlannerContext,
206    ) -> Result<LogicalPlan> {
207        match statement {
208            Statement::ExplainTable {
209                describe_alias: DescribeAlias::Describe, // only parse 'DESCRIBE table_name' and not 'EXPLAIN table_name'
210                table_name,
211                ..
212            } => self.describe_table_to_plan(table_name),
213            Statement::Explain {
214                verbose,
215                statement,
216                analyze,
217                format: _,
218                describe_alias: _,
219                ..
220            } => {
221                self.explain_to_plan(verbose, analyze, DFStatement::Statement(statement))
222            }
223            Statement::Query(query) => self.query_to_plan(*query, planner_context),
224            Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
225            Statement::SetVariable {
226                local,
227                hivevar,
228                variables,
229                value,
230            } => self.set_variable_to_plan(local, hivevar, &variables, value),
231
232            Statement::CreateTable(CreateTable {
233                temporary,
234                external,
235                global,
236                transient,
237                volatile,
238                hive_distribution,
239                hive_formats,
240                file_format,
241                location,
242                query,
243                name,
244                columns,
245                constraints,
246                table_properties,
247                with_options,
248                if_not_exists,
249                or_replace,
250                without_rowid,
251                like,
252                clone,
253                engine,
254                comment,
255                auto_increment_offset,
256                default_charset,
257                collation,
258                on_commit,
259                on_cluster,
260                primary_key,
261                order_by,
262                partition_by,
263                cluster_by,
264                clustered_by,
265                options,
266                strict,
267                copy_grants,
268                enable_schema_evolution,
269                change_tracking,
270                data_retention_time_in_days,
271                max_data_extension_time_in_days,
272                default_ddl_collation,
273                with_aggregation_policy,
274                with_row_access_policy,
275                with_tags,
276            }) if table_properties.is_empty() && with_options.is_empty() => {
277                if temporary {
278                    return not_impl_err!("Temporary tables not supported")?;
279                }
280                if external {
281                    return not_impl_err!("External tables not supported")?;
282                }
283                if global.is_some() {
284                    return not_impl_err!("Global tables not supported")?;
285                }
286                if transient {
287                    return not_impl_err!("Transient tables not supported")?;
288                }
289                if volatile {
290                    return not_impl_err!("Volatile tables not supported")?;
291                }
292                if hive_distribution != ast::HiveDistributionStyle::NONE {
293                    return not_impl_err!(
294                        "Hive distribution not supported: {hive_distribution:?}"
295                    )?;
296                }
297                if !matches!(
298                    hive_formats,
299                    Some(ast::HiveFormat {
300                        row_format: None,
301                        serde_properties: None,
302                        storage: None,
303                        location: None,
304                    })
305                ) {
306                    return not_impl_err!(
307                        "Hive formats not supported: {hive_formats:?}"
308                    )?;
309                }
310                if file_format.is_some() {
311                    return not_impl_err!("File format not supported")?;
312                }
313                if location.is_some() {
314                    return not_impl_err!("Location not supported")?;
315                }
316                if without_rowid {
317                    return not_impl_err!("Without rowid not supported")?;
318                }
319                if like.is_some() {
320                    return not_impl_err!("Like not supported")?;
321                }
322                if clone.is_some() {
323                    return not_impl_err!("Clone not supported")?;
324                }
325                if engine.is_some() {
326                    return not_impl_err!("Engine not supported")?;
327                }
328                if comment.is_some() {
329                    return not_impl_err!("Comment not supported")?;
330                }
331                if auto_increment_offset.is_some() {
332                    return not_impl_err!("Auto increment offset not supported")?;
333                }
334                if default_charset.is_some() {
335                    return not_impl_err!("Default charset not supported")?;
336                }
337                if collation.is_some() {
338                    return not_impl_err!("Collation not supported")?;
339                }
340                if on_commit.is_some() {
341                    return not_impl_err!("On commit not supported")?;
342                }
343                if on_cluster.is_some() {
344                    return not_impl_err!("On cluster not supported")?;
345                }
346                if primary_key.is_some() {
347                    return not_impl_err!("Primary key not supported")?;
348                }
349                if order_by.is_some() {
350                    return not_impl_err!("Order by not supported")?;
351                }
352                if partition_by.is_some() {
353                    return not_impl_err!("Partition by not supported")?;
354                }
355                if cluster_by.is_some() {
356                    return not_impl_err!("Cluster by not supported")?;
357                }
358                if clustered_by.is_some() {
359                    return not_impl_err!("Clustered by not supported")?;
360                }
361                if options.is_some() {
362                    return not_impl_err!("Options not supported")?;
363                }
364                if strict {
365                    return not_impl_err!("Strict not supported")?;
366                }
367                if copy_grants {
368                    return not_impl_err!("Copy grants not supported")?;
369                }
370                if enable_schema_evolution.is_some() {
371                    return not_impl_err!("Enable schema evolution not supported")?;
372                }
373                if change_tracking.is_some() {
374                    return not_impl_err!("Change tracking not supported")?;
375                }
376                if data_retention_time_in_days.is_some() {
377                    return not_impl_err!("Data retention time in days not supported")?;
378                }
379                if max_data_extension_time_in_days.is_some() {
380                    return not_impl_err!(
381                        "Max data extension time in days not supported"
382                    )?;
383                }
384                if default_ddl_collation.is_some() {
385                    return not_impl_err!("Default DDL collation not supported")?;
386                }
387                if with_aggregation_policy.is_some() {
388                    return not_impl_err!("With aggregation policy not supported")?;
389                }
390                if with_row_access_policy.is_some() {
391                    return not_impl_err!("With row access policy not supported")?;
392                }
393                if with_tags.is_some() {
394                    return not_impl_err!("With tags not supported")?;
395                }
396
397                // Merge inline constraints and existing constraints
398                let mut all_constraints = constraints;
399                let inline_constraints = calc_inline_constraints_from_columns(&columns);
400                all_constraints.extend(inline_constraints);
401                // Build column default values
402                let column_defaults =
403                    self.build_column_defaults(&columns, planner_context)?;
404
405                let has_columns = !columns.is_empty();
406                let schema = self.build_schema(columns)?.to_dfschema_ref()?;
407                if has_columns {
408                    planner_context.set_table_schema(Some(Arc::clone(&schema)));
409                }
410
411                match query {
412                    Some(query) => {
413                        let plan = self.query_to_plan(*query, planner_context)?;
414                        let input_schema = plan.schema();
415
416                        let plan = if has_columns {
417                            if schema.fields().len() != input_schema.fields().len() {
418                                return plan_err!(
419                            "Mismatch: {} columns specified, but result has {} columns",
420                            schema.fields().len(),
421                            input_schema.fields().len()
422                        );
423                            }
424                            let input_fields = input_schema.fields();
425                            let project_exprs = schema
426                                .fields()
427                                .iter()
428                                .zip(input_fields)
429                                .map(|(field, input_field)| {
430                                    cast(
431                                        col(input_field.name()),
432                                        field.data_type().clone(),
433                                    )
434                                    .alias(field.name())
435                                })
436                                .collect::<Vec<_>>();
437                            LogicalPlanBuilder::from(plan.clone())
438                                .project(project_exprs)?
439                                .build()?
440                        } else {
441                            plan
442                        };
443
444                        let constraints = self.new_constraint_from_table_constraints(
445                            &all_constraints,
446                            plan.schema(),
447                        )?;
448
449                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
450                            CreateMemoryTable {
451                                name: self.object_name_to_table_reference(name)?,
452                                constraints,
453                                input: Arc::new(plan),
454                                if_not_exists,
455                                or_replace,
456                                column_defaults,
457                                temporary,
458                            },
459                        )))
460                    }
461
462                    None => {
463                        let plan = EmptyRelation {
464                            produce_one_row: false,
465                            schema,
466                        };
467                        let plan = LogicalPlan::EmptyRelation(plan);
468                        let constraints = self.new_constraint_from_table_constraints(
469                            &all_constraints,
470                            plan.schema(),
471                        )?;
472                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
473                            CreateMemoryTable {
474                                name: self.object_name_to_table_reference(name)?,
475                                constraints,
476                                input: Arc::new(plan),
477                                if_not_exists,
478                                or_replace,
479                                column_defaults,
480                                temporary,
481                            },
482                        )))
483                    }
484                }
485            }
486
487            Statement::CreateView {
488                or_replace,
489                materialized,
490                name,
491                columns,
492                query,
493                options: CreateTableOptions::None,
494                cluster_by,
495                comment,
496                with_no_schema_binding,
497                if_not_exists,
498                temporary,
499                to,
500                params,
501            } => {
502                if materialized {
503                    return not_impl_err!("Materialized views not supported")?;
504                }
505                if !cluster_by.is_empty() {
506                    return not_impl_err!("Cluster by not supported")?;
507                }
508                if comment.is_some() {
509                    return not_impl_err!("Comment not supported")?;
510                }
511                if with_no_schema_binding {
512                    return not_impl_err!("With no schema binding not supported")?;
513                }
514                if if_not_exists {
515                    return not_impl_err!("If not exists not supported")?;
516                }
517                if to.is_some() {
518                    return not_impl_err!("To not supported")?;
519                }
520
521                // put the statement back together temporarily to get the SQL
522                // string representation
523                let stmt = Statement::CreateView {
524                    or_replace,
525                    materialized,
526                    name,
527                    columns,
528                    query,
529                    options: CreateTableOptions::None,
530                    cluster_by,
531                    comment,
532                    with_no_schema_binding,
533                    if_not_exists,
534                    temporary,
535                    to,
536                    params,
537                };
538                let sql = stmt.to_string();
539                let Statement::CreateView {
540                    name,
541                    columns,
542                    query,
543                    or_replace,
544                    temporary,
545                    ..
546                } = stmt
547                else {
548                    return internal_err!("Unreachable code in create view");
549                };
550
551                let columns = columns
552                    .into_iter()
553                    .map(|view_column_def| {
554                        if let Some(options) = view_column_def.options {
555                            plan_err!(
556                                "Options not supported for view columns: {options:?}"
557                            )
558                        } else {
559                            Ok(view_column_def.name)
560                        }
561                    })
562                    .collect::<Result<Vec<_>>>()?;
563
564                let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
565                plan = self.apply_expr_alias(plan, columns)?;
566
567                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
568                    name: self.object_name_to_table_reference(name)?,
569                    input: Arc::new(plan),
570                    or_replace,
571                    definition: Some(sql),
572                    temporary,
573                })))
574            }
575            Statement::ShowCreate { obj_type, obj_name } => match obj_type {
576                ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
577                _ => {
578                    not_impl_err!("Only `SHOW CREATE TABLE  ...` statement is supported")
579                }
580            },
581            Statement::CreateSchema {
582                schema_name,
583                if_not_exists,
584            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
585                CreateCatalogSchema {
586                    schema_name: get_schema_name(&schema_name),
587                    if_not_exists,
588                    schema: Arc::new(DFSchema::empty()),
589                },
590            ))),
591            Statement::CreateDatabase {
592                db_name,
593                if_not_exists,
594                ..
595            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
596                CreateCatalog {
597                    catalog_name: object_name_to_string(&db_name),
598                    if_not_exists,
599                    schema: Arc::new(DFSchema::empty()),
600                },
601            ))),
602            Statement::Drop {
603                object_type,
604                if_exists,
605                mut names,
606                cascade,
607                restrict: _,
608                purge: _,
609                temporary: _,
610            } => {
611                // We don't support cascade and purge for now.
612                // nor do we support multiple object names
613                let name = match names.len() {
614                    0 => Err(ParserError("Missing table name.".to_string()).into()),
615                    1 => self.object_name_to_table_reference(names.pop().unwrap()),
616                    _ => {
617                        Err(ParserError("Multiple objects not supported".to_string())
618                            .into())
619                    }
620                }?;
621
622                match object_type {
623                    ObjectType::Table => {
624                        Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
625                            name,
626                            if_exists,
627                            schema: DFSchemaRef::new(DFSchema::empty()),
628                        })))
629                    }
630                    ObjectType::View => {
631                        Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
632                            name,
633                            if_exists,
634                            schema: DFSchemaRef::new(DFSchema::empty()),
635                        })))
636                    }
637                    ObjectType::Schema => {
638                        let name = match name {
639                            TableReference::Bare { table } => Ok(SchemaReference::Bare { schema: table }),
640                            TableReference::Partial { schema, table } => Ok(SchemaReference::Full { schema: table, catalog: schema }),
641                            TableReference::Full { catalog: _, schema: _, table: _ } => {
642                                Err(ParserError("Invalid schema specifier (has 3 parts)".to_string()))
643                            }
644                        }?;
645                        Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(DropCatalogSchema {
646                            name,
647                            if_exists,
648                            cascade,
649                            schema: DFSchemaRef::new(DFSchema::empty()),
650                        })))
651                    }
652                    _ => not_impl_err!(
653                        "Only `DROP TABLE/VIEW/SCHEMA  ...` statement is supported currently"
654                    ),
655                }
656            }
657            Statement::Prepare {
658                name,
659                data_types,
660                statement,
661            } => {
662                // Convert parser data types to DataFusion data types
663                let data_types: Vec<DataType> = data_types
664                    .into_iter()
665                    .map(|t| self.convert_data_type(&t))
666                    .collect::<Result<_>>()?;
667
668                // Create planner context with parameters
669                let mut planner_context = PlannerContext::new()
670                    .with_prepare_param_data_types(data_types.clone());
671
672                // Build logical plan for inner statement of the prepare statement
673                let plan = self.sql_statement_to_plan_with_context_impl(
674                    *statement,
675                    &mut planner_context,
676                )?;
677                Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
678                    name: ident_to_string(&name),
679                    data_types,
680                    input: Arc::new(plan),
681                })))
682            }
683            Statement::Execute {
684                name,
685                parameters,
686                using,
687                // has_parentheses specifies the syntax, but the plan is the
688                // same no matter the syntax used, so ignore it
689                has_parentheses: _,
690            } => {
691                // `USING` is a MySQL-specific syntax and currently not supported.
692                if !using.is_empty() {
693                    return not_impl_err!(
694                        "Execute statement with USING is not supported"
695                    );
696                }
697
698                let empty_schema = DFSchema::empty();
699                let parameters = parameters
700                    .into_iter()
701                    .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
702                    .collect::<Result<Vec<Expr>>>()?;
703
704                Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
705                    name: object_name_to_string(&name),
706                    parameters,
707                })))
708            }
709            Statement::Deallocate {
710                name,
711                // Similar to PostgreSQL, the PREPARE keyword is ignored
712                prepare: _,
713            } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
714                Deallocate {
715                    name: ident_to_string(&name),
716                },
717            ))),
718
719            Statement::ShowTables {
720                extended,
721                full,
722                terse,
723                history,
724                external,
725                show_options,
726            } => {
727                // We only support the basic "SHOW TABLES"
728                // https://github.com/apache/datafusion/issues/3188
729                if extended {
730                    return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
731                }
732                if full {
733                    return not_impl_err!("SHOW FULL TABLES not supported")?;
734                }
735                if terse {
736                    return not_impl_err!("SHOW TERSE TABLES not supported")?;
737                }
738                if history {
739                    return not_impl_err!("SHOW TABLES HISTORY not supported")?;
740                }
741                if external {
742                    return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
743                }
744                let ShowStatementOptions {
745                    show_in,
746                    starts_with,
747                    limit,
748                    limit_from,
749                    filter_position,
750                } = show_options;
751                if show_in.is_some() {
752                    return not_impl_err!("SHOW TABLES IN not supported")?;
753                }
754                if starts_with.is_some() {
755                    return not_impl_err!("SHOW TABLES LIKE not supported")?;
756                }
757                if limit.is_some() {
758                    return not_impl_err!("SHOW TABLES LIMIT not supported")?;
759                }
760                if limit_from.is_some() {
761                    return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
762                }
763                if filter_position.is_some() {
764                    return not_impl_err!("SHOW TABLES FILTER not supported")?;
765                }
766                self.show_tables_to_plan()
767            }
768
769            Statement::ShowColumns {
770                extended,
771                full,
772                show_options,
773            } => {
774                let ShowStatementOptions {
775                    show_in,
776                    starts_with,
777                    limit,
778                    limit_from,
779                    filter_position,
780                } = show_options;
781                if starts_with.is_some() {
782                    return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
783                }
784                if limit.is_some() {
785                    return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
786                }
787                if limit_from.is_some() {
788                    return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
789                }
790                if filter_position.is_some() {
791                    return not_impl_err!(
792                        "SHOW COLUMNS with WHERE or LIKE is not supported"
793                    )?;
794                }
795                let Some(ShowStatementIn {
796                    // specifies if the syntax was `SHOW COLUMNS IN` or `SHOW
797                    // COLUMNS FROM` which is not different in DataFusion
798                    clause: _,
799                    parent_type,
800                    parent_name,
801                }) = show_in
802                else {
803                    return plan_err!("SHOW COLUMNS requires a table name");
804                };
805
806                if let Some(parent_type) = parent_type {
807                    return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
808                }
809                let Some(table_name) = parent_name else {
810                    return plan_err!("SHOW COLUMNS requires a table name");
811                };
812
813                self.show_columns_to_plan(extended, full, table_name)
814            }
815
816            Statement::ShowFunctions { filter, .. } => {
817                self.show_functions_to_plan(filter)
818            }
819
820            Statement::Insert(Insert {
821                or,
822                into,
823                columns,
824                overwrite,
825                source,
826                partitioned,
827                after_columns,
828                table,
829                on,
830                returning,
831                ignore,
832                table_alias,
833                mut replace_into,
834                priority,
835                insert_alias,
836                assignments,
837                has_table_keyword,
838                settings,
839                format_clause,
840            }) => {
841                let table_name = match table {
842                    TableObject::TableName(table_name) => table_name,
843                    TableObject::TableFunction(_) => {
844                        return not_impl_err!("INSERT INTO Table functions not supported")
845                    }
846                };
847                if let Some(or) = or {
848                    match or {
849                        SqliteOnConflict::Replace => replace_into = true,
850                        _ => plan_err!("Inserts with {or} clause is not supported")?,
851                    }
852                }
853                if partitioned.is_some() {
854                    plan_err!("Partitioned inserts not yet supported")?;
855                }
856                if !after_columns.is_empty() {
857                    plan_err!("After-columns clause not supported")?;
858                }
859                if on.is_some() {
860                    plan_err!("Insert-on clause not supported")?;
861                }
862                if returning.is_some() {
863                    plan_err!("Insert-returning clause not supported")?;
864                }
865                if ignore {
866                    plan_err!("Insert-ignore clause not supported")?;
867                }
868                let Some(source) = source else {
869                    plan_err!("Inserts without a source not supported")?
870                };
871                if let Some(table_alias) = table_alias {
872                    plan_err!(
873                        "Inserts with a table alias not supported: {table_alias:?}"
874                    )?
875                };
876                if let Some(priority) = priority {
877                    plan_err!(
878                        "Inserts with a `PRIORITY` clause not supported: {priority:?}"
879                    )?
880                };
881                if insert_alias.is_some() {
882                    plan_err!("Inserts with an alias not supported")?;
883                }
884                if !assignments.is_empty() {
885                    plan_err!("Inserts with assignments not supported")?;
886                }
887                if settings.is_some() {
888                    plan_err!("Inserts with settings not supported")?;
889                }
890                if format_clause.is_some() {
891                    plan_err!("Inserts with format clause not supported")?;
892                }
893                // optional keywords don't change behavior
894                let _ = into;
895                let _ = has_table_keyword;
896                self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
897            }
898            Statement::Update {
899                table,
900                assignments,
901                from,
902                selection,
903                returning,
904                or,
905            } => {
906                let from =
907                    from.map(|update_table_from_kind| match update_table_from_kind {
908                        UpdateTableFromKind::BeforeSet(from) => from,
909                        UpdateTableFromKind::AfterSet(from) => from,
910                    });
911                if returning.is_some() {
912                    plan_err!("Update-returning clause not yet supported")?;
913                }
914                if or.is_some() {
915                    plan_err!("ON conflict not supported")?;
916                }
917                self.update_to_plan(table, assignments, from, selection)
918            }
919
920            Statement::Delete(Delete {
921                tables,
922                using,
923                selection,
924                returning,
925                from,
926                order_by,
927                limit,
928            }) => {
929                if !tables.is_empty() {
930                    plan_err!("DELETE <TABLE> not supported")?;
931                }
932
933                if using.is_some() {
934                    plan_err!("Using clause not supported")?;
935                }
936
937                if returning.is_some() {
938                    plan_err!("Delete-returning clause not yet supported")?;
939                }
940
941                if !order_by.is_empty() {
942                    plan_err!("Delete-order-by clause not yet supported")?;
943                }
944
945                if limit.is_some() {
946                    plan_err!("Delete-limit clause not yet supported")?;
947                }
948
949                let table_name = self.get_delete_target(from)?;
950                self.delete_to_plan(table_name, selection)
951            }
952
953            Statement::StartTransaction {
954                modes,
955                begin: false,
956                modifier,
957                transaction,
958            } => {
959                if let Some(modifier) = modifier {
960                    return not_impl_err!(
961                        "Transaction modifier not supported: {modifier}"
962                    );
963                }
964                self.validate_transaction_kind(transaction)?;
965                let isolation_level: ast::TransactionIsolationLevel = modes
966                    .iter()
967                    .filter_map(|m: &TransactionMode| match m {
968                        TransactionMode::AccessMode(_) => None,
969                        TransactionMode::IsolationLevel(level) => Some(level),
970                    })
971                    .last()
972                    .copied()
973                    .unwrap_or(ast::TransactionIsolationLevel::Serializable);
974                let access_mode: ast::TransactionAccessMode = modes
975                    .iter()
976                    .filter_map(|m: &TransactionMode| match m {
977                        TransactionMode::AccessMode(mode) => Some(mode),
978                        TransactionMode::IsolationLevel(_) => None,
979                    })
980                    .last()
981                    .copied()
982                    .unwrap_or(ast::TransactionAccessMode::ReadWrite);
983                let isolation_level = match isolation_level {
984                    ast::TransactionIsolationLevel::ReadUncommitted => {
985                        TransactionIsolationLevel::ReadUncommitted
986                    }
987                    ast::TransactionIsolationLevel::ReadCommitted => {
988                        TransactionIsolationLevel::ReadCommitted
989                    }
990                    ast::TransactionIsolationLevel::RepeatableRead => {
991                        TransactionIsolationLevel::RepeatableRead
992                    }
993                    ast::TransactionIsolationLevel::Serializable => {
994                        TransactionIsolationLevel::Serializable
995                    }
996                    ast::TransactionIsolationLevel::Snapshot => {
997                        TransactionIsolationLevel::Snapshot
998                    }
999                };
1000                let access_mode = match access_mode {
1001                    ast::TransactionAccessMode::ReadOnly => {
1002                        TransactionAccessMode::ReadOnly
1003                    }
1004                    ast::TransactionAccessMode::ReadWrite => {
1005                        TransactionAccessMode::ReadWrite
1006                    }
1007                };
1008                let statement = PlanStatement::TransactionStart(TransactionStart {
1009                    access_mode,
1010                    isolation_level,
1011                });
1012                Ok(LogicalPlan::Statement(statement))
1013            }
1014            Statement::Commit {
1015                chain,
1016                end,
1017                modifier,
1018            } => {
1019                if end {
1020                    return not_impl_err!("COMMIT AND END not supported");
1021                };
1022                if let Some(modifier) = modifier {
1023                    return not_impl_err!("COMMIT {modifier} not supported");
1024                };
1025                let statement = PlanStatement::TransactionEnd(TransactionEnd {
1026                    conclusion: TransactionConclusion::Commit,
1027                    chain,
1028                });
1029                Ok(LogicalPlan::Statement(statement))
1030            }
1031            Statement::Rollback { chain, savepoint } => {
1032                if savepoint.is_some() {
1033                    plan_err!("Savepoints not supported")?;
1034                }
1035                let statement = PlanStatement::TransactionEnd(TransactionEnd {
1036                    conclusion: TransactionConclusion::Rollback,
1037                    chain,
1038                });
1039                Ok(LogicalPlan::Statement(statement))
1040            }
1041            Statement::CreateFunction(ast::CreateFunction {
1042                or_replace,
1043                temporary,
1044                name,
1045                args,
1046                return_type,
1047                function_body,
1048                behavior,
1049                language,
1050                ..
1051            }) => {
1052                let return_type = match return_type {
1053                    Some(t) => Some(self.convert_data_type(&t)?),
1054                    None => None,
1055                };
1056                let mut planner_context = PlannerContext::new();
1057                let empty_schema = &DFSchema::empty();
1058
1059                let args = match args {
1060                    Some(function_args) => {
1061                        let function_args = function_args
1062                            .into_iter()
1063                            .map(|arg| {
1064                                let data_type = self.convert_data_type(&arg.data_type)?;
1065
1066                                let default_expr = match arg.default_expr {
1067                                    Some(expr) => Some(self.sql_to_expr(
1068                                        expr,
1069                                        empty_schema,
1070                                        &mut planner_context,
1071                                    )?),
1072                                    None => None,
1073                                };
1074                                Ok(OperateFunctionArg {
1075                                    name: arg.name,
1076                                    default_expr,
1077                                    data_type,
1078                                })
1079                            })
1080                            .collect::<Result<Vec<OperateFunctionArg>>>();
1081                        Some(function_args?)
1082                    }
1083                    None => None,
1084                };
1085                // At the moment functions can't be qualified `schema.name`
1086                let name = match &name.0[..] {
1087                    [] => exec_err!("Function should have name")?,
1088                    [n] => n.value.clone(),
1089                    [..] => not_impl_err!("Qualified functions are not supported")?,
1090                };
1091                //
1092                // Convert resulting expression to data fusion expression
1093                //
1094                let arg_types = args.as_ref().map(|arg| {
1095                    arg.iter().map(|t| t.data_type.clone()).collect::<Vec<_>>()
1096                });
1097                let mut planner_context = PlannerContext::new()
1098                    .with_prepare_param_data_types(arg_types.unwrap_or_default());
1099
1100                let function_body = match function_body {
1101                    Some(r) => Some(self.sql_to_expr(
1102                        match r {
1103                            ast::CreateFunctionBody::AsBeforeOptions(expr) => expr,
1104                            ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
1105                            ast::CreateFunctionBody::Return(expr) => expr,
1106                        },
1107                        &DFSchema::empty(),
1108                        &mut planner_context,
1109                    )?),
1110                    None => None,
1111                };
1112
1113                let params = CreateFunctionBody {
1114                    language,
1115                    behavior: behavior.map(|b| match b {
1116                        ast::FunctionBehavior::Immutable => Volatility::Immutable,
1117                        ast::FunctionBehavior::Stable => Volatility::Stable,
1118                        ast::FunctionBehavior::Volatile => Volatility::Volatile,
1119                    }),
1120                    function_body,
1121                };
1122
1123                let statement = DdlStatement::CreateFunction(CreateFunction {
1124                    or_replace,
1125                    temporary,
1126                    name,
1127                    return_type,
1128                    args,
1129                    params,
1130                    schema: DFSchemaRef::new(DFSchema::empty()),
1131                });
1132
1133                Ok(LogicalPlan::Ddl(statement))
1134            }
1135            Statement::DropFunction {
1136                if_exists,
1137                func_desc,
1138                ..
1139            } => {
1140                // According to postgresql documentation it can be only one function
1141                // specified in drop statement
1142                if let Some(desc) = func_desc.first() {
1143                    // At the moment functions can't be qualified `schema.name`
1144                    let name = match &desc.name.0[..] {
1145                        [] => exec_err!("Function should have name")?,
1146                        [n] => n.value.clone(),
1147                        [..] => not_impl_err!("Qualified functions are not supported")?,
1148                    };
1149                    let statement = DdlStatement::DropFunction(DropFunction {
1150                        if_exists,
1151                        name,
1152                        schema: DFSchemaRef::new(DFSchema::empty()),
1153                    });
1154                    Ok(LogicalPlan::Ddl(statement))
1155                } else {
1156                    exec_err!("Function name not provided")
1157                }
1158            }
1159            Statement::CreateIndex(CreateIndex {
1160                name,
1161                table_name,
1162                using,
1163                columns,
1164                unique,
1165                if_not_exists,
1166                ..
1167            }) => {
1168                let name: Option<String> = name.as_ref().map(object_name_to_string);
1169                let table = self.object_name_to_table_reference(table_name)?;
1170                let table_schema = self
1171                    .context_provider
1172                    .get_table_source(table.clone())?
1173                    .schema()
1174                    .to_dfschema_ref()?;
1175                let using: Option<String> = using.as_ref().map(ident_to_string);
1176                let columns = self.order_by_to_sort_expr(
1177                    columns,
1178                    &table_schema,
1179                    planner_context,
1180                    false,
1181                    None,
1182                )?;
1183                Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
1184                    PlanCreateIndex {
1185                        name,
1186                        table,
1187                        using,
1188                        columns,
1189                        unique,
1190                        if_not_exists,
1191                        schema: DFSchemaRef::new(DFSchema::empty()),
1192                    },
1193                )))
1194            }
1195            stmt => {
1196                not_impl_err!("Unsupported SQL statement: {stmt}")
1197            }
1198        }
1199    }
1200
1201    fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
1202        let mut from = match from {
1203            FromTable::WithFromKeyword(v) => v,
1204            FromTable::WithoutKeyword(v) => v,
1205        };
1206
1207        if from.len() != 1 {
1208            return not_impl_err!(
1209                "DELETE FROM only supports single table, got {}: {from:?}",
1210                from.len()
1211            );
1212        }
1213        let table_factor = from.pop().unwrap();
1214        if !table_factor.joins.is_empty() {
1215            return not_impl_err!("DELETE FROM only supports single table, got: joins");
1216        }
1217        let TableFactor::Table { name, .. } = table_factor.relation else {
1218            return not_impl_err!(
1219                "DELETE FROM only supports single table, got: {table_factor:?}"
1220            );
1221        };
1222
1223        Ok(name)
1224    }
1225
1226    /// Generate a logical plan from a "SHOW TABLES" query
1227    fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
1228        if self.has_table("information_schema", "tables") {
1229            let query = "SELECT * FROM information_schema.tables;";
1230            let mut rewrite = DFParser::parse_sql(query)?;
1231            assert_eq!(rewrite.len(), 1);
1232            self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
1233        } else {
1234            plan_err!("SHOW TABLES is not supported unless information_schema is enabled")
1235        }
1236    }
1237
1238    fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
1239        let table_ref = self.object_name_to_table_reference(table_name)?;
1240
1241        let table_source = self.context_provider.get_table_source(table_ref)?;
1242
1243        let schema = table_source.schema();
1244
1245        let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1246
1247        Ok(LogicalPlan::DescribeTable(DescribeTable {
1248            schema,
1249            output_schema: Arc::new(output_schema),
1250        }))
1251    }
1252
1253    fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
1254        // Determine if source is table or query and handle accordingly
1255        let copy_source = statement.source;
1256        let (input, input_schema, table_ref) = match copy_source {
1257            CopyToSource::Relation(object_name) => {
1258                let table_name = object_name_to_string(&object_name);
1259                let table_ref = self.object_name_to_table_reference(object_name)?;
1260                let table_source =
1261                    self.context_provider.get_table_source(table_ref.clone())?;
1262                let plan =
1263                    LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?;
1264                let input_schema = Arc::clone(plan.schema());
1265                (plan, input_schema, Some(table_ref))
1266            }
1267            CopyToSource::Query(query) => {
1268                let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
1269                let input_schema = Arc::clone(plan.schema());
1270                (plan, input_schema, None)
1271            }
1272        };
1273
1274        let options_map = self.parse_options_map(statement.options, true)?;
1275
1276        let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
1277            if let Ok(ext_file_type) = self.context_provider.get_file_type(stored_as) {
1278                Some(ext_file_type)
1279            } else {
1280                None
1281            }
1282        } else {
1283            None
1284        };
1285
1286        let file_type = match maybe_file_type {
1287            Some(ft) => ft,
1288            None => {
1289                let e = || {
1290                    DataFusionError::Configuration(
1291                        "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
1292                            .to_string(),
1293                    )
1294                };
1295                // Try to infer file format from file extension
1296                let extension: &str = &Path::new(&statement.target)
1297                    .extension()
1298                    .ok_or_else(e)?
1299                    .to_str()
1300                    .ok_or_else(e)?
1301                    .to_lowercase();
1302
1303                self.context_provider.get_file_type(extension)?
1304            }
1305        };
1306
1307        let partition_by = statement
1308            .partitioned_by
1309            .iter()
1310            .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
1311            .collect::<Result<Vec<_>>>()?
1312            .into_iter()
1313            .map(|f| f.name().to_owned())
1314            .collect();
1315
1316        Ok(LogicalPlan::Copy(CopyTo {
1317            input: Arc::new(input),
1318            output_url: statement.target,
1319            file_type,
1320            partition_by,
1321            options: options_map,
1322        }))
1323    }
1324
1325    fn build_order_by(
1326        &self,
1327        order_exprs: Vec<LexOrdering>,
1328        schema: &DFSchemaRef,
1329        planner_context: &mut PlannerContext,
1330    ) -> Result<Vec<Vec<SortExpr>>> {
1331        if !order_exprs.is_empty() && schema.fields().is_empty() {
1332            let results = order_exprs
1333                .iter()
1334                .map(|lex_order| {
1335                    let result = lex_order
1336                        .iter()
1337                        .map(|order_by_expr| {
1338                            let ordered_expr = &order_by_expr.expr;
1339                            let ordered_expr = ordered_expr.to_owned();
1340                            let ordered_expr = self
1341                                .sql_expr_to_logical_expr(
1342                                    ordered_expr,
1343                                    schema,
1344                                    planner_context,
1345                                )
1346                                .unwrap();
1347                            let asc = order_by_expr.asc.unwrap_or(true);
1348                            let nulls_first = order_by_expr.nulls_first.unwrap_or(!asc);
1349
1350                            SortExpr::new(ordered_expr, asc, nulls_first)
1351                        })
1352                        .collect::<Vec<SortExpr>>();
1353                    result
1354                })
1355                .collect::<Vec<Vec<SortExpr>>>();
1356
1357            return Ok(results);
1358        }
1359
1360        let mut all_results = vec![];
1361        for expr in order_exprs {
1362            // Convert each OrderByExpr to a SortExpr:
1363            let expr_vec =
1364                self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?;
1365            // Verify that columns of all SortExprs exist in the schema:
1366            for sort in expr_vec.iter() {
1367                for column in sort.expr.column_refs().iter() {
1368                    if !schema.has_column(column) {
1369                        // Return an error if any column is not in the schema:
1370                        return plan_err!("Column {column} is not in schema");
1371                    }
1372                }
1373            }
1374            // If all SortExprs are valid, return them as an expression vector
1375            all_results.push(expr_vec)
1376        }
1377        Ok(all_results)
1378    }
1379
1380    /// Generate a logical plan from a CREATE EXTERNAL TABLE statement
1381    fn external_table_to_plan(
1382        &self,
1383        statement: CreateExternalTable,
1384    ) -> Result<LogicalPlan> {
1385        let definition = Some(statement.to_string());
1386        let CreateExternalTable {
1387            name,
1388            columns,
1389            file_type,
1390            location,
1391            table_partition_cols,
1392            if_not_exists,
1393            temporary,
1394            order_exprs,
1395            unbounded,
1396            options,
1397            constraints,
1398        } = statement;
1399
1400        // Merge inline constraints and existing constraints
1401        let mut all_constraints = constraints;
1402        let inline_constraints = calc_inline_constraints_from_columns(&columns);
1403        all_constraints.extend(inline_constraints);
1404
1405        let options_map = self.parse_options_map(options, false)?;
1406
1407        let compression = options_map
1408            .get("format.compression")
1409            .map(|c| CompressionTypeVariant::from_str(c))
1410            .transpose()?;
1411        if (file_type == "PARQUET" || file_type == "AVRO" || file_type == "ARROW")
1412            && compression
1413                .map(|c| c != CompressionTypeVariant::UNCOMPRESSED)
1414                .unwrap_or(false)
1415        {
1416            plan_err!(
1417                "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
1418            )?;
1419        }
1420
1421        let mut planner_context = PlannerContext::new();
1422
1423        let column_defaults = self
1424            .build_column_defaults(&columns, &mut planner_context)?
1425            .into_iter()
1426            .collect();
1427
1428        let schema = self.build_schema(columns)?;
1429        let df_schema = schema.to_dfschema_ref()?;
1430        df_schema.check_names()?;
1431
1432        let ordered_exprs =
1433            self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;
1434
1435        let name = self.object_name_to_table_reference(name)?;
1436        let constraints =
1437            self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
1438        Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1439            PlanCreateExternalTable {
1440                schema: df_schema,
1441                name,
1442                location,
1443                file_type,
1444                table_partition_cols,
1445                if_not_exists,
1446                temporary,
1447                definition,
1448                order_exprs: ordered_exprs,
1449                unbounded,
1450                options: options_map,
1451                constraints,
1452                column_defaults,
1453            },
1454        )))
1455    }
1456
1457    /// Get the indices of the constraint columns in the schema.
1458    /// If any column is not found, return an error.
1459    fn get_constraint_column_indices(
1460        &self,
1461        df_schema: &DFSchemaRef,
1462        columns: &[Ident],
1463        constraint_name: &str,
1464    ) -> Result<Vec<usize>> {
1465        let field_names = df_schema.field_names();
1466        columns
1467            .iter()
1468            .map(|ident| {
1469                let column = self.ident_normalizer.normalize(ident.clone());
1470                field_names
1471                    .iter()
1472                    .position(|item| *item == column)
1473                    .ok_or_else(|| {
1474                        plan_datafusion_err!(
1475                            "Column for {constraint_name} not found in schema: {column}"
1476                        )
1477                    })
1478            })
1479            .collect::<Result<Vec<_>>>()
1480    }
1481
1482    /// Convert each [TableConstraint] to corresponding [Constraint]
1483    fn new_constraint_from_table_constraints(
1484        &self,
1485        constraints: &[TableConstraint],
1486        df_schema: &DFSchemaRef,
1487    ) -> Result<Constraints> {
1488        let constraints = constraints
1489            .iter()
1490            .map(|c: &TableConstraint| match c {
1491                TableConstraint::Unique { name, columns, .. } => {
1492                    let constraint_name = match name {
1493                        Some(name) => &format!("unique constraint with name '{name}'"),
1494                        None => "unique constraint",
1495                    };
1496                    // Get unique constraint indices in the schema
1497                    let indices = self.get_constraint_column_indices(
1498                        df_schema,
1499                        columns,
1500                        constraint_name,
1501                    )?;
1502                    Ok(Constraint::Unique(indices))
1503                }
1504                TableConstraint::PrimaryKey { columns, .. } => {
1505                    // Get primary key indices in the schema
1506                    let indices = self.get_constraint_column_indices(
1507                        df_schema,
1508                        columns,
1509                        "primary key",
1510                    )?;
1511                    Ok(Constraint::PrimaryKey(indices))
1512                }
1513                TableConstraint::ForeignKey { .. } => {
1514                    _plan_err!("Foreign key constraints are not currently supported")
1515                }
1516                TableConstraint::Check { .. } => {
1517                    _plan_err!("Check constraints are not currently supported")
1518                }
1519                TableConstraint::Index { .. } => {
1520                    _plan_err!("Indexes are not currently supported")
1521                }
1522                TableConstraint::FulltextOrSpatial { .. } => {
1523                    _plan_err!("Indexes are not currently supported")
1524                }
1525            })
1526            .collect::<Result<Vec<_>>>()?;
1527        Ok(Constraints::new_unverified(constraints))
1528    }
1529
1530    fn parse_options_map(
1531        &self,
1532        options: Vec<(String, Value)>,
1533        allow_duplicates: bool,
1534    ) -> Result<HashMap<String, String>> {
1535        let mut options_map = HashMap::new();
1536        for (key, value) in options {
1537            if !allow_duplicates && options_map.contains_key(&key) {
1538                return plan_err!("Option {key} is specified multiple times");
1539            }
1540
1541            let Some(value_string) = crate::utils::value_to_string(&value) else {
1542                return plan_err!("Unsupported Value {}", value);
1543            };
1544
1545            if !(&key.contains('.')) {
1546                // If config does not belong to any namespace, assume it is
1547                // a format option and apply the format prefix for backwards
1548                // compatibility.
1549                let renamed_key = format!("format.{}", key);
1550                options_map.insert(renamed_key.to_lowercase(), value_string);
1551            } else {
1552                options_map.insert(key.to_lowercase(), value_string);
1553            }
1554        }
1555
1556        Ok(options_map)
1557    }
1558
1559    /// Generate a plan for EXPLAIN ... that will print out a plan
1560    ///
1561    /// Note this is the sqlparser explain statement, not the
1562    /// datafusion `EXPLAIN` statement.
1563    fn explain_to_plan(
1564        &self,
1565        verbose: bool,
1566        analyze: bool,
1567        statement: DFStatement,
1568    ) -> Result<LogicalPlan> {
1569        let plan = self.statement_to_plan(statement)?;
1570        if matches!(plan, LogicalPlan::Explain(_)) {
1571            return plan_err!("Nested EXPLAINs are not supported");
1572        }
1573        let plan = Arc::new(plan);
1574        let schema = LogicalPlan::explain_schema();
1575        let schema = schema.to_dfschema_ref()?;
1576
1577        if analyze {
1578            Ok(LogicalPlan::Analyze(Analyze {
1579                verbose,
1580                input: plan,
1581                schema,
1582            }))
1583        } else {
1584            let stringified_plans =
1585                vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
1586            Ok(LogicalPlan::Explain(Explain {
1587                verbose,
1588                plan,
1589                stringified_plans,
1590                schema,
1591                logical_optimization_succeeded: false,
1592            }))
1593        }
1594    }
1595
1596    fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
1597        if !self.has_table("information_schema", "df_settings") {
1598            return plan_err!(
1599                "SHOW [VARIABLE] is not supported unless information_schema is enabled"
1600            );
1601        }
1602
1603        let verbose = variable
1604            .last()
1605            .map(|s| ident_to_string(s) == "verbose")
1606            .unwrap_or(false);
1607        let mut variable_vec = variable.to_vec();
1608        let mut columns: String = "name, value".to_owned();
1609
1610        if verbose {
1611            columns = format!("{columns}, description");
1612            variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
1613        }
1614
1615        let variable = object_name_to_string(&ObjectName(variable_vec));
1616        let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
1617        let query = if variable == "all" {
1618            // Add an ORDER BY so the output comes out in a consistent order
1619            format!("{base_query} ORDER BY name")
1620        } else if variable == "timezone" || variable == "time.zone" {
1621            // we could introduce alias in OptionDefinition if this string matching thing grows
1622            format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
1623        } else {
1624            // These values are what are used to make the information_schema table, so we just
1625            // check here, before actually planning or executing the query, if it would produce no
1626            // results, and error preemptively if it would (for a better UX)
1627            let is_valid_variable = self
1628                .context_provider
1629                .options()
1630                .entries()
1631                .iter()
1632                .any(|opt| opt.key == variable);
1633
1634            if !is_valid_variable {
1635                return plan_err!(
1636                    "'{variable}' is not a variable which can be viewed with 'SHOW'"
1637                );
1638            }
1639
1640            format!("{base_query} WHERE name = '{variable}'")
1641        };
1642
1643        let mut rewrite = DFParser::parse_sql(&query)?;
1644        assert_eq!(rewrite.len(), 1);
1645
1646        self.statement_to_plan(rewrite.pop_front().unwrap())
1647    }
1648
1649    fn set_variable_to_plan(
1650        &self,
1651        local: bool,
1652        hivevar: bool,
1653        variables: &OneOrManyWithParens<ObjectName>,
1654        value: Vec<SQLExpr>,
1655    ) -> Result<LogicalPlan> {
1656        if local {
1657            return not_impl_err!("LOCAL is not supported");
1658        }
1659
1660        if hivevar {
1661            return not_impl_err!("HIVEVAR is not supported");
1662        }
1663
1664        let variable = match variables {
1665            OneOrManyWithParens::One(v) => object_name_to_string(v),
1666            OneOrManyWithParens::Many(vs) => {
1667                return not_impl_err!(
1668                    "SET only supports single variable assignment: {vs:?}"
1669                );
1670            }
1671        };
1672        let mut variable_lower = variable.to_lowercase();
1673
1674        if variable_lower == "timezone" || variable_lower == "time.zone" {
1675            // We could introduce alias in OptionDefinition if this string matching thing grows
1676            variable_lower = "datafusion.execution.time_zone".to_string();
1677        }
1678
1679        // Parse value string from Expr
1680        let value_string = match &value[0] {
1681            SQLExpr::Identifier(i) => ident_to_string(i),
1682            SQLExpr::Value(v) => match crate::utils::value_to_string(v) {
1683                None => {
1684                    return plan_err!("Unsupported Value {}", value[0]);
1685                }
1686                Some(v) => v,
1687            },
1688            // For capture signed number e.g. +8, -8
1689            SQLExpr::UnaryOp { op, expr } => match op {
1690                UnaryOperator::Plus => format!("+{expr}"),
1691                UnaryOperator::Minus => format!("-{expr}"),
1692                _ => {
1693                    return plan_err!("Unsupported Value {}", value[0]);
1694                }
1695            },
1696            _ => {
1697                return plan_err!("Unsupported Value {}", value[0]);
1698            }
1699        };
1700
1701        let statement = PlanStatement::SetVariable(SetVariable {
1702            variable: variable_lower,
1703            value: value_string,
1704        });
1705
1706        Ok(LogicalPlan::Statement(statement))
1707    }
1708
1709    fn delete_to_plan(
1710        &self,
1711        table_name: ObjectName,
1712        predicate_expr: Option<SQLExpr>,
1713    ) -> Result<LogicalPlan> {
1714        // Do a table lookup to verify the table exists
1715        let table_ref = self.object_name_to_table_reference(table_name.clone())?;
1716        let table_source = self.context_provider.get_table_source(table_ref.clone())?;
1717        let schema = table_source.schema().to_dfschema_ref()?;
1718        let scan =
1719            LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
1720                .build()?;
1721        let mut planner_context = PlannerContext::new();
1722
1723        let source = match predicate_expr {
1724            None => scan,
1725            Some(predicate_expr) => {
1726                let filter_expr =
1727                    self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
1728                let schema = Arc::new(schema);
1729                let mut using_columns = HashSet::new();
1730                expr_to_columns(&filter_expr, &mut using_columns)?;
1731                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
1732                    filter_expr,
1733                    &[&[&schema]],
1734                    &[using_columns],
1735                )?;
1736                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
1737            }
1738        };
1739
1740        let plan = LogicalPlan::Dml(DmlStatement::new(
1741            table_ref,
1742            table_source,
1743            WriteOp::Delete,
1744            Arc::new(source),
1745        ));
1746        Ok(plan)
1747    }
1748
1749    fn update_to_plan(
1750        &self,
1751        table: TableWithJoins,
1752        assignments: Vec<Assignment>,
1753        from: Option<TableWithJoins>,
1754        predicate_expr: Option<SQLExpr>,
1755    ) -> Result<LogicalPlan> {
1756        let (table_name, table_alias) = match &table.relation {
1757            TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
1758            _ => plan_err!("Cannot update non-table relation!")?,
1759        };
1760
1761        // Do a table lookup to verify the table exists
1762        let table_name = self.object_name_to_table_reference(table_name)?;
1763        let table_source = self.context_provider.get_table_source(table_name.clone())?;
1764        let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
1765            table_name.clone(),
1766            &table_source.schema(),
1767        )?);
1768
1769        // Overwrite with assignment expressions
1770        let mut planner_context = PlannerContext::new();
1771        let mut assign_map = assignments
1772            .iter()
1773            .map(|assign| {
1774                let cols = match &assign.target {
1775                    AssignmentTarget::ColumnName(cols) => cols,
1776                    _ => plan_err!("Tuples are not supported")?,
1777                };
1778                let col_name: &Ident = cols
1779                    .0
1780                    .iter()
1781                    .last()
1782                    .ok_or_else(|| plan_datafusion_err!("Empty column id"))?;
1783                // Validate that the assignment target column exists
1784                table_schema.field_with_unqualified_name(&col_name.value)?;
1785                Ok((col_name.value.clone(), assign.value.clone()))
1786            })
1787            .collect::<Result<HashMap<String, SQLExpr>>>()?;
1788
1789        // Build scan, join with from table if it exists.
1790        let mut input_tables = vec![table];
1791        input_tables.extend(from);
1792        let scan = self.plan_from_tables(input_tables, &mut planner_context)?;
1793
1794        // Filter
1795        let source = match predicate_expr {
1796            None => scan,
1797            Some(predicate_expr) => {
1798                let filter_expr = self.sql_to_expr(
1799                    predicate_expr,
1800                    scan.schema(),
1801                    &mut planner_context,
1802                )?;
1803                let mut using_columns = HashSet::new();
1804                expr_to_columns(&filter_expr, &mut using_columns)?;
1805                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
1806                    filter_expr,
1807                    &[&[scan.schema()]],
1808                    &[using_columns],
1809                )?;
1810                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
1811            }
1812        };
1813
1814        // Build updated values for each column, using the previous value if not modified
1815        let exprs = table_schema
1816            .iter()
1817            .map(|(qualifier, field)| {
1818                let expr = match assign_map.remove(field.name()) {
1819                    Some(new_value) => {
1820                        let mut expr = self.sql_to_expr(
1821                            new_value,
1822                            source.schema(),
1823                            &mut planner_context,
1824                        )?;
1825                        // Update placeholder's datatype to the type of the target column
1826                        if let Expr::Placeholder(placeholder) = &mut expr {
1827                            placeholder.data_type = placeholder
1828                                .data_type
1829                                .take()
1830                                .or_else(|| Some(field.data_type().clone()));
1831                        }
1832                        // Cast to target column type, if necessary
1833                        expr.cast_to(field.data_type(), source.schema())?
1834                    }
1835                    None => {
1836                        // If the target table has an alias, use it to qualify the column name
1837                        if let Some(alias) = &table_alias {
1838                            Expr::Column(Column::new(
1839                                Some(self.ident_normalizer.normalize(alias.name.clone())),
1840                                field.name(),
1841                            ))
1842                        } else {
1843                            Expr::Column(Column::from((qualifier, field)))
1844                        }
1845                    }
1846                };
1847                Ok(expr.alias(field.name()))
1848            })
1849            .collect::<Result<Vec<_>>>()?;
1850
1851        let source = project(source, exprs)?;
1852
1853        let plan = LogicalPlan::Dml(DmlStatement::new(
1854            table_name,
1855            table_source,
1856            WriteOp::Update,
1857            Arc::new(source),
1858        ));
1859        Ok(plan)
1860    }
1861
1862    fn insert_to_plan(
1863        &self,
1864        table_name: ObjectName,
1865        columns: Vec<Ident>,
1866        source: Box<Query>,
1867        overwrite: bool,
1868        replace_into: bool,
1869    ) -> Result<LogicalPlan> {
1870        // Do a table lookup to verify the table exists
1871        let table_name = self.object_name_to_table_reference(table_name)?;
1872        let table_source = self.context_provider.get_table_source(table_name.clone())?;
1873        let arrow_schema = (*table_source.schema()).clone();
1874        let table_schema = DFSchema::try_from(arrow_schema)?;
1875
1876        // Get insert fields and target table's value indices
1877        //
1878        // If value_indices[i] = Some(j), it means that the value of the i-th target table's column is
1879        // derived from the j-th output of the source.
1880        //
1881        // If value_indices[i] = None, it means that the value of the i-th target table's column is
1882        // not provided, and should be filled with a default value later.
1883        let (fields, value_indices) = if columns.is_empty() {
1884            // Empty means we're inserting into all columns of the table
1885            (
1886                table_schema.fields().clone(),
1887                (0..table_schema.fields().len())
1888                    .map(Some)
1889                    .collect::<Vec<_>>(),
1890            )
1891        } else {
1892            let mut value_indices = vec![None; table_schema.fields().len()];
1893            let fields = columns
1894                .into_iter()
1895                .map(|c| self.ident_normalizer.normalize(c))
1896                .enumerate()
1897                .map(|(i, c)| {
1898                    let column_index = table_schema
1899                        .index_of_column_by_name(None, &c)
1900                        .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
1901
1902                    if value_indices[column_index].is_some() {
1903                        return schema_err!(SchemaError::DuplicateUnqualifiedField {
1904                            name: c,
1905                        });
1906                    } else {
1907                        value_indices[column_index] = Some(i);
1908                    }
1909                    Ok(table_schema.field(column_index).clone())
1910                })
1911                .collect::<Result<Vec<_>>>()?;
1912            (Fields::from(fields), value_indices)
1913        };
1914
1915        // infer types for Values clause... other types should be resolvable the regular way
1916        let mut prepare_param_data_types = BTreeMap::new();
1917        if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
1918            for row in rows.iter() {
1919                for (idx, val) in row.iter().enumerate() {
1920                    if let SQLExpr::Value(Value::Placeholder(name)) = val {
1921                        let name =
1922                            name.replace('$', "").parse::<usize>().map_err(|_| {
1923                                plan_datafusion_err!("Can't parse placeholder: {name}")
1924                            })? - 1;
1925                        let field = fields.get(idx).ok_or_else(|| {
1926                            plan_datafusion_err!(
1927                                "Placeholder ${} refers to a non existent column",
1928                                idx + 1
1929                            )
1930                        })?;
1931                        let dt = field.data_type().clone();
1932                        let _ = prepare_param_data_types.insert(name, dt);
1933                    }
1934                }
1935            }
1936        }
1937        let prepare_param_data_types = prepare_param_data_types.into_values().collect();
1938
1939        // Projection
1940        let mut planner_context =
1941            PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
1942        planner_context.set_table_schema(Some(DFSchemaRef::new(
1943            DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
1944        )));
1945        let source = self.query_to_plan(*source, &mut planner_context)?;
1946        if fields.len() != source.schema().fields().len() {
1947            plan_err!("Column count doesn't match insert query!")?;
1948        }
1949
1950        let exprs = value_indices
1951            .into_iter()
1952            .enumerate()
1953            .map(|(i, value_index)| {
1954                let target_field = table_schema.field(i);
1955                let expr = match value_index {
1956                    Some(v) => {
1957                        Expr::Column(Column::from(source.schema().qualified_field(v)))
1958                            .cast_to(target_field.data_type(), source.schema())?
1959                    }
1960                    // The value is not specified. Fill in the default value for the column.
1961                    None => table_source
1962                        .get_column_default(target_field.name())
1963                        .cloned()
1964                        .unwrap_or_else(|| {
1965                            // If there is no default for the column, then the default is NULL
1966                            Expr::Literal(ScalarValue::Null)
1967                        })
1968                        .cast_to(target_field.data_type(), &DFSchema::empty())?,
1969                };
1970                Ok(expr.alias(target_field.name()))
1971            })
1972            .collect::<Result<Vec<Expr>>>()?;
1973        let source = project(source, exprs)?;
1974
1975        let insert_op = match (overwrite, replace_into) {
1976            (false, false) => InsertOp::Append,
1977            (true, false) => InsertOp::Overwrite,
1978            (false, true) => InsertOp::Replace,
1979            (true, true) => plan_err!("Conflicting insert operations: `overwrite` and `replace_into` cannot both be true")?,
1980        };
1981
1982        let plan = LogicalPlan::Dml(DmlStatement::new(
1983            table_name,
1984            Arc::clone(&table_source),
1985            WriteOp::Insert(insert_op),
1986            Arc::new(source),
1987        ));
1988        Ok(plan)
1989    }
1990
1991    fn show_columns_to_plan(
1992        &self,
1993        extended: bool,
1994        full: bool,
1995        sql_table_name: ObjectName,
1996    ) -> Result<LogicalPlan> {
1997        // Figure out the where clause
1998        let where_clause = object_name_to_qualifier(
1999            &sql_table_name,
2000            self.options.enable_ident_normalization,
2001        );
2002
2003        if !self.has_table("information_schema", "columns") {
2004            return plan_err!(
2005                "SHOW COLUMNS is not supported unless information_schema is enabled"
2006            );
2007        }
2008
2009        // Do a table lookup to verify the table exists
2010        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2011        let _ = self.context_provider.get_table_source(table_ref)?;
2012
2013        // Treat both FULL and EXTENDED as the same
2014        let select_list = if full || extended {
2015            "*"
2016        } else {
2017            "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
2018        };
2019
2020        let query = format!(
2021            "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
2022        );
2023
2024        let mut rewrite = DFParser::parse_sql(&query)?;
2025        assert_eq!(rewrite.len(), 1);
2026        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2027    }
2028
2029    /// Rewrite `SHOW FUNCTIONS` to another SQL query
2030    /// The query is based on the `information_schema.routines` and `information_schema.parameters` tables
2031    ///
2032    /// The output columns:
2033    /// - function_name: The name of function
2034    /// - return_type: The return type of the function
2035    /// - parameters: The name of parameters (ordered by the ordinal position)
2036    /// - parameter_types: The type of parameters (ordered by the ordinal position)
2037    /// - description: The description of the function (the description defined in the document)
2038    /// - syntax_example: The syntax_example of the function (the syntax_example defined in the document)
2039    fn show_functions_to_plan(
2040        &self,
2041        filter: Option<ShowStatementFilter>,
2042    ) -> Result<LogicalPlan> {
2043        let where_clause = if let Some(filter) = filter {
2044            match filter {
2045                ShowStatementFilter::Like(like) => {
2046                    format!("WHERE p.function_name like '{like}'")
2047                }
2048                _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
2049            }
2050        } else {
2051            "".to_string()
2052        };
2053
2054        let query = format!(
2055            r#"
2056SELECT DISTINCT
2057    p.*,
2058    r.function_type function_type,
2059    r.description description,
2060    r.syntax_example syntax_example
2061FROM
2062    (
2063        SELECT
2064            i.specific_name function_name,
2065            o.data_type return_type,
2066            array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
2067            array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
2068        FROM (
2069                 SELECT
2070                     specific_catalog,
2071                     specific_schema,
2072                     specific_name,
2073                     ordinal_position,
2074                     parameter_name,
2075                     data_type,
2076                     rid
2077                 FROM
2078                     information_schema.parameters
2079                 WHERE
2080                     parameter_mode = 'IN'
2081             ) i
2082                 JOIN
2083             (
2084                 SELECT
2085                     specific_catalog,
2086                     specific_schema,
2087                     specific_name,
2088                     ordinal_position,
2089                     parameter_name,
2090                     data_type,
2091                     rid
2092                 FROM
2093                     information_schema.parameters
2094                 WHERE
2095                     parameter_mode = 'OUT'
2096             ) o
2097             ON i.specific_catalog = o.specific_catalog
2098                 AND i.specific_schema = o.specific_schema
2099                 AND i.specific_name = o.specific_name
2100                 AND i.rid = o.rid
2101        GROUP BY 1, 2, i.rid
2102    ) as p
2103JOIN information_schema.routines r
2104ON p.function_name = r.routine_name
2105{where_clause}
2106            "#
2107        );
2108        let mut rewrite = DFParser::parse_sql(&query)?;
2109        assert_eq!(rewrite.len(), 1);
2110        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2111    }
2112
2113    fn show_create_table_to_plan(
2114        &self,
2115        sql_table_name: ObjectName,
2116    ) -> Result<LogicalPlan> {
2117        if !self.has_table("information_schema", "tables") {
2118            return plan_err!(
2119                "SHOW CREATE TABLE is not supported unless information_schema is enabled"
2120            );
2121        }
2122        // Figure out the where clause
2123        let where_clause = object_name_to_qualifier(
2124            &sql_table_name,
2125            self.options.enable_ident_normalization,
2126        );
2127
2128        // Do a table lookup to verify the table exists
2129        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2130        let _ = self.context_provider.get_table_source(table_ref)?;
2131
2132        let query = format!(
2133            "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
2134        );
2135
2136        let mut rewrite = DFParser::parse_sql(&query)?;
2137        assert_eq!(rewrite.len(), 1);
2138        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2139    }
2140
2141    /// Return true if there is a table provider available for "schema.table"
2142    fn has_table(&self, schema: &str, table: &str) -> bool {
2143        let tables_reference = TableReference::Partial {
2144            schema: schema.into(),
2145            table: table.into(),
2146        };
2147        self.context_provider
2148            .get_table_source(tables_reference)
2149            .is_ok()
2150    }
2151
2152    fn validate_transaction_kind(
2153        &self,
2154        kind: Option<BeginTransactionKind>,
2155    ) -> Result<()> {
2156        match kind {
2157            // BEGIN
2158            None => Ok(()),
2159            // BEGIN TRANSACTION
2160            Some(BeginTransactionKind::Transaction) => Ok(()),
2161            Some(BeginTransactionKind::Work) => {
2162                not_impl_err!("Transaction kind not supported: {kind:?}")
2163            }
2164        }
2165    }
2166}