use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::{self, Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{DFSchemaRef, TableReference};
use crate::{LogicalPlan, TableSource};
/// Logical plan node describing a copy of the output of `input` to the
/// location named by `output_url` (semantics inferred from the field names).
///
/// `Debug`, `PartialEq`, `PartialOrd` and `Hash` are implemented by hand in
/// this file rather than derived, because `file_type` is a trait object.
#[derive(Clone)]
pub struct CopyTo {
/// Child logical plan; presumably the source of the rows to be written.
pub input: Arc<LogicalPlan>,
/// Destination of the copy, kept as an unparsed string.
pub output_url: String,
/// Names used to partition the output — presumably column names of
/// `input`; TODO confirm against the planner.
pub partition_by: Vec<String>,
/// File format handler. Printed as `"..."` by the manual `Debug` impl.
pub file_type: Arc<dyn FileType>,
/// Free-form key/value options attached to the copy.
pub options: HashMap<String, String>,
/// Schema of this node's own output. `CopyTo::new` sets it to a single
/// non-nullable `UInt64` column named `count`.
pub output_schema: DFSchemaRef,
}
impl Debug for CopyTo {
    /// Formats every field except `file_type`, which is a trait object and is
    /// rendered as the placeholder `"..."`. The output is finished as
    /// non-exhaustive.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field to `CopyTo` forces this
        // impl to be revisited.
        let Self {
            input,
            output_url,
            partition_by,
            file_type: _,
            options,
            output_schema,
        } = self;

        let mut builder = f.debug_struct("CopyTo");
        builder.field("input", input);
        builder.field("output_url", output_url);
        builder.field("partition_by", partition_by);
        builder.field("file_type", &"...");
        builder.field("options", options);
        builder.field("output_schema", output_schema);
        builder.finish_non_exhaustive()
    }
}
impl PartialEq for CopyTo {
    /// Two `CopyTo` nodes are equal when their `input`, `output_url` and
    /// `partition_by` all match.
    ///
    /// `partition_by` must take part in equality because the `PartialOrd`
    /// impl for `CopyTo` orders by it: the `PartialOrd` contract requires
    /// `a == b` exactly when `a.partial_cmp(&b) == Some(Ordering::Equal)`.
    /// Without it, two nodes differing only in their partition columns would
    /// be `==` while still comparing as `Less`/`Greater`.
    ///
    /// `file_type`, `options` and `output_schema` are not compared.
    fn eq(&self, other: &Self) -> bool {
        self.input == other.input
            && self.output_url == other.output_url
            && self.partition_by == other.partition_by
    }
}
// Marker impl: `Eq` adds no methods on top of the manual `PartialEq` for `CopyTo`.
impl Eq for CopyTo {}
impl PartialOrd for CopyTo {
    /// Lexicographic comparison over `input`, then `output_url`, then
    /// `partition_by`. The remaining fields do not influence the ordering.
    ///
    /// `Some(Ordering::Equal)` is only ever returned when `self == other`;
    /// if the compared fields tie but the values are not equal, the result is
    /// `None`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Any non-equal (or undefined) result on an earlier key is final.
        let by_input = self.input.partial_cmp(&other.input);
        if by_input != Some(Ordering::Equal) {
            return by_input;
        }

        let by_url = self.output_url.partial_cmp(&other.output_url);
        if by_url != Some(Ordering::Equal) {
            return by_url;
        }

        // Only the last key can yield `Equal`; keep that answer consistent
        // with `PartialEq`.
        match self.partition_by.partial_cmp(&other.partition_by) {
            Some(Ordering::Equal) if self != other => None,
            ordering => ordering,
        }
    }
}
impl Hash for CopyTo {
    /// Feeds `input` and then `output_url` into the hasher.
    ///
    /// Only these two fields are hashed. Values that compare equal always
    /// agree on both of them, so equal values produce equal hashes.
    fn hash<H: Hasher>(&self, state: &mut H) {
        let Self {
            input, output_url, ..
        } = self;
        input.hash(state);
        output_url.hash(state);
    }
}
impl CopyTo {
    /// Builds a `CopyTo` node from its parts.
    ///
    /// All arguments are stored unchanged. `output_schema` is not supplied by
    /// the caller; it is always the schema returned by `make_count_schema`
    /// (a single `count` column).
    pub fn new(
        input: Arc<LogicalPlan>,
        output_url: String,
        partition_by: Vec<String>,
        file_type: Arc<dyn FileType>,
        options: HashMap<String, String>,
    ) -> Self {
        let output_schema = make_count_schema();
        Self {
            input,
            output_url,
            partition_by,
            file_type,
            options,
            output_schema,
        }
    }
}
/// Logical plan node for a data-modifying statement (see [`WriteOp`] for the
/// supported operations).
///
/// `Debug`, `PartialEq`, `PartialOrd` and `Hash` are implemented by hand in
/// this file because `target` is a trait object; those impls use
/// `target.schema()` in its place.
#[derive(Clone)]
pub struct DmlStatement {
/// Reference to the table the statement operates on.
pub table_name: TableReference,
/// The table being written to, as a `TableSource` trait object.
pub target: Arc<dyn TableSource>,
/// Which write operation this statement performs.
pub op: WriteOp,
/// Child logical plan; presumably the source of the rows for the write.
pub input: Arc<LogicalPlan>,
/// Schema of this node's own output. `DmlStatement::new` sets it to a
/// single non-nullable `UInt64` column named `count`.
pub output_schema: DFSchemaRef,
}
// Marker impl: `Eq` adds no methods on top of the manual `PartialEq` for `DmlStatement`.
impl Eq for DmlStatement {}
impl Hash for DmlStatement {
    /// Hashes, in order: `table_name`, the schema reported by `target`, `op`,
    /// `input` and `output_schema`.
    ///
    /// `target` itself is a trait object and cannot be hashed, so its schema
    /// stands in for it — the same substitution the `PartialEq` impl makes.
    fn hash<H: Hasher>(&self, state: &mut H) {
        let Self {
            table_name,
            target,
            op,
            input,
            output_schema,
        } = self;

        table_name.hash(state);
        target.schema().hash(state);
        op.hash(state);
        input.hash(state);
        output_schema.hash(state);
    }
}
impl PartialEq for DmlStatement {
    /// Two statements are equal when `table_name`, the schema reported by
    /// `target`, `op`, `input` and `output_schema` are all equal.
    ///
    /// Fields are checked in that order and the comparison stops at the first
    /// mismatch.
    fn eq(&self, other: &Self) -> bool {
        if self.table_name != other.table_name {
            return false;
        }
        // `target` is a trait object; compare the schemas it exposes instead.
        if self.target.schema() != other.target.schema() {
            return false;
        }
        if self.op != other.op {
            return false;
        }
        if self.input != other.input {
            return false;
        }
        self.output_schema == other.output_schema
    }
}
impl Debug for DmlStatement {
    /// Formats all fields. `target` is a trait object, so it is shown as the
    /// placeholder `"..."` followed by a separate `target_schema` entry
    /// holding the schema it reports.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("DmlStatement");
        builder.field("table_name", &self.table_name);
        builder.field("target", &"...");
        builder.field("target_schema", &self.target.schema());
        builder.field("op", &self.op);
        builder.field("input", &self.input);
        builder.field("output_schema", &self.output_schema);
        builder.finish()
    }
}
impl DmlStatement {
pub fn new(
table_name: TableReference,
target: Arc<dyn TableSource>,
op: WriteOp,
input: Arc<LogicalPlan>,
) -> Self {
Self {
table_name,
target,
op,
input,
output_schema: make_count_schema(),
}
}
pub fn name(&self) -> &str {
self.op.name()
}
}
impl PartialOrd for DmlStatement {
    /// Lexicographic comparison over `table_name`, then `op`, then `input`.
    /// The target schema and `output_schema` do not influence the ordering.
    ///
    /// Because `PartialEq` looks at more fields than this ordering does, a
    /// tie on the three keys is reported as `Some(Ordering::Equal)` only when
    /// `self == other`; otherwise the values are unordered and the result is
    /// `None`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Any non-equal (or undefined) result on an earlier key is final.
        let by_table = self.table_name.partial_cmp(&other.table_name);
        if by_table != Some(Ordering::Equal) {
            return by_table;
        }

        let by_op = self.op.partial_cmp(&other.op);
        if by_op != Some(Ordering::Equal) {
            return by_op;
        }

        // Only the last key can yield `Equal`; keep that answer consistent
        // with `PartialEq`.
        match self.input.partial_cmp(&other.input) {
            Some(Ordering::Equal) if self != other => None,
            ordering => ordering,
        }
    }
}
/// The kind of write a [`DmlStatement`] performs.
///
/// `PartialOrd` is derived, so the declaration order of the variants below
/// defines how they compare to each other; reordering them changes behavior.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum WriteOp {
/// Insert; the wrapped [`InsertOp`] selects the insert mode and supplies
/// the descriptive name.
Insert(InsertOp),
/// Delete; named `"Delete"`.
Delete,
/// Update; named `"Update"`.
Update,
/// Named `"Ctas"` — presumably "CREATE TABLE AS SELECT"; TODO confirm.
Ctas,
/// Truncate; named `"Truncate"`.
Truncate,
}
impl WriteOp {
pub fn name(&self) -> &str {
match self {
WriteOp::Insert(insert) => insert.name(),
WriteOp::Delete => "Delete",
WriteOp::Update => "Update",
WriteOp::Ctas => "Ctas",
WriteOp::Truncate => "Truncate",
}
}
}
impl Display for WriteOp {
    /// Writes exactly the string returned by [`WriteOp::name`]; width, fill
    /// and precision flags on the formatter are not applied.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str(self.name())
    }
}
/// The insert mode carried by `WriteOp::Insert`.
///
/// `PartialOrd` is derived, so the variants compare in declaration order:
/// `Append < Overwrite < Replace`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum InsertOp {
    /// Named `"Insert Into"`.
    Append,
    /// Named `"Insert Overwrite"`.
    Overwrite,
    /// Named `"Replace Into"`.
    Replace,
}

impl InsertOp {
    /// Returns the descriptive name of this insert mode.
    pub fn name(&self) -> &str {
        match *self {
            Self::Append => "Insert Into",
            Self::Overwrite => "Insert Overwrite",
            Self::Replace => "Replace Into",
        }
    }
}

impl Display for InsertOp {
    /// Writes exactly the string returned by [`InsertOp::name`]; width, fill
    /// and precision flags on the formatter are not applied.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str(self.name())
    }
}
/// Builds the shared output schema used by `CopyTo::new` and
/// `DmlStatement::new`: one non-nullable `UInt64` column named `count`.
///
/// # Panics
///
/// Panics if the Arrow schema cannot be converted into a `DFSchema`.
/// NOTE(review): presumably impossible for a single, uniquely named field —
/// confirm against the `TryFrom<Schema>` implementation.
fn make_count_schema() -> DFSchemaRef {
    let count_field = Field::new("count", DataType::UInt64, false);
    let arrow_schema = Schema::new(vec![count_field]);
    Arc::new(arrow_schema.try_into().unwrap())
}