use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;

use crate::parser::{
    CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
    LexOrdering, Statement as DFStatement,
};
use crate::planner::{
    object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
};
use crate::utils::normalize_ident;

use arrow::datatypes::{DataType, Fields};
use datafusion_common::error::_plan_err;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
    exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
    unqualified_field_not_found, Column, Constraint, Constraints, DFSchema, DFSchemaRef,
    DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, TableReference,
    ToDFSchema,
};
use datafusion_expr::dml::{CopyTo, InsertOp};
use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
use datafusion_expr::logical_plan::builder::project;
use datafusion_expr::logical_plan::DdlStatement;
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{
    cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
    CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
    CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
    DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
    EmptyRelation, Execute, Explain, Expr, ExprSchemable, Filter, LogicalPlan,
    LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, SetVariable, SortExpr,
    Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
    TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
    Volatility, WriteOp,
};
use sqlparser::ast::{
    self, BeginTransactionKind, NullsDistinctOption, ShowStatementIn,
    ShowStatementOptions, SqliteOnConflict, TableObject, UpdateTableFromKind,
};
use sqlparser::ast::{
    Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
    CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
    ObjectName, ObjectType, OneOrManyWithParens, Query, SchemaName, SetExpr,
    ShowCreateObject, ShowStatementFilter, Statement, TableConstraint, TableFactor,
    TableWithJoins, TransactionMode, UnaryOperator, Value,
};
use sqlparser::parser::ParserError::ParserError;

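/// Converts an [`Ident`] into a string using the planner's identifier normalization.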
fn ident_to_string(ident: &Ident) -> String {
    normalize_ident(ident.to_owned())
}

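/// Joins the normalized parts of an [`ObjectName`] with `.` into a single string.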
fn object_name_to_string(object_name: &ObjectName) -> String {
    object_name
        .0
        .iter()
        .map(ident_to_string)
        .collect::<Vec<String>>()
        .join(".")
}

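/// Returns the textual name of a `CREATE SCHEMA` target, including the
/// authorization identifier when one is present.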
fn get_schema_name(schema_name: &SchemaName) -> String {
    match schema_name {
        SchemaName::Simple(schema_name) => object_name_to_string(schema_name),
        SchemaName::UnnamedAuthorization(auth) => ident_to_string(auth),
        SchemaName::NamedAuthorization(schema_name, auth) => format!(
            "{}.{}",
            object_name_to_string(schema_name),
            ident_to_string(auth)
        ),
    }
}

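/// Collects constraints declared inline on column definitions (UNIQUE, PRIMARY
/// KEY, FOREIGN KEY and CHECK) as table-level [`TableConstraint`]s; all other
/// column options are ignored here.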
fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
    let mut constraints = vec![];
    for column in columns {
        for ast::ColumnOptionDef { name, option } in &column.options {
            match option {
                ast::ColumnOption::Unique {
                    is_primary: false,
                    characteristics,
                } => constraints.push(TableConstraint::Unique {
                    name: name.clone(),
                    columns: vec![column.name.clone()],
                    characteristics: *characteristics,
                    index_name: None,
                    index_type_display: ast::KeyOrIndexDisplay::None,
                    index_type: None,
                    index_options: vec![],
                    nulls_distinct: NullsDistinctOption::None,
                }),
                ast::ColumnOption::Unique {
                    is_primary: true,
                    characteristics,
                } => constraints.push(TableConstraint::PrimaryKey {
                    name: name.clone(),
                    columns: vec![column.name.clone()],
                    characteristics: *characteristics,
                    index_name: None,
                    index_type: None,
                    index_options: vec![],
                }),
                ast::ColumnOption::ForeignKey {
                    foreign_table,
                    referred_columns,
                    on_delete,
                    on_update,
                    characteristics,
                } => constraints.push(TableConstraint::ForeignKey {
                    name: name.clone(),
                    columns: vec![],
                    foreign_table: foreign_table.clone(),
                    referred_columns: referred_columns.to_vec(),
                    on_delete: *on_delete,
                    on_update: *on_update,
                    characteristics: *characteristics,
                }),
                ast::ColumnOption::Check(expr) => {
                    constraints.push(TableConstraint::Check {
                        name: name.clone(),
                        expr: Box::new(expr.clone()),
                    })
                }
                ast::ColumnOption::Default(_)
                | ast::ColumnOption::Null
                | ast::ColumnOption::NotNull
                | ast::ColumnOption::DialectSpecific(_)
                | ast::ColumnOption::CharacterSet(_)
                | ast::ColumnOption::Generated { .. }
                | ast::ColumnOption::Comment(_)
                | ast::ColumnOption::Options(_)
                | ast::ColumnOption::OnUpdate(_)
                | ast::ColumnOption::Materialized(_)
                | ast::ColumnOption::Ephemeral(_)
                | ast::ColumnOption::Identity(_)
                | ast::ColumnOption::OnConflict(_)
                | ast::ColumnOption::Policy(_)
                | ast::ColumnOption::Tags(_)
                | ast::ColumnOption::Alias(_) => {}
            }
        }
    }
    constraints
}

impl<S: ContextProvider> SqlToRel<'_, S> {
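    /// Generate a logical plan from a DataFusion SQL statement.
    ///
    /// A minimal usage sketch (assumes some `ContextProvider` implementation,
    /// here a hypothetical `MyContextProvider`, that can resolve any tables
    /// referenced by the query):
    ///
    /// ```ignore
    /// let provider = MyContextProvider::default();
    /// let planner = SqlToRel::new(&provider);
    /// let mut statements = DFParser::parse_sql("SELECT 1")?;
    /// let plan = planner.statement_to_plan(statements.pop_front().unwrap())?;
    /// ```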
    pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
        match statement {
            DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
            DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
            DFStatement::CopyTo(s) => self.copy_to_plan(s),
            DFStatement::Explain(ExplainStatement {
                verbose,
                analyze,
                statement,
            }) => self.explain_to_plan(verbose, analyze, *statement),
        }
    }

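    /// Generate a logical plan from a SQL statement, using a fresh
    /// [`PlannerContext`].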
    pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
        self.sql_statement_to_plan_with_context_impl(
            statement,
            &mut PlannerContext::new(),
        )
    }

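    /// Generate a logical plan from a SQL statement, reusing the caller's
    /// [`PlannerContext`] (for example to share prepared-statement parameter
    /// types or previously registered CTEs).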
    pub fn sql_statement_to_plan_with_context(
        &self,
        statement: Statement,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        self.sql_statement_to_plan_with_context_impl(statement, planner_context)
    }

    fn sql_statement_to_plan_with_context_impl(
        &self,
        statement: Statement,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        match statement {
            Statement::ExplainTable {
                describe_alias: DescribeAlias::Describe,
                table_name,
                ..
            } => self.describe_table_to_plan(table_name),
213 Statement::Explain {
214 verbose,
215 statement,
216 analyze,
217 format: _,
218 describe_alias: _,
219 ..
220 } => {
221 self.explain_to_plan(verbose, analyze, DFStatement::Statement(statement))
222 }
223 Statement::Query(query) => self.query_to_plan(*query, planner_context),
224 Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
225 Statement::SetVariable {
226 local,
227 hivevar,
228 variables,
229 value,
230 } => self.set_variable_to_plan(local, hivevar, &variables, value),
231
232 Statement::CreateTable(CreateTable {
233 temporary,
234 external,
235 global,
236 transient,
237 volatile,
238 hive_distribution,
239 hive_formats,
240 file_format,
241 location,
242 query,
243 name,
244 columns,
245 constraints,
246 table_properties,
247 with_options,
248 if_not_exists,
249 or_replace,
250 without_rowid,
251 like,
252 clone,
253 engine,
254 comment,
255 auto_increment_offset,
256 default_charset,
257 collation,
258 on_commit,
259 on_cluster,
260 primary_key,
261 order_by,
262 partition_by,
263 cluster_by,
264 clustered_by,
265 options,
266 strict,
267 copy_grants,
268 enable_schema_evolution,
269 change_tracking,
270 data_retention_time_in_days,
271 max_data_extension_time_in_days,
272 default_ddl_collation,
273 with_aggregation_policy,
274 with_row_access_policy,
275 with_tags,
276 }) if table_properties.is_empty() && with_options.is_empty() => {
277 if temporary {
278 return not_impl_err!("Temporary tables not supported")?;
279 }
280 if external {
281 return not_impl_err!("External tables not supported")?;
282 }
283 if global.is_some() {
284 return not_impl_err!("Global tables not supported")?;
285 }
286 if transient {
287 return not_impl_err!("Transient tables not supported")?;
288 }
289 if volatile {
290 return not_impl_err!("Volatile tables not supported")?;
291 }
292 if hive_distribution != ast::HiveDistributionStyle::NONE {
293 return not_impl_err!(
294 "Hive distribution not supported: {hive_distribution:?}"
295 )?;
296 }
297 if !matches!(
298 hive_formats,
299 Some(ast::HiveFormat {
300 row_format: None,
301 serde_properties: None,
302 storage: None,
303 location: None,
304 })
305 ) {
306 return not_impl_err!(
307 "Hive formats not supported: {hive_formats:?}"
308 )?;
309 }
310 if file_format.is_some() {
311 return not_impl_err!("File format not supported")?;
312 }
313 if location.is_some() {
314 return not_impl_err!("Location not supported")?;
315 }
316 if without_rowid {
317 return not_impl_err!("Without rowid not supported")?;
318 }
319 if like.is_some() {
320 return not_impl_err!("Like not supported")?;
321 }
322 if clone.is_some() {
323 return not_impl_err!("Clone not supported")?;
324 }
325 if engine.is_some() {
326 return not_impl_err!("Engine not supported")?;
327 }
328 if comment.is_some() {
329 return not_impl_err!("Comment not supported")?;
330 }
331 if auto_increment_offset.is_some() {
332 return not_impl_err!("Auto increment offset not supported")?;
333 }
334 if default_charset.is_some() {
335 return not_impl_err!("Default charset not supported")?;
336 }
337 if collation.is_some() {
338 return not_impl_err!("Collation not supported")?;
339 }
340 if on_commit.is_some() {
341 return not_impl_err!("On commit not supported")?;
342 }
343 if on_cluster.is_some() {
344 return not_impl_err!("On cluster not supported")?;
345 }
346 if primary_key.is_some() {
347 return not_impl_err!("Primary key not supported")?;
348 }
349 if order_by.is_some() {
350 return not_impl_err!("Order by not supported")?;
351 }
352 if partition_by.is_some() {
353 return not_impl_err!("Partition by not supported")?;
354 }
355 if cluster_by.is_some() {
356 return not_impl_err!("Cluster by not supported")?;
357 }
358 if clustered_by.is_some() {
359 return not_impl_err!("Clustered by not supported")?;
360 }
361 if options.is_some() {
362 return not_impl_err!("Options not supported")?;
363 }
364 if strict {
365 return not_impl_err!("Strict not supported")?;
366 }
367 if copy_grants {
368 return not_impl_err!("Copy grants not supported")?;
369 }
370 if enable_schema_evolution.is_some() {
371 return not_impl_err!("Enable schema evolution not supported")?;
372 }
373 if change_tracking.is_some() {
374 return not_impl_err!("Change tracking not supported")?;
375 }
376 if data_retention_time_in_days.is_some() {
377 return not_impl_err!("Data retention time in days not supported")?;
378 }
379 if max_data_extension_time_in_days.is_some() {
380 return not_impl_err!(
381 "Max data extension time in days not supported"
382 )?;
383 }
384 if default_ddl_collation.is_some() {
385 return not_impl_err!("Default DDL collation not supported")?;
386 }
387 if with_aggregation_policy.is_some() {
388 return not_impl_err!("With aggregation policy not supported")?;
389 }
390 if with_row_access_policy.is_some() {
391 return not_impl_err!("With row access policy not supported")?;
392 }
393 if with_tags.is_some() {
394 return not_impl_err!("With tags not supported")?;
395 }
396
397 let mut all_constraints = constraints;
399 let inline_constraints = calc_inline_constraints_from_columns(&columns);
400 all_constraints.extend(inline_constraints);
401 let column_defaults =
403 self.build_column_defaults(&columns, planner_context)?;
404
405 let has_columns = !columns.is_empty();
406 let schema = self.build_schema(columns)?.to_dfschema_ref()?;
407 if has_columns {
408 planner_context.set_table_schema(Some(Arc::clone(&schema)));
409 }
410
411 match query {
412 Some(query) => {
413 let plan = self.query_to_plan(*query, planner_context)?;
414 let input_schema = plan.schema();
415
416 let plan = if has_columns {
417 if schema.fields().len() != input_schema.fields().len() {
418 return plan_err!(
419 "Mismatch: {} columns specified, but result has {} columns",
420 schema.fields().len(),
421 input_schema.fields().len()
422 );
423 }
424 let input_fields = input_schema.fields();
425 let project_exprs = schema
426 .fields()
427 .iter()
428 .zip(input_fields)
429 .map(|(field, input_field)| {
430 cast(
431 col(input_field.name()),
432 field.data_type().clone(),
433 )
434 .alias(field.name())
435 })
436 .collect::<Vec<_>>();
437 LogicalPlanBuilder::from(plan.clone())
438 .project(project_exprs)?
439 .build()?
440 } else {
441 plan
442 };
443
444 let constraints = self.new_constraint_from_table_constraints(
445 &all_constraints,
446 plan.schema(),
447 )?;
448
449 Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
450 CreateMemoryTable {
451 name: self.object_name_to_table_reference(name)?,
452 constraints,
453 input: Arc::new(plan),
454 if_not_exists,
455 or_replace,
456 column_defaults,
457 temporary,
458 },
459 )))
460 }
461
462 None => {
463 let plan = EmptyRelation {
464 produce_one_row: false,
465 schema,
466 };
467 let plan = LogicalPlan::EmptyRelation(plan);
468 let constraints = self.new_constraint_from_table_constraints(
469 &all_constraints,
470 plan.schema(),
471 )?;
472 Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
473 CreateMemoryTable {
474 name: self.object_name_to_table_reference(name)?,
475 constraints,
476 input: Arc::new(plan),
477 if_not_exists,
478 or_replace,
479 column_defaults,
480 temporary,
481 },
482 )))
483 }
484 }
485 }
486
487 Statement::CreateView {
488 or_replace,
489 materialized,
490 name,
491 columns,
492 query,
493 options: CreateTableOptions::None,
494 cluster_by,
495 comment,
496 with_no_schema_binding,
497 if_not_exists,
498 temporary,
499 to,
500 params,
501 } => {
502 if materialized {
503 return not_impl_err!("Materialized views not supported")?;
504 }
505 if !cluster_by.is_empty() {
506 return not_impl_err!("Cluster by not supported")?;
507 }
508 if comment.is_some() {
509 return not_impl_err!("Comment not supported")?;
510 }
511 if with_no_schema_binding {
512 return not_impl_err!("With no schema binding not supported")?;
513 }
514 if if_not_exists {
515 return not_impl_err!("If not exists not supported")?;
516 }
517 if to.is_some() {
518 return not_impl_err!("To not supported")?;
519 }
520
521 let stmt = Statement::CreateView {
524 or_replace,
525 materialized,
526 name,
527 columns,
528 query,
529 options: CreateTableOptions::None,
530 cluster_by,
531 comment,
532 with_no_schema_binding,
533 if_not_exists,
534 temporary,
535 to,
536 params,
537 };
538 let sql = stmt.to_string();
539 let Statement::CreateView {
540 name,
541 columns,
542 query,
543 or_replace,
544 temporary,
545 ..
546 } = stmt
547 else {
548 return internal_err!("Unreachable code in create view");
549 };
550
551 let columns = columns
552 .into_iter()
553 .map(|view_column_def| {
554 if let Some(options) = view_column_def.options {
555 plan_err!(
556 "Options not supported for view columns: {options:?}"
557 )
558 } else {
559 Ok(view_column_def.name)
560 }
561 })
562 .collect::<Result<Vec<_>>>()?;
563
564 let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
565 plan = self.apply_expr_alias(plan, columns)?;
566
567 Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
568 name: self.object_name_to_table_reference(name)?,
569 input: Arc::new(plan),
570 or_replace,
571 definition: Some(sql),
572 temporary,
573 })))
574 }
575 Statement::ShowCreate { obj_type, obj_name } => match obj_type {
576 ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
577 _ => {
578 not_impl_err!("Only `SHOW CREATE TABLE ...` statement is supported")
579 }
580 },
581 Statement::CreateSchema {
582 schema_name,
583 if_not_exists,
584 } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
585 CreateCatalogSchema {
586 schema_name: get_schema_name(&schema_name),
587 if_not_exists,
588 schema: Arc::new(DFSchema::empty()),
589 },
590 ))),
591 Statement::CreateDatabase {
592 db_name,
593 if_not_exists,
594 ..
595 } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
596 CreateCatalog {
597 catalog_name: object_name_to_string(&db_name),
598 if_not_exists,
599 schema: Arc::new(DFSchema::empty()),
600 },
601 ))),
602 Statement::Drop {
603 object_type,
604 if_exists,
605 mut names,
606 cascade,
607 restrict: _,
608 purge: _,
609 temporary: _,
610 } => {
611 let name = match names.len() {
614 0 => Err(ParserError("Missing table name.".to_string()).into()),
615 1 => self.object_name_to_table_reference(names.pop().unwrap()),
616 _ => {
617 Err(ParserError("Multiple objects not supported".to_string())
618 .into())
619 }
620 }?;
621
622 match object_type {
623 ObjectType::Table => {
624 Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
625 name,
626 if_exists,
627 schema: DFSchemaRef::new(DFSchema::empty()),
628 })))
629 }
630 ObjectType::View => {
631 Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
632 name,
633 if_exists,
634 schema: DFSchemaRef::new(DFSchema::empty()),
635 })))
636 }
637 ObjectType::Schema => {
638 let name = match name {
639 TableReference::Bare { table } => Ok(SchemaReference::Bare { schema: table }),
640 TableReference::Partial { schema, table } => Ok(SchemaReference::Full { schema: table, catalog: schema }),
641 TableReference::Full { catalog: _, schema: _, table: _ } => {
642 Err(ParserError("Invalid schema specifier (has 3 parts)".to_string()))
643 }
644 }?;
645 Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(DropCatalogSchema {
646 name,
647 if_exists,
648 cascade,
649 schema: DFSchemaRef::new(DFSchema::empty()),
650 })))
651 }
652 _ => not_impl_err!(
653 "Only `DROP TABLE/VIEW/SCHEMA ...` statement is supported currently"
654 ),
655 }
656 }
657 Statement::Prepare {
658 name,
659 data_types,
660 statement,
661 } => {
662 let data_types: Vec<DataType> = data_types
664 .into_iter()
665 .map(|t| self.convert_data_type(&t))
666 .collect::<Result<_>>()?;
667
668 let mut planner_context = PlannerContext::new()
670 .with_prepare_param_data_types(data_types.clone());
671
672 let plan = self.sql_statement_to_plan_with_context_impl(
674 *statement,
675 &mut planner_context,
676 )?;
677 Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
678 name: ident_to_string(&name),
679 data_types,
680 input: Arc::new(plan),
681 })))
682 }
683 Statement::Execute {
684 name,
685 parameters,
686 using,
687 has_parentheses: _,
690 } => {
691 if !using.is_empty() {
693 return not_impl_err!(
694 "Execute statement with USING is not supported"
695 );
696 }
697
698 let empty_schema = DFSchema::empty();
699 let parameters = parameters
700 .into_iter()
701 .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
702 .collect::<Result<Vec<Expr>>>()?;
703
704 Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
705 name: object_name_to_string(&name),
706 parameters,
707 })))
708 }
709 Statement::Deallocate {
710 name,
711 prepare: _,
713 } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
714 Deallocate {
715 name: ident_to_string(&name),
716 },
717 ))),
718
719 Statement::ShowTables {
720 extended,
721 full,
722 terse,
723 history,
724 external,
725 show_options,
726 } => {
727 if extended {
730 return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
731 }
732 if full {
733 return not_impl_err!("SHOW FULL TABLES not supported")?;
734 }
735 if terse {
736 return not_impl_err!("SHOW TERSE TABLES not supported")?;
737 }
738 if history {
739 return not_impl_err!("SHOW TABLES HISTORY not supported")?;
740 }
741 if external {
742 return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
743 }
744 let ShowStatementOptions {
745 show_in,
746 starts_with,
747 limit,
748 limit_from,
749 filter_position,
750 } = show_options;
751 if show_in.is_some() {
752 return not_impl_err!("SHOW TABLES IN not supported")?;
753 }
754 if starts_with.is_some() {
755 return not_impl_err!("SHOW TABLES LIKE not supported")?;
756 }
757 if limit.is_some() {
758 return not_impl_err!("SHOW TABLES LIMIT not supported")?;
759 }
760 if limit_from.is_some() {
761 return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
762 }
763 if filter_position.is_some() {
764 return not_impl_err!("SHOW TABLES FILTER not supported")?;
765 }
766 self.show_tables_to_plan()
767 }
768
769 Statement::ShowColumns {
770 extended,
771 full,
772 show_options,
773 } => {
774 let ShowStatementOptions {
775 show_in,
776 starts_with,
777 limit,
778 limit_from,
779 filter_position,
780 } = show_options;
781 if starts_with.is_some() {
782 return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
783 }
784 if limit.is_some() {
785 return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
786 }
787 if limit_from.is_some() {
788 return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
789 }
790 if filter_position.is_some() {
791 return not_impl_err!(
792 "SHOW COLUMNS with WHERE or LIKE is not supported"
793 )?;
794 }
795 let Some(ShowStatementIn {
796 clause: _,
799 parent_type,
800 parent_name,
801 }) = show_in
802 else {
803 return plan_err!("SHOW COLUMNS requires a table name");
804 };
805
806 if let Some(parent_type) = parent_type {
807 return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
808 }
809 let Some(table_name) = parent_name else {
810 return plan_err!("SHOW COLUMNS requires a table name");
811 };
812
813 self.show_columns_to_plan(extended, full, table_name)
814 }
815
816 Statement::ShowFunctions { filter, .. } => {
817 self.show_functions_to_plan(filter)
818 }
819
820 Statement::Insert(Insert {
821 or,
822 into,
823 columns,
824 overwrite,
825 source,
826 partitioned,
827 after_columns,
828 table,
829 on,
830 returning,
831 ignore,
832 table_alias,
833 mut replace_into,
834 priority,
835 insert_alias,
836 assignments,
837 has_table_keyword,
838 settings,
839 format_clause,
840 }) => {
841 let table_name = match table {
842 TableObject::TableName(table_name) => table_name,
843 TableObject::TableFunction(_) => {
844 return not_impl_err!("INSERT INTO Table functions not supported")
845 }
846 };
847 if let Some(or) = or {
848 match or {
849 SqliteOnConflict::Replace => replace_into = true,
850 _ => plan_err!("Inserts with {or} clause is not supported")?,
851 }
852 }
853 if partitioned.is_some() {
854 plan_err!("Partitioned inserts not yet supported")?;
855 }
856 if !after_columns.is_empty() {
857 plan_err!("After-columns clause not supported")?;
858 }
859 if on.is_some() {
860 plan_err!("Insert-on clause not supported")?;
861 }
862 if returning.is_some() {
863 plan_err!("Insert-returning clause not supported")?;
864 }
865 if ignore {
866 plan_err!("Insert-ignore clause not supported")?;
867 }
868 let Some(source) = source else {
869 plan_err!("Inserts without a source not supported")?
870 };
871 if let Some(table_alias) = table_alias {
872 plan_err!(
873 "Inserts with a table alias not supported: {table_alias:?}"
874 )?
875 };
876 if let Some(priority) = priority {
877 plan_err!(
878 "Inserts with a `PRIORITY` clause not supported: {priority:?}"
879 )?
880 };
881 if insert_alias.is_some() {
882 plan_err!("Inserts with an alias not supported")?;
883 }
884 if !assignments.is_empty() {
885 plan_err!("Inserts with assignments not supported")?;
886 }
887 if settings.is_some() {
888 plan_err!("Inserts with settings not supported")?;
889 }
890 if format_clause.is_some() {
891 plan_err!("Inserts with format clause not supported")?;
892 }
893 let _ = into;
895 let _ = has_table_keyword;
896 self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
897 }
898 Statement::Update {
899 table,
900 assignments,
901 from,
902 selection,
903 returning,
904 or,
905 } => {
906 let from =
907 from.map(|update_table_from_kind| match update_table_from_kind {
908 UpdateTableFromKind::BeforeSet(from) => from,
909 UpdateTableFromKind::AfterSet(from) => from,
910 });
911 if returning.is_some() {
912 plan_err!("Update-returning clause not yet supported")?;
913 }
914 if or.is_some() {
915 plan_err!("ON conflict not supported")?;
916 }
917 self.update_to_plan(table, assignments, from, selection)
918 }
919
920 Statement::Delete(Delete {
921 tables,
922 using,
923 selection,
924 returning,
925 from,
926 order_by,
927 limit,
928 }) => {
929 if !tables.is_empty() {
930 plan_err!("DELETE <TABLE> not supported")?;
931 }
932
933 if using.is_some() {
934 plan_err!("Using clause not supported")?;
935 }
936
937 if returning.is_some() {
938 plan_err!("Delete-returning clause not yet supported")?;
939 }
940
941 if !order_by.is_empty() {
942 plan_err!("Delete-order-by clause not yet supported")?;
943 }
944
945 if limit.is_some() {
946 plan_err!("Delete-limit clause not yet supported")?;
947 }
948
949 let table_name = self.get_delete_target(from)?;
950 self.delete_to_plan(table_name, selection)
951 }
952
953 Statement::StartTransaction {
954 modes,
955 begin: false,
956 modifier,
957 transaction,
958 } => {
959 if let Some(modifier) = modifier {
960 return not_impl_err!(
961 "Transaction modifier not supported: {modifier}"
962 );
963 }
964 self.validate_transaction_kind(transaction)?;
965 let isolation_level: ast::TransactionIsolationLevel = modes
966 .iter()
967 .filter_map(|m: &TransactionMode| match m {
968 TransactionMode::AccessMode(_) => None,
969 TransactionMode::IsolationLevel(level) => Some(level),
970 })
971 .last()
972 .copied()
973 .unwrap_or(ast::TransactionIsolationLevel::Serializable);
974 let access_mode: ast::TransactionAccessMode = modes
975 .iter()
976 .filter_map(|m: &TransactionMode| match m {
977 TransactionMode::AccessMode(mode) => Some(mode),
978 TransactionMode::IsolationLevel(_) => None,
979 })
980 .last()
981 .copied()
982 .unwrap_or(ast::TransactionAccessMode::ReadWrite);
983 let isolation_level = match isolation_level {
984 ast::TransactionIsolationLevel::ReadUncommitted => {
985 TransactionIsolationLevel::ReadUncommitted
986 }
987 ast::TransactionIsolationLevel::ReadCommitted => {
988 TransactionIsolationLevel::ReadCommitted
989 }
990 ast::TransactionIsolationLevel::RepeatableRead => {
991 TransactionIsolationLevel::RepeatableRead
992 }
993 ast::TransactionIsolationLevel::Serializable => {
994 TransactionIsolationLevel::Serializable
995 }
996 ast::TransactionIsolationLevel::Snapshot => {
997 TransactionIsolationLevel::Snapshot
998 }
999 };
1000 let access_mode = match access_mode {
1001 ast::TransactionAccessMode::ReadOnly => {
1002 TransactionAccessMode::ReadOnly
1003 }
1004 ast::TransactionAccessMode::ReadWrite => {
1005 TransactionAccessMode::ReadWrite
1006 }
1007 };
1008 let statement = PlanStatement::TransactionStart(TransactionStart {
1009 access_mode,
1010 isolation_level,
1011 });
1012 Ok(LogicalPlan::Statement(statement))
1013 }
1014 Statement::Commit {
1015 chain,
1016 end,
1017 modifier,
1018 } => {
1019 if end {
1020 return not_impl_err!("COMMIT AND END not supported");
1021 };
1022 if let Some(modifier) = modifier {
1023 return not_impl_err!("COMMIT {modifier} not supported");
1024 };
1025 let statement = PlanStatement::TransactionEnd(TransactionEnd {
1026 conclusion: TransactionConclusion::Commit,
1027 chain,
1028 });
1029 Ok(LogicalPlan::Statement(statement))
1030 }
1031 Statement::Rollback { chain, savepoint } => {
1032 if savepoint.is_some() {
1033 plan_err!("Savepoints not supported")?;
1034 }
1035 let statement = PlanStatement::TransactionEnd(TransactionEnd {
1036 conclusion: TransactionConclusion::Rollback,
1037 chain,
1038 });
1039 Ok(LogicalPlan::Statement(statement))
1040 }
1041 Statement::CreateFunction(ast::CreateFunction {
1042 or_replace,
1043 temporary,
1044 name,
1045 args,
1046 return_type,
1047 function_body,
1048 behavior,
1049 language,
1050 ..
1051 }) => {
1052 let return_type = match return_type {
1053 Some(t) => Some(self.convert_data_type(&t)?),
1054 None => None,
1055 };
1056 let mut planner_context = PlannerContext::new();
1057 let empty_schema = &DFSchema::empty();
1058
1059 let args = match args {
1060 Some(function_args) => {
1061 let function_args = function_args
1062 .into_iter()
1063 .map(|arg| {
1064 let data_type = self.convert_data_type(&arg.data_type)?;
1065
1066 let default_expr = match arg.default_expr {
1067 Some(expr) => Some(self.sql_to_expr(
1068 expr,
1069 empty_schema,
1070 &mut planner_context,
1071 )?),
1072 None => None,
1073 };
1074 Ok(OperateFunctionArg {
1075 name: arg.name,
1076 default_expr,
1077 data_type,
1078 })
1079 })
1080 .collect::<Result<Vec<OperateFunctionArg>>>();
1081 Some(function_args?)
1082 }
1083 None => None,
1084 };
1085 let name = match &name.0[..] {
1087 [] => exec_err!("Function should have name")?,
1088 [n] => n.value.clone(),
1089 [..] => not_impl_err!("Qualified functions are not supported")?,
1090 };
1091 let arg_types = args.as_ref().map(|arg| {
1095 arg.iter().map(|t| t.data_type.clone()).collect::<Vec<_>>()
1096 });
1097 let mut planner_context = PlannerContext::new()
1098 .with_prepare_param_data_types(arg_types.unwrap_or_default());
1099
1100 let function_body = match function_body {
1101 Some(r) => Some(self.sql_to_expr(
1102 match r {
1103 ast::CreateFunctionBody::AsBeforeOptions(expr) => expr,
1104 ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
1105 ast::CreateFunctionBody::Return(expr) => expr,
1106 },
1107 &DFSchema::empty(),
1108 &mut planner_context,
1109 )?),
1110 None => None,
1111 };
1112
1113 let params = CreateFunctionBody {
1114 language,
1115 behavior: behavior.map(|b| match b {
1116 ast::FunctionBehavior::Immutable => Volatility::Immutable,
1117 ast::FunctionBehavior::Stable => Volatility::Stable,
1118 ast::FunctionBehavior::Volatile => Volatility::Volatile,
1119 }),
1120 function_body,
1121 };
1122
1123 let statement = DdlStatement::CreateFunction(CreateFunction {
1124 or_replace,
1125 temporary,
1126 name,
1127 return_type,
1128 args,
1129 params,
1130 schema: DFSchemaRef::new(DFSchema::empty()),
1131 });
1132
1133 Ok(LogicalPlan::Ddl(statement))
1134 }
1135 Statement::DropFunction {
1136 if_exists,
1137 func_desc,
1138 ..
1139 } => {
1140 if let Some(desc) = func_desc.first() {
1143 let name = match &desc.name.0[..] {
1145 [] => exec_err!("Function should have name")?,
1146 [n] => n.value.clone(),
1147 [..] => not_impl_err!("Qualified functions are not supported")?,
1148 };
1149 let statement = DdlStatement::DropFunction(DropFunction {
1150 if_exists,
1151 name,
1152 schema: DFSchemaRef::new(DFSchema::empty()),
1153 });
1154 Ok(LogicalPlan::Ddl(statement))
1155 } else {
1156 exec_err!("Function name not provided")
1157 }
1158 }
1159 Statement::CreateIndex(CreateIndex {
1160 name,
1161 table_name,
1162 using,
1163 columns,
1164 unique,
1165 if_not_exists,
1166 ..
1167 }) => {
1168 let name: Option<String> = name.as_ref().map(object_name_to_string);
1169 let table = self.object_name_to_table_reference(table_name)?;
1170 let table_schema = self
1171 .context_provider
1172 .get_table_source(table.clone())?
1173 .schema()
1174 .to_dfschema_ref()?;
1175 let using: Option<String> = using.as_ref().map(ident_to_string);
1176 let columns = self.order_by_to_sort_expr(
1177 columns,
1178 &table_schema,
1179 planner_context,
1180 false,
1181 None,
1182 )?;
1183 Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
1184 PlanCreateIndex {
1185 name,
1186 table,
1187 using,
1188 columns,
1189 unique,
1190 if_not_exists,
1191 schema: DFSchemaRef::new(DFSchema::empty()),
1192 },
1193 )))
1194 }
1195 stmt => {
1196 not_impl_err!("Unsupported SQL statement: {stmt}")
1197 }
1198 }
1199 }
1200
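    /// Extract the single target table of a DELETE statement; multi-table
    /// deletes and joined relations are rejected.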
1201 fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
1202 let mut from = match from {
1203 FromTable::WithFromKeyword(v) => v,
1204 FromTable::WithoutKeyword(v) => v,
1205 };
1206
1207 if from.len() != 1 {
1208 return not_impl_err!(
1209 "DELETE FROM only supports single table, got {}: {from:?}",
1210 from.len()
1211 );
1212 }
1213 let table_factor = from.pop().unwrap();
1214 if !table_factor.joins.is_empty() {
1215 return not_impl_err!("DELETE FROM only supports single table, got: joins");
1216 }
1217 let TableFactor::Table { name, .. } = table_factor.relation else {
1218 return not_impl_err!(
1219 "DELETE FROM only supports single table, got: {table_factor:?}"
1220 );
1221 };
1222
1223 Ok(name)
1224 }
1225
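    /// Generate a logical plan for `SHOW TABLES` by rewriting it into a query
    /// against `information_schema.tables`.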
    fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
        if self.has_table("information_schema", "tables") {
            let query = "SELECT * FROM information_schema.tables;";
            let mut rewrite = DFParser::parse_sql(query)?;
            assert_eq!(rewrite.len(), 1);
            self.statement_to_plan(rewrite.pop_front().unwrap())
        } else {
            plan_err!("SHOW TABLES is not supported unless information_schema is enabled")
        }
    }
1237
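    /// Generate a logical plan for `DESCRIBE <table>`.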
1238 fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
1239 let table_ref = self.object_name_to_table_reference(table_name)?;
1240
1241 let table_source = self.context_provider.get_table_source(table_ref)?;
1242
1243 let schema = table_source.schema();
1244
1245 let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1246
1247 Ok(LogicalPlan::DescribeTable(DescribeTable {
1248 schema,
1249 output_schema: Arc::new(output_schema),
1250 }))
1251 }
1252
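    /// Generate a logical plan for `COPY ... TO ...`. The file type is taken
    /// from `STORED AS` when present, otherwise inferred from the extension of
    /// the target path.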
1253 fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
1254 let copy_source = statement.source;
1256 let (input, input_schema, table_ref) = match copy_source {
1257 CopyToSource::Relation(object_name) => {
1258 let table_name = object_name_to_string(&object_name);
1259 let table_ref = self.object_name_to_table_reference(object_name)?;
1260 let table_source =
1261 self.context_provider.get_table_source(table_ref.clone())?;
1262 let plan =
1263 LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?;
1264 let input_schema = Arc::clone(plan.schema());
1265 (plan, input_schema, Some(table_ref))
1266 }
1267 CopyToSource::Query(query) => {
1268 let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
1269 let input_schema = Arc::clone(plan.schema());
1270 (plan, input_schema, None)
1271 }
1272 };
1273
1274 let options_map = self.parse_options_map(statement.options, true)?;
1275
1276 let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
1277 if let Ok(ext_file_type) = self.context_provider.get_file_type(stored_as) {
1278 Some(ext_file_type)
1279 } else {
1280 None
1281 }
1282 } else {
1283 None
1284 };
1285
1286 let file_type = match maybe_file_type {
1287 Some(ft) => ft,
1288 None => {
1289 let e = || {
1290 DataFusionError::Configuration(
1291 "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
1292 .to_string(),
1293 )
1294 };
1295 let extension: &str = &Path::new(&statement.target)
1297 .extension()
1298 .ok_or_else(e)?
1299 .to_str()
1300 .ok_or_else(e)?
1301 .to_lowercase();
1302
1303 self.context_provider.get_file_type(extension)?
1304 }
1305 };
1306
1307 let partition_by = statement
1308 .partitioned_by
1309 .iter()
1310 .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
1311 .collect::<Result<Vec<_>>>()?
1312 .into_iter()
1313 .map(|f| f.name().to_owned())
1314 .collect();
1315
1316 Ok(LogicalPlan::Copy(CopyTo {
1317 input: Arc::new(input),
1318 output_url: statement.target,
1319 file_type,
1320 partition_by,
1321 options: options_map,
1322 }))
1323 }
1324
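    /// Convert the `ORDER BY` clauses of a `CREATE EXTERNAL TABLE` into
    /// [`SortExpr`]s, checking that referenced columns exist whenever the
    /// table schema is non-empty.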
1325 fn build_order_by(
1326 &self,
1327 order_exprs: Vec<LexOrdering>,
1328 schema: &DFSchemaRef,
1329 planner_context: &mut PlannerContext,
1330 ) -> Result<Vec<Vec<SortExpr>>> {
1331 if !order_exprs.is_empty() && schema.fields().is_empty() {
1332 let results = order_exprs
1333 .iter()
1334 .map(|lex_order| {
1335 let result = lex_order
1336 .iter()
1337 .map(|order_by_expr| {
1338 let ordered_expr = &order_by_expr.expr;
1339 let ordered_expr = ordered_expr.to_owned();
1340 let ordered_expr = self
1341 .sql_expr_to_logical_expr(
1342 ordered_expr,
1343 schema,
1344 planner_context,
1345 )
1346 .unwrap();
1347 let asc = order_by_expr.asc.unwrap_or(true);
1348 let nulls_first = order_by_expr.nulls_first.unwrap_or(!asc);
1349
1350 SortExpr::new(ordered_expr, asc, nulls_first)
1351 })
1352 .collect::<Vec<SortExpr>>();
1353 result
1354 })
1355 .collect::<Vec<Vec<SortExpr>>>();
1356
1357 return Ok(results);
1358 }
1359
1360 let mut all_results = vec![];
1361 for expr in order_exprs {
1362 let expr_vec =
1364 self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?;
1365 for sort in expr_vec.iter() {
1367 for column in sort.expr.column_refs().iter() {
1368 if !schema.has_column(column) {
1369 return plan_err!("Column {column} is not in schema");
1371 }
1372 }
1373 }
1374 all_results.push(expr_vec)
1376 }
1377 Ok(all_results)
1378 }
1379
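    /// Generate a logical plan for `CREATE EXTERNAL TABLE`.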
1380 fn external_table_to_plan(
1382 &self,
1383 statement: CreateExternalTable,
1384 ) -> Result<LogicalPlan> {
1385 let definition = Some(statement.to_string());
1386 let CreateExternalTable {
1387 name,
1388 columns,
1389 file_type,
1390 location,
1391 table_partition_cols,
1392 if_not_exists,
1393 temporary,
1394 order_exprs,
1395 unbounded,
1396 options,
1397 constraints,
1398 } = statement;
1399
1400 let mut all_constraints = constraints;
1402 let inline_constraints = calc_inline_constraints_from_columns(&columns);
1403 all_constraints.extend(inline_constraints);
1404
1405 let options_map = self.parse_options_map(options, false)?;
1406
1407 let compression = options_map
1408 .get("format.compression")
1409 .map(|c| CompressionTypeVariant::from_str(c))
1410 .transpose()?;
1411 if (file_type == "PARQUET" || file_type == "AVRO" || file_type == "ARROW")
1412 && compression
1413 .map(|c| c != CompressionTypeVariant::UNCOMPRESSED)
1414 .unwrap_or(false)
1415 {
1416 plan_err!(
1417 "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
1418 )?;
1419 }
1420
1421 let mut planner_context = PlannerContext::new();
1422
1423 let column_defaults = self
1424 .build_column_defaults(&columns, &mut planner_context)?
1425 .into_iter()
1426 .collect();
1427
1428 let schema = self.build_schema(columns)?;
1429 let df_schema = schema.to_dfschema_ref()?;
1430 df_schema.check_names()?;
1431
1432 let ordered_exprs =
1433 self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;
1434
1435 let name = self.object_name_to_table_reference(name)?;
1436 let constraints =
1437 self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
1438 Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1439 PlanCreateExternalTable {
1440 schema: df_schema,
1441 name,
1442 location,
1443 file_type,
1444 table_partition_cols,
1445 if_not_exists,
1446 temporary,
1447 definition,
1448 order_exprs: ordered_exprs,
1449 unbounded,
1450 options: options_map,
1451 constraints,
1452 column_defaults,
1453 },
1454 )))
1455 }
1456
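    /// Resolve the schema indices of the columns referenced by a constraint,
    /// returning an error naming the constraint if a column is not found.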
1457 fn get_constraint_column_indices(
1460 &self,
1461 df_schema: &DFSchemaRef,
1462 columns: &[Ident],
1463 constraint_name: &str,
1464 ) -> Result<Vec<usize>> {
1465 let field_names = df_schema.field_names();
1466 columns
1467 .iter()
1468 .map(|ident| {
1469 let column = self.ident_normalizer.normalize(ident.clone());
1470 field_names
1471 .iter()
1472 .position(|item| *item == column)
1473 .ok_or_else(|| {
1474 plan_datafusion_err!(
1475 "Column for {constraint_name} not found in schema: {column}"
1476 )
1477 })
1478 })
1479 .collect::<Result<Vec<_>>>()
1480 }
1481
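    /// Convert parser [`TableConstraint`]s into plan-level [`Constraints`].
    /// Only UNIQUE and PRIMARY KEY are supported; other constraint kinds
    /// produce a planning error.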
1482 fn new_constraint_from_table_constraints(
1484 &self,
1485 constraints: &[TableConstraint],
1486 df_schema: &DFSchemaRef,
1487 ) -> Result<Constraints> {
1488 let constraints = constraints
1489 .iter()
1490 .map(|c: &TableConstraint| match c {
1491 TableConstraint::Unique { name, columns, .. } => {
1492 let constraint_name = match name {
1493 Some(name) => &format!("unique constraint with name '{name}'"),
1494 None => "unique constraint",
1495 };
1496 let indices = self.get_constraint_column_indices(
1498 df_schema,
1499 columns,
1500 constraint_name,
1501 )?;
1502 Ok(Constraint::Unique(indices))
1503 }
1504 TableConstraint::PrimaryKey { columns, .. } => {
1505 let indices = self.get_constraint_column_indices(
1507 df_schema,
1508 columns,
1509 "primary key",
1510 )?;
1511 Ok(Constraint::PrimaryKey(indices))
1512 }
1513 TableConstraint::ForeignKey { .. } => {
1514 _plan_err!("Foreign key constraints are not currently supported")
1515 }
1516 TableConstraint::Check { .. } => {
1517 _plan_err!("Check constraints are not currently supported")
1518 }
1519 TableConstraint::Index { .. } => {
1520 _plan_err!("Indexes are not currently supported")
1521 }
1522 TableConstraint::FulltextOrSpatial { .. } => {
1523 _plan_err!("Indexes are not currently supported")
1524 }
1525 })
1526 .collect::<Result<Vec<_>>>()?;
1527 Ok(Constraints::new_unverified(constraints))
1528 }
1529
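    /// Normalize statement options into a map, lowercasing keys and adding a
    /// `format.` prefix to keys that have no namespace. Duplicate keys are
    /// rejected unless `allow_duplicates` is true.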
1530 fn parse_options_map(
1531 &self,
1532 options: Vec<(String, Value)>,
1533 allow_duplicates: bool,
1534 ) -> Result<HashMap<String, String>> {
1535 let mut options_map = HashMap::new();
1536 for (key, value) in options {
1537 if !allow_duplicates && options_map.contains_key(&key) {
1538 return plan_err!("Option {key} is specified multiple times");
1539 }
1540
1541 let Some(value_string) = crate::utils::value_to_string(&value) else {
1542 return plan_err!("Unsupported Value {}", value);
1543 };
1544
            if !key.contains('.') {
1546 let renamed_key = format!("format.{}", key);
1550 options_map.insert(renamed_key.to_lowercase(), value_string);
1551 } else {
1552 options_map.insert(key.to_lowercase(), value_string);
1553 }
1554 }
1555
1556 Ok(options_map)
1557 }
1558
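    /// Generate a plan for `EXPLAIN [ANALYZE] [VERBOSE] <statement>`; nested
    /// `EXPLAIN`s are rejected.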
1559 fn explain_to_plan(
1564 &self,
1565 verbose: bool,
1566 analyze: bool,
1567 statement: DFStatement,
1568 ) -> Result<LogicalPlan> {
1569 let plan = self.statement_to_plan(statement)?;
1570 if matches!(plan, LogicalPlan::Explain(_)) {
1571 return plan_err!("Nested EXPLAINs are not supported");
1572 }
1573 let plan = Arc::new(plan);
1574 let schema = LogicalPlan::explain_schema();
1575 let schema = schema.to_dfschema_ref()?;
1576
1577 if analyze {
1578 Ok(LogicalPlan::Analyze(Analyze {
1579 verbose,
1580 input: plan,
1581 schema,
1582 }))
1583 } else {
1584 let stringified_plans =
1585 vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
1586 Ok(LogicalPlan::Explain(Explain {
1587 verbose,
1588 plan,
1589 stringified_plans,
1590 schema,
1591 logical_optimization_succeeded: false,
1592 }))
1593 }
1594 }
1595
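    /// Generate a logical plan for `SHOW <variable>` by rewriting it into a
    /// query against `information_schema.df_settings`.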
1596 fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
1597 if !self.has_table("information_schema", "df_settings") {
1598 return plan_err!(
1599 "SHOW [VARIABLE] is not supported unless information_schema is enabled"
1600 );
1601 }
1602
1603 let verbose = variable
1604 .last()
1605 .map(|s| ident_to_string(s) == "verbose")
1606 .unwrap_or(false);
1607 let mut variable_vec = variable.to_vec();
1608 let mut columns: String = "name, value".to_owned();
1609
1610 if verbose {
1611 columns = format!("{columns}, description");
1612 variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
1613 }
1614
1615 let variable = object_name_to_string(&ObjectName(variable_vec));
1616 let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
1617 let query = if variable == "all" {
1618 format!("{base_query} ORDER BY name")
1620 } else if variable == "timezone" || variable == "time.zone" {
1621 format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
1623 } else {
1624 let is_valid_variable = self
1628 .context_provider
1629 .options()
1630 .entries()
1631 .iter()
1632 .any(|opt| opt.key == variable);
1633
1634 if !is_valid_variable {
1635 return plan_err!(
1636 "'{variable}' is not a variable which can be viewed with 'SHOW'"
1637 );
1638 }
1639
1640 format!("{base_query} WHERE name = '{variable}'")
1641 };
1642
1643 let mut rewrite = DFParser::parse_sql(&query)?;
1644 assert_eq!(rewrite.len(), 1);
1645
1646 self.statement_to_plan(rewrite.pop_front().unwrap())
1647 }
1648
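    /// Generate a logical plan for `SET <variable> = <value>`.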
1649 fn set_variable_to_plan(
1650 &self,
1651 local: bool,
1652 hivevar: bool,
1653 variables: &OneOrManyWithParens<ObjectName>,
1654 value: Vec<SQLExpr>,
1655 ) -> Result<LogicalPlan> {
1656 if local {
1657 return not_impl_err!("LOCAL is not supported");
1658 }
1659
1660 if hivevar {
1661 return not_impl_err!("HIVEVAR is not supported");
1662 }
1663
1664 let variable = match variables {
1665 OneOrManyWithParens::One(v) => object_name_to_string(v),
1666 OneOrManyWithParens::Many(vs) => {
1667 return not_impl_err!(
1668 "SET only supports single variable assignment: {vs:?}"
1669 );
1670 }
1671 };
1672 let mut variable_lower = variable.to_lowercase();
1673
1674 if variable_lower == "timezone" || variable_lower == "time.zone" {
1675 variable_lower = "datafusion.execution.time_zone".to_string();
1677 }
1678
1679 let value_string = match &value[0] {
1681 SQLExpr::Identifier(i) => ident_to_string(i),
1682 SQLExpr::Value(v) => match crate::utils::value_to_string(v) {
1683 None => {
1684 return plan_err!("Unsupported Value {}", value[0]);
1685 }
1686 Some(v) => v,
1687 },
1688 SQLExpr::UnaryOp { op, expr } => match op {
1690 UnaryOperator::Plus => format!("+{expr}"),
1691 UnaryOperator::Minus => format!("-{expr}"),
1692 _ => {
1693 return plan_err!("Unsupported Value {}", value[0]);
1694 }
1695 },
1696 _ => {
1697 return plan_err!("Unsupported Value {}", value[0]);
1698 }
1699 };
1700
1701 let statement = PlanStatement::SetVariable(SetVariable {
1702 variable: variable_lower,
1703 value: value_string,
1704 });
1705
1706 Ok(LogicalPlan::Statement(statement))
1707 }
1708
1709 fn delete_to_plan(
1710 &self,
1711 table_name: ObjectName,
1712 predicate_expr: Option<SQLExpr>,
1713 ) -> Result<LogicalPlan> {
1714 let table_ref = self.object_name_to_table_reference(table_name.clone())?;
1716 let table_source = self.context_provider.get_table_source(table_ref.clone())?;
1717 let schema = table_source.schema().to_dfschema_ref()?;
1718 let scan =
1719 LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
1720 .build()?;
1721 let mut planner_context = PlannerContext::new();
1722
1723 let source = match predicate_expr {
1724 None => scan,
1725 Some(predicate_expr) => {
1726 let filter_expr =
1727 self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
1728 let schema = Arc::new(schema);
1729 let mut using_columns = HashSet::new();
1730 expr_to_columns(&filter_expr, &mut using_columns)?;
1731 let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
1732 filter_expr,
1733 &[&[&schema]],
1734 &[using_columns],
1735 )?;
1736 LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
1737 }
1738 };
1739
1740 let plan = LogicalPlan::Dml(DmlStatement::new(
1741 table_ref,
1742 table_source,
1743 WriteOp::Delete,
1744 Arc::new(source),
1745 ));
1746 Ok(plan)
1747 }
1748
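    /// Generate a DML plan for `UPDATE`: each column of the target table is
    /// projected either to its assigned expression (cast to the column type)
    /// or to its current value.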
1749 fn update_to_plan(
1750 &self,
1751 table: TableWithJoins,
1752 assignments: Vec<Assignment>,
1753 from: Option<TableWithJoins>,
1754 predicate_expr: Option<SQLExpr>,
1755 ) -> Result<LogicalPlan> {
1756 let (table_name, table_alias) = match &table.relation {
1757 TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
1758 _ => plan_err!("Cannot update non-table relation!")?,
1759 };
1760
1761 let table_name = self.object_name_to_table_reference(table_name)?;
1763 let table_source = self.context_provider.get_table_source(table_name.clone())?;
1764 let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
1765 table_name.clone(),
1766 &table_source.schema(),
1767 )?);
1768
1769 let mut planner_context = PlannerContext::new();
1771 let mut assign_map = assignments
1772 .iter()
1773 .map(|assign| {
1774 let cols = match &assign.target {
1775 AssignmentTarget::ColumnName(cols) => cols,
1776 _ => plan_err!("Tuples are not supported")?,
1777 };
1778 let col_name: &Ident = cols
1779 .0
1780 .iter()
1781 .last()
1782 .ok_or_else(|| plan_datafusion_err!("Empty column id"))?;
1783 table_schema.field_with_unqualified_name(&col_name.value)?;
1785 Ok((col_name.value.clone(), assign.value.clone()))
1786 })
1787 .collect::<Result<HashMap<String, SQLExpr>>>()?;
1788
1789 let mut input_tables = vec![table];
1791 input_tables.extend(from);
1792 let scan = self.plan_from_tables(input_tables, &mut planner_context)?;
1793
1794 let source = match predicate_expr {
1796 None => scan,
1797 Some(predicate_expr) => {
1798 let filter_expr = self.sql_to_expr(
1799 predicate_expr,
1800 scan.schema(),
1801 &mut planner_context,
1802 )?;
1803 let mut using_columns = HashSet::new();
1804 expr_to_columns(&filter_expr, &mut using_columns)?;
1805 let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
1806 filter_expr,
1807 &[&[scan.schema()]],
1808 &[using_columns],
1809 )?;
1810 LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
1811 }
1812 };
1813
1814 let exprs = table_schema
1816 .iter()
1817 .map(|(qualifier, field)| {
1818 let expr = match assign_map.remove(field.name()) {
1819 Some(new_value) => {
1820 let mut expr = self.sql_to_expr(
1821 new_value,
1822 source.schema(),
1823 &mut planner_context,
1824 )?;
1825 if let Expr::Placeholder(placeholder) = &mut expr {
1827 placeholder.data_type = placeholder
1828 .data_type
1829 .take()
1830 .or_else(|| Some(field.data_type().clone()));
1831 }
1832 expr.cast_to(field.data_type(), source.schema())?
1834 }
1835 None => {
1836 if let Some(alias) = &table_alias {
1838 Expr::Column(Column::new(
1839 Some(self.ident_normalizer.normalize(alias.name.clone())),
1840 field.name(),
1841 ))
1842 } else {
1843 Expr::Column(Column::from((qualifier, field)))
1844 }
1845 }
1846 };
1847 Ok(expr.alias(field.name()))
1848 })
1849 .collect::<Result<Vec<_>>>()?;
1850
1851 let source = project(source, exprs)?;
1852
1853 let plan = LogicalPlan::Dml(DmlStatement::new(
1854 table_name,
1855 table_source,
1856 WriteOp::Update,
1857 Arc::new(source),
1858 ));
1859 Ok(plan)
1860 }
1861
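    /// Generate a DML plan for `INSERT`, reordering and casting the source
    /// columns to the target table schema and filling unspecified columns
    /// with their declared default or NULL.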
1862 fn insert_to_plan(
1863 &self,
1864 table_name: ObjectName,
1865 columns: Vec<Ident>,
1866 source: Box<Query>,
1867 overwrite: bool,
1868 replace_into: bool,
1869 ) -> Result<LogicalPlan> {
1870 let table_name = self.object_name_to_table_reference(table_name)?;
1872 let table_source = self.context_provider.get_table_source(table_name.clone())?;
1873 let arrow_schema = (*table_source.schema()).clone();
1874 let table_schema = DFSchema::try_from(arrow_schema)?;
1875
1876 let (fields, value_indices) = if columns.is_empty() {
1884 (
1886 table_schema.fields().clone(),
1887 (0..table_schema.fields().len())
1888 .map(Some)
1889 .collect::<Vec<_>>(),
1890 )
1891 } else {
1892 let mut value_indices = vec![None; table_schema.fields().len()];
1893 let fields = columns
1894 .into_iter()
1895 .map(|c| self.ident_normalizer.normalize(c))
1896 .enumerate()
1897 .map(|(i, c)| {
1898 let column_index = table_schema
1899 .index_of_column_by_name(None, &c)
1900 .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
1901
1902 if value_indices[column_index].is_some() {
1903 return schema_err!(SchemaError::DuplicateUnqualifiedField {
1904 name: c,
1905 });
1906 } else {
1907 value_indices[column_index] = Some(i);
1908 }
1909 Ok(table_schema.field(column_index).clone())
1910 })
1911 .collect::<Result<Vec<_>>>()?;
1912 (Fields::from(fields), value_indices)
1913 };
1914
1915 let mut prepare_param_data_types = BTreeMap::new();
1917 if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
1918 for row in rows.iter() {
1919 for (idx, val) in row.iter().enumerate() {
1920 if let SQLExpr::Value(Value::Placeholder(name)) = val {
1921 let name =
1922 name.replace('$', "").parse::<usize>().map_err(|_| {
1923 plan_datafusion_err!("Can't parse placeholder: {name}")
1924 })? - 1;
1925 let field = fields.get(idx).ok_or_else(|| {
1926 plan_datafusion_err!(
1927 "Placeholder ${} refers to a non existent column",
1928 idx + 1
1929 )
1930 })?;
1931 let dt = field.data_type().clone();
1932 let _ = prepare_param_data_types.insert(name, dt);
1933 }
1934 }
1935 }
1936 }
1937 let prepare_param_data_types = prepare_param_data_types.into_values().collect();
1938
1939 let mut planner_context =
1941 PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
1942 planner_context.set_table_schema(Some(DFSchemaRef::new(
1943 DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
1944 )));
1945 let source = self.query_to_plan(*source, &mut planner_context)?;
1946 if fields.len() != source.schema().fields().len() {
1947 plan_err!("Column count doesn't match insert query!")?;
1948 }
1949
1950 let exprs = value_indices
1951 .into_iter()
1952 .enumerate()
1953 .map(|(i, value_index)| {
1954 let target_field = table_schema.field(i);
1955 let expr = match value_index {
1956 Some(v) => {
1957 Expr::Column(Column::from(source.schema().qualified_field(v)))
1958 .cast_to(target_field.data_type(), source.schema())?
1959 }
1960 None => table_source
1962 .get_column_default(target_field.name())
1963 .cloned()
1964 .unwrap_or_else(|| {
1965 Expr::Literal(ScalarValue::Null)
1967 })
1968 .cast_to(target_field.data_type(), &DFSchema::empty())?,
1969 };
1970 Ok(expr.alias(target_field.name()))
1971 })
1972 .collect::<Result<Vec<Expr>>>()?;
1973 let source = project(source, exprs)?;
1974
1975 let insert_op = match (overwrite, replace_into) {
1976 (false, false) => InsertOp::Append,
1977 (true, false) => InsertOp::Overwrite,
1978 (false, true) => InsertOp::Replace,
1979 (true, true) => plan_err!("Conflicting insert operations: `overwrite` and `replace_into` cannot both be true")?,
1980 };
1981
1982 let plan = LogicalPlan::Dml(DmlStatement::new(
1983 table_name,
1984 Arc::clone(&table_source),
1985 WriteOp::Insert(insert_op),
1986 Arc::new(source),
1987 ));
1988 Ok(plan)
1989 }
1990
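    /// Generate a logical plan for `SHOW COLUMNS` by rewriting it into a query
    /// against `information_schema.columns`.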
1991 fn show_columns_to_plan(
1992 &self,
1993 extended: bool,
1994 full: bool,
1995 sql_table_name: ObjectName,
1996 ) -> Result<LogicalPlan> {
1997 let where_clause = object_name_to_qualifier(
1999 &sql_table_name,
2000 self.options.enable_ident_normalization,
2001 );
2002
2003 if !self.has_table("information_schema", "columns") {
2004 return plan_err!(
2005 "SHOW COLUMNS is not supported unless information_schema is enabled"
2006 );
2007 }
2008
2009 let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2011 let _ = self.context_provider.get_table_source(table_ref)?;
2012
2013 let select_list = if full || extended {
2015 "*"
2016 } else {
2017 "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
2018 };
2019
2020 let query = format!(
2021 "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
2022 );
2023
2024 let mut rewrite = DFParser::parse_sql(&query)?;
2025 assert_eq!(rewrite.len(), 1);
        self.statement_to_plan(rewrite.pop_front().unwrap())
    }
2028
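    /// Generate a logical plan for `SHOW FUNCTIONS` by rewriting it into a
    /// query that joins `information_schema.parameters` with
    /// `information_schema.routines`.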
2029 fn show_functions_to_plan(
2040 &self,
2041 filter: Option<ShowStatementFilter>,
2042 ) -> Result<LogicalPlan> {
2043 let where_clause = if let Some(filter) = filter {
2044 match filter {
2045 ShowStatementFilter::Like(like) => {
2046 format!("WHERE p.function_name like '{like}'")
2047 }
2048 _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
2049 }
2050 } else {
2051 "".to_string()
2052 };
2053
2054 let query = format!(
2055 r#"
2056SELECT DISTINCT
2057 p.*,
2058 r.function_type function_type,
2059 r.description description,
2060 r.syntax_example syntax_example
2061FROM
2062 (
2063 SELECT
2064 i.specific_name function_name,
2065 o.data_type return_type,
2066 array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
2067 array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
2068 FROM (
2069 SELECT
2070 specific_catalog,
2071 specific_schema,
2072 specific_name,
2073 ordinal_position,
2074 parameter_name,
2075 data_type,
2076 rid
2077 FROM
2078 information_schema.parameters
2079 WHERE
2080 parameter_mode = 'IN'
2081 ) i
2082 JOIN
2083 (
2084 SELECT
2085 specific_catalog,
2086 specific_schema,
2087 specific_name,
2088 ordinal_position,
2089 parameter_name,
2090 data_type,
2091 rid
2092 FROM
2093 information_schema.parameters
2094 WHERE
2095 parameter_mode = 'OUT'
2096 ) o
2097 ON i.specific_catalog = o.specific_catalog
2098 AND i.specific_schema = o.specific_schema
2099 AND i.specific_name = o.specific_name
2100 AND i.rid = o.rid
2101 GROUP BY 1, 2, i.rid
2102 ) as p
2103JOIN information_schema.routines r
2104ON p.function_name = r.routine_name
2105{where_clause}
2106 "#
2107 );
2108 let mut rewrite = DFParser::parse_sql(&query)?;
2109 assert_eq!(rewrite.len(), 1);
        self.statement_to_plan(rewrite.pop_front().unwrap())
    }
2112
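    /// Generate a logical plan for `SHOW CREATE TABLE` by rewriting it into a
    /// query against `information_schema.views`.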
2113 fn show_create_table_to_plan(
2114 &self,
2115 sql_table_name: ObjectName,
2116 ) -> Result<LogicalPlan> {
2117 if !self.has_table("information_schema", "tables") {
2118 return plan_err!(
2119 "SHOW CREATE TABLE is not supported unless information_schema is enabled"
2120 );
2121 }
2122 let where_clause = object_name_to_qualifier(
2124 &sql_table_name,
2125 self.options.enable_ident_normalization,
2126 );
2127
2128 let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2130 let _ = self.context_provider.get_table_source(table_ref)?;
2131
2132 let query = format!(
2133 "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
2134 );
2135
2136 let mut rewrite = DFParser::parse_sql(&query)?;
2137 assert_eq!(rewrite.len(), 1);
        self.statement_to_plan(rewrite.pop_front().unwrap())
    }
2140
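    /// Returns true if the context provider can resolve `schema.table`.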
    fn has_table(&self, schema: &str, table: &str) -> bool {
        let tables_reference = TableReference::Partial {
            schema: schema.into(),
            table: table.into(),
        };
        self.context_provider
            .get_table_source(tables_reference)
            .is_ok()
    }

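    /// Validate the transaction kind of `BEGIN`/`START TRANSACTION`; only the
    /// bare and `TRANSACTION` forms are accepted, `WORK` is rejected.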
    fn validate_transaction_kind(
        &self,
        kind: Option<BeginTransactionKind>,
    ) -> Result<()> {
        match kind {
            None => Ok(()),
            Some(BeginTransactionKind::Transaction) => Ok(()),
            Some(BeginTransactionKind::Work) => {
                not_impl_err!("Transaction kind not supported: {kind:?}")
            }
        }
    }
}