use datafusion_common::{
parsers::CompressionTypeVariant, DFSchemaRef, OwnedTableReference,
};
use datafusion_common::{Column, OwnedSchemaReference};
use std::collections::HashMap;
use std::sync::Arc;
use std::{
fmt::{self, Display},
hash::{Hash, Hasher},
};
use crate::{Expr, LogicalPlan};
#[derive(Clone, PartialEq, Eq, Hash)]
pub enum DdlStatement {
CreateExternalTable(CreateExternalTable),
CreateMemoryTable(CreateMemoryTable),
CreateView(CreateView),
CreateCatalogSchema(CreateCatalogSchema),
CreateCatalog(CreateCatalog),
DropTable(DropTable),
DropView(DropView),
DropCatalogSchema(DropCatalogSchema),
}
impl DdlStatement {
pub fn schema(&self) -> &DFSchemaRef {
match self {
DdlStatement::CreateExternalTable(CreateExternalTable { schema, .. }) => {
schema
}
DdlStatement::CreateMemoryTable(CreateMemoryTable { input, .. })
| DdlStatement::CreateView(CreateView { input, .. }) => input.schema(),
DdlStatement::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
schema
}
DdlStatement::CreateCatalog(CreateCatalog { schema, .. }) => schema,
DdlStatement::DropTable(DropTable { schema, .. }) => schema,
DdlStatement::DropView(DropView { schema, .. }) => schema,
DdlStatement::DropCatalogSchema(DropCatalogSchema { schema, .. }) => schema,
}
}
pub fn name(&self) -> &str {
match self {
DdlStatement::CreateExternalTable(_) => "CreateExternalTable",
DdlStatement::CreateMemoryTable(_) => "CreateMemoryTable",
DdlStatement::CreateView(_) => "CreateView",
DdlStatement::CreateCatalogSchema(_) => "CreateCatalogSchema",
DdlStatement::CreateCatalog(_) => "CreateCatalog",
DdlStatement::DropTable(_) => "DropTable",
DdlStatement::DropView(_) => "DropView",
DdlStatement::DropCatalogSchema(_) => "DropCatalogSchema",
}
}
pub fn inputs(&self) -> Vec<&LogicalPlan> {
match self {
DdlStatement::CreateExternalTable(_) => vec![],
DdlStatement::CreateCatalogSchema(_) => vec![],
DdlStatement::CreateCatalog(_) => vec![],
DdlStatement::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
vec![input]
}
DdlStatement::CreateView(CreateView { input, .. }) => vec![input],
DdlStatement::DropTable(_) => vec![],
DdlStatement::DropView(_) => vec![],
DdlStatement::DropCatalogSchema(_) => vec![],
}
}
pub fn display(&self) -> impl fmt::Display + '_ {
struct Wrapper<'a>(&'a DdlStatement);
impl<'a> Display for Wrapper<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.0 {
DdlStatement::CreateExternalTable(CreateExternalTable {
ref name,
..
}) => {
write!(f, "CreateExternalTable: {name:?}")
}
DdlStatement::CreateMemoryTable(CreateMemoryTable {
name,
primary_key,
..
}) => {
let pk: Vec<String> =
primary_key.iter().map(|c| c.name.to_string()).collect();
let mut pk = pk.join(", ");
if !pk.is_empty() {
pk = format!(" primary_key=[{pk}]");
}
write!(f, "CreateMemoryTable: {name:?}{pk}")
}
DdlStatement::CreateView(CreateView { name, .. }) => {
write!(f, "CreateView: {name:?}")
}
DdlStatement::CreateCatalogSchema(CreateCatalogSchema {
schema_name,
..
}) => {
write!(f, "CreateCatalogSchema: {schema_name:?}")
}
DdlStatement::CreateCatalog(CreateCatalog {
catalog_name, ..
}) => {
write!(f, "CreateCatalog: {catalog_name:?}")
}
DdlStatement::DropTable(DropTable {
name, if_exists, ..
}) => {
write!(f, "DropTable: {name:?} if not exist:={if_exists}")
}
DdlStatement::DropView(DropView {
name, if_exists, ..
}) => {
write!(f, "DropView: {name:?} if not exist:={if_exists}")
}
DdlStatement::DropCatalogSchema(DropCatalogSchema {
name,
if_exists,
cascade,
..
}) => {
write!(f, "DropCatalogSchema: {name:?} if not exist:={if_exists} cascade:={cascade}")
}
}
}
}
Wrapper(self)
}
}
#[derive(Clone, PartialEq, Eq)]
pub struct CreateExternalTable {
pub schema: DFSchemaRef,
pub name: OwnedTableReference,
pub location: String,
pub file_type: String,
pub has_header: bool,
pub delimiter: char,
pub table_partition_cols: Vec<String>,
pub if_not_exists: bool,
pub definition: Option<String>,
pub order_exprs: Vec<Vec<Expr>>,
pub file_compression_type: CompressionTypeVariant,
pub unbounded: bool,
pub options: HashMap<String, String>,
}
impl Hash for CreateExternalTable {
fn hash<H: Hasher>(&self, state: &mut H) {
self.schema.hash(state);
self.name.hash(state);
self.location.hash(state);
self.file_type.hash(state);
self.has_header.hash(state);
self.delimiter.hash(state);
self.table_partition_cols.hash(state);
self.if_not_exists.hash(state);
self.definition.hash(state);
self.file_compression_type.hash(state);
self.order_exprs.hash(state);
self.unbounded.hash(state);
self.options.len().hash(state); }
}
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct CreateMemoryTable {
pub name: OwnedTableReference,
pub primary_key: Vec<Column>,
pub input: Arc<LogicalPlan>,
pub if_not_exists: bool,
pub or_replace: bool,
}
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct CreateView {
pub name: OwnedTableReference,
pub input: Arc<LogicalPlan>,
pub or_replace: bool,
pub definition: Option<String>,
}
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct CreateCatalog {
pub catalog_name: String,
pub if_not_exists: bool,
pub schema: DFSchemaRef,
}
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct CreateCatalogSchema {
pub schema_name: String,
pub if_not_exists: bool,
pub schema: DFSchemaRef,
}
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DropTable {
pub name: OwnedTableReference,
pub if_exists: bool,
pub schema: DFSchemaRef,
}
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DropView {
pub name: OwnedTableReference,
pub if_exists: bool,
pub schema: DFSchemaRef,
}
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DropCatalogSchema {
pub name: OwnedSchemaReference,
pub if_exists: bool,
pub cascade: bool,
pub schema: DFSchemaRef,
}