1use crate::{Expr, LogicalPlan, SortExpr, Volatility};
19use std::cmp::Ordering;
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::{
23 fmt::{self, Display},
24 hash::{Hash, Hasher},
25};
26
27#[cfg(not(feature = "sql"))]
28use crate::expr::Ident;
29use crate::expr::Sort;
30use arrow::datatypes::DataType;
31use datafusion_common::tree_node::{Transformed, TreeNodeContainer, TreeNodeRecursion};
32use datafusion_common::{
33 Constraints, DFSchemaRef, Result, SchemaReference, TableReference,
34};
35#[cfg(feature = "sql")]
36use sqlparser::ast::Ident;
37
38#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
40pub enum DdlStatement {
41 CreateExternalTable(CreateExternalTable),
43 CreateMemoryTable(CreateMemoryTable),
45 CreateView(CreateView),
47 CreateCatalogSchema(CreateCatalogSchema),
49 CreateCatalog(CreateCatalog),
51 CreateIndex(CreateIndex),
53 DropTable(DropTable),
55 DropView(DropView),
57 DropCatalogSchema(DropCatalogSchema),
59 CreateFunction(CreateFunction),
61 DropFunction(DropFunction),
63}
64
65impl DdlStatement {
66 pub fn schema(&self) -> &DFSchemaRef {
68 match self {
69 DdlStatement::CreateExternalTable(CreateExternalTable { schema, .. }) => {
70 schema
71 }
72 DdlStatement::CreateMemoryTable(CreateMemoryTable { input, .. })
73 | DdlStatement::CreateView(CreateView { input, .. }) => input.schema(),
74 DdlStatement::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
75 schema
76 }
77 DdlStatement::CreateCatalog(CreateCatalog { schema, .. }) => schema,
78 DdlStatement::CreateIndex(CreateIndex { schema, .. }) => schema,
79 DdlStatement::DropTable(DropTable { schema, .. }) => schema,
80 DdlStatement::DropView(DropView { schema, .. }) => schema,
81 DdlStatement::DropCatalogSchema(DropCatalogSchema { schema, .. }) => schema,
82 DdlStatement::CreateFunction(CreateFunction { schema, .. }) => schema,
83 DdlStatement::DropFunction(DropFunction { schema, .. }) => schema,
84 }
85 }
86
87 pub fn name(&self) -> &str {
90 match self {
91 DdlStatement::CreateExternalTable(_) => "CreateExternalTable",
92 DdlStatement::CreateMemoryTable(_) => "CreateMemoryTable",
93 DdlStatement::CreateView(_) => "CreateView",
94 DdlStatement::CreateCatalogSchema(_) => "CreateCatalogSchema",
95 DdlStatement::CreateCatalog(_) => "CreateCatalog",
96 DdlStatement::CreateIndex(_) => "CreateIndex",
97 DdlStatement::DropTable(_) => "DropTable",
98 DdlStatement::DropView(_) => "DropView",
99 DdlStatement::DropCatalogSchema(_) => "DropCatalogSchema",
100 DdlStatement::CreateFunction(_) => "CreateFunction",
101 DdlStatement::DropFunction(_) => "DropFunction",
102 }
103 }
104
105 pub fn inputs(&self) -> Vec<&LogicalPlan> {
107 match self {
108 DdlStatement::CreateExternalTable(_) => vec![],
109 DdlStatement::CreateCatalogSchema(_) => vec![],
110 DdlStatement::CreateCatalog(_) => vec![],
111 DdlStatement::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
112 vec![input]
113 }
114 DdlStatement::CreateView(CreateView { input, .. }) => vec![input],
115 DdlStatement::CreateIndex(_) => vec![],
116 DdlStatement::DropTable(_) => vec![],
117 DdlStatement::DropView(_) => vec![],
118 DdlStatement::DropCatalogSchema(_) => vec![],
119 DdlStatement::CreateFunction(_) => vec![],
120 DdlStatement::DropFunction(_) => vec![],
121 }
122 }
123
124 pub fn display(&self) -> impl Display + '_ {
130 struct Wrapper<'a>(&'a DdlStatement);
131 impl Display for Wrapper<'_> {
132 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
133 match self.0 {
134 DdlStatement::CreateExternalTable(CreateExternalTable {
135 name,
136 constraints,
137 ..
138 }) => {
139 if constraints.is_empty() {
140 write!(f, "CreateExternalTable: {name:?}")
141 } else {
142 write!(f, "CreateExternalTable: {name:?} {constraints}")
143 }
144 }
145 DdlStatement::CreateMemoryTable(CreateMemoryTable {
146 name,
147 constraints,
148 ..
149 }) => {
150 if constraints.is_empty() {
151 write!(f, "CreateMemoryTable: {name:?}")
152 } else {
153 write!(f, "CreateMemoryTable: {name:?} {constraints}")
154 }
155 }
156 DdlStatement::CreateView(CreateView { name, .. }) => {
157 write!(f, "CreateView: {name:?}")
158 }
159 DdlStatement::CreateCatalogSchema(CreateCatalogSchema {
160 schema_name,
161 ..
162 }) => {
163 write!(f, "CreateCatalogSchema: {schema_name:?}")
164 }
165 DdlStatement::CreateCatalog(CreateCatalog {
166 catalog_name, ..
167 }) => {
168 write!(f, "CreateCatalog: {catalog_name:?}")
169 }
170 DdlStatement::CreateIndex(CreateIndex { name, .. }) => {
171 write!(f, "CreateIndex: {name:?}")
172 }
173 DdlStatement::DropTable(DropTable {
174 name, if_exists, ..
175 }) => {
176 write!(f, "DropTable: {name:?} if not exist:={if_exists}")
177 }
178 DdlStatement::DropView(DropView {
179 name, if_exists, ..
180 }) => {
181 write!(f, "DropView: {name:?} if not exist:={if_exists}")
182 }
183 DdlStatement::DropCatalogSchema(DropCatalogSchema {
184 name,
185 if_exists,
186 cascade,
187 ..
188 }) => {
189 write!(
190 f,
191 "DropCatalogSchema: {name:?} if not exist:={if_exists} cascade:={cascade}"
192 )
193 }
194 DdlStatement::CreateFunction(CreateFunction { name, .. }) => {
195 write!(f, "CreateFunction: name {name:?}")
196 }
197 DdlStatement::DropFunction(DropFunction { name, .. }) => {
198 write!(f, "DropFunction: name {name:?}")
199 }
200 }
201 }
202 }
203 Wrapper(self)
204 }
205}
206
207#[derive(Debug, Clone, PartialEq, Eq)]
209pub struct CreateExternalTable {
210 pub schema: DFSchemaRef,
212 pub name: TableReference,
214 pub location: String,
216 pub file_type: String,
218 pub table_partition_cols: Vec<String>,
220 pub if_not_exists: bool,
222 pub or_replace: bool,
224 pub temporary: bool,
226 pub definition: Option<String>,
228 pub order_exprs: Vec<Vec<Sort>>,
230 pub unbounded: bool,
232 pub options: HashMap<String, String>,
234 pub constraints: Constraints,
236 pub column_defaults: HashMap<String, Expr>,
238}
239
240impl CreateExternalTable {
241 pub fn builder(
262 name: impl Into<TableReference>,
263 location: impl Into<String>,
264 file_type: impl Into<String>,
265 schema: DFSchemaRef,
266 ) -> CreateExternalTableBuilder {
267 CreateExternalTableBuilder {
268 name: name.into(),
269 location: location.into(),
270 file_type: file_type.into(),
271 schema,
272 table_partition_cols: vec![],
273 if_not_exists: false,
274 or_replace: false,
275 temporary: false,
276 definition: None,
277 order_exprs: vec![],
278 unbounded: false,
279 options: HashMap::new(),
280 constraints: Default::default(),
281 column_defaults: HashMap::new(),
282 }
283 }
284}
285
286#[derive(Debug, Clone)]
290pub struct CreateExternalTableBuilder {
291 name: TableReference,
292 location: String,
293 file_type: String,
294 schema: DFSchemaRef,
295 table_partition_cols: Vec<String>,
296 if_not_exists: bool,
297 or_replace: bool,
298 temporary: bool,
299 definition: Option<String>,
300 order_exprs: Vec<Vec<Sort>>,
301 unbounded: bool,
302 options: HashMap<String, String>,
303 constraints: Constraints,
304 column_defaults: HashMap<String, Expr>,
305}
306
307impl CreateExternalTableBuilder {
308 pub fn with_partition_cols(mut self, cols: Vec<String>) -> Self {
310 self.table_partition_cols = cols;
311 self
312 }
313
314 pub fn with_if_not_exists(mut self, if_not_exists: bool) -> Self {
316 self.if_not_exists = if_not_exists;
317 self
318 }
319
320 pub fn with_or_replace(mut self, or_replace: bool) -> Self {
322 self.or_replace = or_replace;
323 self
324 }
325
326 pub fn with_temporary(mut self, temporary: bool) -> Self {
328 self.temporary = temporary;
329 self
330 }
331
332 pub fn with_definition(mut self, definition: Option<String>) -> Self {
334 self.definition = definition;
335 self
336 }
337
338 pub fn with_order_exprs(mut self, order_exprs: Vec<Vec<Sort>>) -> Self {
340 self.order_exprs = order_exprs;
341 self
342 }
343
344 pub fn with_unbounded(mut self, unbounded: bool) -> Self {
346 self.unbounded = unbounded;
347 self
348 }
349
350 pub fn with_options(mut self, options: HashMap<String, String>) -> Self {
352 self.options = options;
353 self
354 }
355
356 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
358 self.constraints = constraints;
359 self
360 }
361
362 pub fn with_column_defaults(
364 mut self,
365 column_defaults: HashMap<String, Expr>,
366 ) -> Self {
367 self.column_defaults = column_defaults;
368 self
369 }
370
371 pub fn build(self) -> CreateExternalTable {
373 CreateExternalTable {
374 schema: self.schema,
375 name: self.name,
376 location: self.location,
377 file_type: self.file_type,
378 table_partition_cols: self.table_partition_cols,
379 if_not_exists: self.if_not_exists,
380 or_replace: self.or_replace,
381 temporary: self.temporary,
382 definition: self.definition,
383 order_exprs: self.order_exprs,
384 unbounded: self.unbounded,
385 options: self.options,
386 constraints: self.constraints,
387 column_defaults: self.column_defaults,
388 }
389 }
390}
391
392impl Hash for CreateExternalTable {
394 fn hash<H: Hasher>(&self, state: &mut H) {
395 self.schema.hash(state);
396 self.name.hash(state);
397 self.location.hash(state);
398 self.file_type.hash(state);
399 self.table_partition_cols.hash(state);
400 self.if_not_exists.hash(state);
401 self.definition.hash(state);
402 self.order_exprs.hash(state);
403 self.unbounded.hash(state);
404 self.options.len().hash(state); }
406}
407
408impl PartialOrd for CreateExternalTable {
411 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
412 #[derive(PartialEq, PartialOrd)]
413 struct ComparableCreateExternalTable<'a> {
414 pub name: &'a TableReference,
416 pub location: &'a String,
418 pub file_type: &'a String,
420 pub table_partition_cols: &'a Vec<String>,
422 pub if_not_exists: &'a bool,
424 pub definition: &'a Option<String>,
426 pub order_exprs: &'a Vec<Vec<Sort>>,
428 pub unbounded: &'a bool,
430 pub constraints: &'a Constraints,
432 }
433 let comparable_self = ComparableCreateExternalTable {
434 name: &self.name,
435 location: &self.location,
436 file_type: &self.file_type,
437 table_partition_cols: &self.table_partition_cols,
438 if_not_exists: &self.if_not_exists,
439 definition: &self.definition,
440 order_exprs: &self.order_exprs,
441 unbounded: &self.unbounded,
442 constraints: &self.constraints,
443 };
444 let comparable_other = ComparableCreateExternalTable {
445 name: &other.name,
446 location: &other.location,
447 file_type: &other.file_type,
448 table_partition_cols: &other.table_partition_cols,
449 if_not_exists: &other.if_not_exists,
450 definition: &other.definition,
451 order_exprs: &other.order_exprs,
452 unbounded: &other.unbounded,
453 constraints: &other.constraints,
454 };
455 comparable_self
456 .partial_cmp(&comparable_other)
457 .filter(|cmp| *cmp != Ordering::Equal || self == other)
459 }
460}
461
462#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
464pub struct CreateMemoryTable {
465 pub name: TableReference,
467 pub constraints: Constraints,
469 pub input: Arc<LogicalPlan>,
471 pub if_not_exists: bool,
473 pub or_replace: bool,
475 pub column_defaults: Vec<(String, Expr)>,
477 pub temporary: bool,
479}
480
481#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)]
483pub struct CreateView {
484 pub name: TableReference,
486 pub input: Arc<LogicalPlan>,
488 pub or_replace: bool,
490 pub definition: Option<String>,
492 pub temporary: bool,
494}
495
496#[derive(Debug, Clone, PartialEq, Eq, Hash)]
498pub struct CreateCatalog {
499 pub catalog_name: String,
501 pub if_not_exists: bool,
503 pub schema: DFSchemaRef,
505}
506
507impl PartialOrd for CreateCatalog {
509 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
510 match self.catalog_name.partial_cmp(&other.catalog_name) {
511 Some(Ordering::Equal) => self.if_not_exists.partial_cmp(&other.if_not_exists),
512 cmp => cmp,
513 }
514 .filter(|cmp| *cmp != Ordering::Equal || self == other)
516 }
517}
518
519#[derive(Debug, Clone, PartialEq, Eq, Hash)]
521pub struct CreateCatalogSchema {
522 pub schema_name: String,
524 pub if_not_exists: bool,
526 pub schema: DFSchemaRef,
528}
529
530impl PartialOrd for CreateCatalogSchema {
532 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
533 match self.schema_name.partial_cmp(&other.schema_name) {
534 Some(Ordering::Equal) => self.if_not_exists.partial_cmp(&other.if_not_exists),
535 cmp => cmp,
536 }
537 .filter(|cmp| *cmp != Ordering::Equal || self == other)
539 }
540}
541
542#[derive(Debug, Clone, PartialEq, Eq, Hash)]
544pub struct DropTable {
545 pub name: TableReference,
547 pub if_exists: bool,
549 pub schema: DFSchemaRef,
551}
552
553impl PartialOrd for DropTable {
555 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
556 match self.name.partial_cmp(&other.name) {
557 Some(Ordering::Equal) => self.if_exists.partial_cmp(&other.if_exists),
558 cmp => cmp,
559 }
560 .filter(|cmp| *cmp != Ordering::Equal || self == other)
562 }
563}
564
565#[derive(Debug, Clone, PartialEq, Eq, Hash)]
567pub struct DropView {
568 pub name: TableReference,
570 pub if_exists: bool,
572 pub schema: DFSchemaRef,
574}
575
576impl PartialOrd for DropView {
578 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
579 match self.name.partial_cmp(&other.name) {
580 Some(Ordering::Equal) => self.if_exists.partial_cmp(&other.if_exists),
581 cmp => cmp,
582 }
583 .filter(|cmp| *cmp != Ordering::Equal || self == other)
585 }
586}
587
588#[derive(Debug, Clone, PartialEq, Eq, Hash)]
590pub struct DropCatalogSchema {
591 pub name: SchemaReference,
593 pub if_exists: bool,
595 pub cascade: bool,
597 pub schema: DFSchemaRef,
599}
600
601impl PartialOrd for DropCatalogSchema {
603 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
604 match self.name.partial_cmp(&other.name) {
605 Some(Ordering::Equal) => match self.if_exists.partial_cmp(&other.if_exists) {
606 Some(Ordering::Equal) => self.cascade.partial_cmp(&other.cascade),
607 cmp => cmp,
608 },
609 cmp => cmp,
610 }
611 .filter(|cmp| *cmp != Ordering::Equal || self == other)
613 }
614}
615
616#[derive(Clone, PartialEq, Eq, Hash, Debug)]
629pub struct CreateFunction {
630 pub or_replace: bool,
631 pub temporary: bool,
632 pub name: String,
633 pub args: Option<Vec<OperateFunctionArg>>,
634 pub return_type: Option<DataType>,
635 pub params: CreateFunctionBody,
636 pub schema: DFSchemaRef,
638}
639
640impl PartialOrd for CreateFunction {
642 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
643 #[derive(PartialEq, PartialOrd)]
644 struct ComparableCreateFunction<'a> {
645 pub or_replace: &'a bool,
646 pub temporary: &'a bool,
647 pub name: &'a String,
648 pub args: &'a Option<Vec<OperateFunctionArg>>,
649 pub return_type: &'a Option<DataType>,
650 pub params: &'a CreateFunctionBody,
651 }
652 let comparable_self = ComparableCreateFunction {
653 or_replace: &self.or_replace,
654 temporary: &self.temporary,
655 name: &self.name,
656 args: &self.args,
657 return_type: &self.return_type,
658 params: &self.params,
659 };
660 let comparable_other = ComparableCreateFunction {
661 or_replace: &other.or_replace,
662 temporary: &other.temporary,
663 name: &other.name,
664 args: &other.args,
665 return_type: &other.return_type,
666 params: &other.params,
667 };
668 comparable_self
669 .partial_cmp(&comparable_other)
670 .filter(|cmp| *cmp != Ordering::Equal || self == other)
672 }
673}
674
675#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
679pub struct OperateFunctionArg {
680 pub name: Option<Ident>,
683 pub data_type: DataType,
684 pub default_expr: Option<Expr>,
685}
686
687impl<'a> TreeNodeContainer<'a, Expr> for OperateFunctionArg {
688 fn apply_elements<F: FnMut(&'a Expr) -> Result<TreeNodeRecursion>>(
689 &'a self,
690 f: F,
691 ) -> Result<TreeNodeRecursion> {
692 self.default_expr.apply_elements(f)
693 }
694
695 fn map_elements<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
696 self,
697 f: F,
698 ) -> Result<Transformed<Self>> {
699 self.default_expr.map_elements(f)?.map_data(|default_expr| {
700 Ok(Self {
701 default_expr,
702 ..self
703 })
704 })
705 }
706}
707
708#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
712pub struct CreateFunctionBody {
713 pub language: Option<Ident>,
715 pub behavior: Option<Volatility>,
717 pub function_body: Option<Expr>,
719}
720
721impl<'a> TreeNodeContainer<'a, Expr> for CreateFunctionBody {
722 fn apply_elements<F: FnMut(&'a Expr) -> Result<TreeNodeRecursion>>(
723 &'a self,
724 f: F,
725 ) -> Result<TreeNodeRecursion> {
726 self.function_body.apply_elements(f)
727 }
728
729 fn map_elements<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
730 self,
731 f: F,
732 ) -> Result<Transformed<Self>> {
733 self.function_body
734 .map_elements(f)?
735 .map_data(|function_body| {
736 Ok(Self {
737 function_body,
738 ..self
739 })
740 })
741 }
742}
743
744#[derive(Clone, PartialEq, Eq, Hash, Debug)]
745pub struct DropFunction {
746 pub name: String,
747 pub if_exists: bool,
748 pub schema: DFSchemaRef,
749}
750
751impl PartialOrd for DropFunction {
752 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
753 match self.name.partial_cmp(&other.name) {
754 Some(Ordering::Equal) => self.if_exists.partial_cmp(&other.if_exists),
755 cmp => cmp,
756 }
757 .filter(|cmp| *cmp != Ordering::Equal || self == other)
759 }
760}
761
762#[derive(Clone, PartialEq, Eq, Hash, Debug)]
763pub struct CreateIndex {
764 pub name: Option<String>,
765 pub table: TableReference,
766 pub using: Option<String>,
767 pub columns: Vec<SortExpr>,
768 pub unique: bool,
769 pub if_not_exists: bool,
770 pub schema: DFSchemaRef,
771}
772
773impl PartialOrd for CreateIndex {
775 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
776 #[derive(PartialEq, PartialOrd)]
777 struct ComparableCreateIndex<'a> {
778 pub name: &'a Option<String>,
779 pub table: &'a TableReference,
780 pub using: &'a Option<String>,
781 pub columns: &'a Vec<SortExpr>,
782 pub unique: &'a bool,
783 pub if_not_exists: &'a bool,
784 }
785 let comparable_self = ComparableCreateIndex {
786 name: &self.name,
787 table: &self.table,
788 using: &self.using,
789 columns: &self.columns,
790 unique: &self.unique,
791 if_not_exists: &self.if_not_exists,
792 };
793 let comparable_other = ComparableCreateIndex {
794 name: &other.name,
795 table: &other.table,
796 using: &other.using,
797 columns: &other.columns,
798 unique: &other.unique,
799 if_not_exists: &other.if_not_exists,
800 };
801 comparable_self
802 .partial_cmp(&comparable_other)
803 .filter(|cmp| *cmp != Ordering::Equal || self == other)
805 }
806}
807
#[cfg(test)]
mod test {
    use crate::{CreateCatalog, DdlStatement, DropView};
    use datafusion_common::{DFSchema, DFSchemaRef, TableReference};
    use std::cmp::Ordering;

    /// Builds a `CreateCatalog` statement named "name" with an empty schema.
    fn create_catalog(if_not_exists: bool) -> DdlStatement {
        DdlStatement::CreateCatalog(CreateCatalog {
            catalog_name: "name".to_string(),
            if_not_exists,
            schema: DFSchemaRef::new(DFSchema::empty()),
        })
    }

    #[test]
    fn test_partial_ord() {
        let flag_unset = create_catalog(false);
        let flag_set = create_catalog(true);

        // Same catalog name, so the order is decided by `if_not_exists`
        // (`false` sorts before `true`).
        assert_eq!(flag_unset.partial_cmp(&flag_set), Some(Ordering::Less));

        let drop_view = DdlStatement::DropView(DropView {
            name: TableReference::from("table"),
            if_exists: false,
            schema: DFSchemaRef::new(DFSchema::empty()),
        });

        // Different variants are ordered by their position in `DdlStatement`;
        // `DropView` is declared after `CreateCatalog`.
        assert_eq!(
            drop_view.partial_cmp(&flag_unset),
            Some(Ordering::Greater)
        );
    }
}