use std::fmt;
use std::fmt::{Error, Formatter};
use std::rc::Rc;
use super::types::*;
use arrow::datatypes::*;
/// Classifies a function as scalar or aggregate; stored in
/// `FunctionMeta::function_type`.
#[derive(Clone, Debug)]
pub enum FunctionType {
    /// Scalar function.
    Scalar,
    /// Aggregate function.
    Aggregate,
}
/// Describes a function: its name, its arguments, the type it returns and
/// whether it is scalar or aggregate.
#[derive(Clone, Debug)]
pub struct FunctionMeta {
    /// Function name.
    pub name: String,
    /// Argument list, one `Field` per argument.
    pub args: Vec<Field>,
    /// Data type of the function's result.
    pub return_type: DataType,
    /// Kind of function (see `FunctionType`).
    pub function_type: FunctionType,
}
/// Read-only access to one row of scalar values.
pub trait Row {
    /// Returns a reference to the value stored at position `index`.
    fn get(&self, index: usize) -> &ScalarValue;

    /// Renders the whole row as a `String`.
    fn to_string(&self) -> String;
}
/// `Row` implementation backed by a plain vector of scalar values.
impl Row for Vec<ScalarValue> {
    /// Returns the element at `index`.
    ///
    /// # Panics
    ///
    /// Panics when `index` is out of bounds, like any slice indexing.
    fn get(&self, index: usize) -> &ScalarValue {
        let values: &[ScalarValue] = self;
        &values[index]
    }

    /// Concatenates the string form of every value, separated by `,`
    /// (no spaces, no trailing separator). An empty row yields `""`.
    fn to_string(&self) -> String {
        let mut joined = String::new();
        for (position, value) in self.iter().enumerate() {
            if position > 0 {
                joined.push(',');
            }
            joined.push_str(&value.to_string());
        }
        joined
    }
}
/// Operator tag stored in `Expr::BinaryExpr`.
///
/// Only the variant names are defined here; how each operator is evaluated
/// is decided by code outside this definition.
#[derive(Clone, Debug)]
pub enum Operator {
    /// Produced by `Expr::eq`.
    Eq,
    /// Produced by `Expr::not_eq`.
    NotEq,
    /// Produced by `Expr::lt`.
    Lt,
    /// Produced by `Expr::lt_eq`.
    LtEq,
    /// Produced by `Expr::gt`.
    Gt,
    /// Produced by `Expr::gt_eq`.
    GtEq,

    // Arithmetic operators (by name).
    Plus,
    Minus,
    Multiply,
    Divide,
    Modulus,

    // Boolean connectives (by name).
    And,
    Or,
}
/// Expression tree node, used by the `LogicalPlan` variants that carry
/// expressions.
///
/// The operands of `BinaryExpr`, `Cast` and `Sort` sit behind `Rc`, so cloning
/// those nodes shares the child expressions instead of copying them.
#[derive(Clone)]
pub enum Expr {
    /// Column reference by numeric index; debug-printed as `#<index>`.
    Column(usize),
    /// Literal scalar value.
    Literal(ScalarValue),
    /// Two operands combined by an `Operator`.
    BinaryExpr {
        /// Left operand.
        left: Rc<Expr>,
        /// Operator tag.
        op: Operator,
        /// Right operand.
        right: Rc<Expr>,
    },
    /// Cast of `expr` to `data_type`.
    Cast {
        /// Expression being cast.
        expr: Rc<Expr>,
        /// Target data type.
        data_type: DataType,
    },
    /// Sort key; `asc` selects `ASC` versus `DESC` in the debug output.
    Sort { expr: Rc<Expr>, asc: bool },
    /// Call of a scalar function identified by name.
    ScalarFunction { name: String, args: Vec<Expr> },
    /// Call of an aggregate function identified by name.
    AggregateFunction { name: String, args: Vec<Expr> },
}
/// Compact, SQL-like debug rendering of an expression tree.
///
/// * `Column(i)` prints as `#i`.
/// * `Literal(v)` prints `v` with its own `Debug` output.
/// * `Cast` prints as `CAST <expr> AS <data_type>`.
/// * `BinaryExpr` prints as `<left> <op> <right>`, using the operator's
///   `Debug` name.
/// * `Sort` prints as `<expr> ASC` or `<expr> DESC`.
/// * `ScalarFunction` / `AggregateFunction` print as `name(arg, arg, ...)`.
impl fmt::Debug for Expr {
    fn fmt(&self, f: &mut Formatter) -> Result<(), Error> {
        match self {
            Expr::Column(i) => write!(f, "#{}", i),
            Expr::Literal(v) => write!(f, "{:?}", v),
            Expr::Cast { expr, data_type } => {
                write!(f, "CAST {:?} AS {:?}", expr, data_type)
            }
            Expr::BinaryExpr { left, op, right } => {
                write!(f, "{:?} {:?} {:?}", left, op, right)
            }
            Expr::Sort { expr, asc } => {
                let direction = if *asc { "ASC" } else { "DESC" };
                write!(f, "{:?} {}", expr, direction)
            }
            // The arguments are part of the expression: without them two calls
            // of the same function on different inputs would print the same.
            Expr::ScalarFunction { name, args } | Expr::AggregateFunction { name, args } => {
                write!(f, "{}(", name)?;
                for (position, arg) in args.iter().enumerate() {
                    if position > 0 {
                        write!(f, ", ")?;
                    }
                    write!(f, "{:?}", arg)?;
                }
                write!(f, ")")
            }
        }
    }
}
impl Expr {
pub fn eq(&self, other: &Expr) -> Expr {
Expr::BinaryExpr {
left: Rc::new(self.clone()),
op: Operator::Eq,
right: Rc::new(other.clone()),
}
}
pub fn not_eq(&self, other: &Expr) -> Expr {
Expr::BinaryExpr {
left: Rc::new(self.clone()),
op: Operator::NotEq,
right: Rc::new(other.clone()),
}
}
pub fn gt(&self, other: &Expr) -> Expr {
Expr::BinaryExpr {
left: Rc::new(self.clone()),
op: Operator::Gt,
right: Rc::new(other.clone()),
}
}
pub fn gt_eq(&self, other: &Expr) -> Expr {
Expr::BinaryExpr {
left: Rc::new(self.clone()),
op: Operator::GtEq,
right: Rc::new(other.clone()),
}
}
pub fn lt(&self, other: &Expr) -> Expr {
Expr::BinaryExpr {
left: Rc::new(self.clone()),
op: Operator::Lt,
right: Rc::new(other.clone()),
}
}
pub fn lt_eq(&self, other: &Expr) -> Expr {
Expr::BinaryExpr {
left: Rc::new(self.clone()),
op: Operator::LtEq,
right: Rc::new(other.clone()),
}
}
}
/// Node of a logical query plan.
///
/// Every variant carries the `schema` returned by `LogicalPlan::schema`.
/// Variants with an `input` wrap exactly one child plan behind an `Rc`.
#[derive(Clone)]
pub enum LogicalPlan {
    /// Applies a limit to `input`.
    Limit {
        /// The limit value.
        limit: usize,
        /// Child plan.
        input: Rc<LogicalPlan>,
        /// Schema attached to this node.
        schema: Rc<Schema>,
    },
    /// Evaluates a list of expressions against `input`.
    Projection {
        /// Projected expressions.
        expr: Vec<Expr>,
        /// Child plan.
        input: Rc<LogicalPlan>,
        /// Schema attached to this node.
        schema: Rc<Schema>,
    },
    /// Applies a single expression to `input` as a selection (filter).
    Selection {
        /// Selection expression.
        expr: Expr,
        /// Child plan.
        input: Rc<LogicalPlan>,
        /// Schema attached to this node.
        schema: Rc<Schema>,
    },
    /// Aggregation over `input` with grouping and aggregate expressions.
    Aggregate {
        /// Child plan.
        input: Rc<LogicalPlan>,
        /// Grouping expressions.
        group_expr: Vec<Expr>,
        /// Aggregate expressions.
        aggr_expr: Vec<Expr>,
        /// Schema attached to this node.
        schema: Rc<Schema>,
    },
    /// Sorts `input` by a list of expressions.
    Sort {
        /// Sort expressions.
        expr: Vec<Expr>,
        /// Child plan.
        input: Rc<LogicalPlan>,
        /// Schema attached to this node.
        schema: Rc<Schema>,
    },
    /// Leaf node naming a table inside a named schema.
    TableScan {
        /// Name of the schema that contains the table.
        schema_name: String,
        /// Name of the table.
        table_name: String,
        /// Schema attached to this node.
        schema: Rc<Schema>,
        /// Optional list of column indices.
        projection: Option<Vec<usize>>,
    },
    /// Leaf node naming a CSV file.
    CsvFile {
        /// File name.
        filename: String,
        /// Schema attached to this node.
        schema: Rc<Schema>,
        /// Header-row flag.
        has_header: bool,
        /// Optional list of column indices.
        projection: Option<Vec<usize>>,
    },
    /// Leaf node naming a Parquet file.
    ParquetFile {
        /// File name.
        filename: String,
        /// Schema attached to this node.
        schema: Rc<Schema>,
    },
    /// Leaf node that carries only a schema.
    EmptyRelation { schema: Rc<Schema> },
}
impl LogicalPlan {
    /// Returns the schema attached to this plan node, whichever variant it is.
    ///
    /// The match lists every variant explicitly, so adding a new variant
    /// without a `schema` arm is a compile error.
    pub fn schema(&self) -> &Rc<Schema> {
        match self {
            LogicalPlan::EmptyRelation { schema }
            | LogicalPlan::TableScan { schema, .. }
            | LogicalPlan::CsvFile { schema, .. }
            | LogicalPlan::ParquetFile { schema, .. }
            | LogicalPlan::Projection { schema, .. }
            | LogicalPlan::Selection { schema, .. }
            | LogicalPlan::Aggregate { schema, .. }
            | LogicalPlan::Sort { schema, .. }
            | LogicalPlan::Limit { schema, .. } => schema,
        }
    }
}
impl LogicalPlan {
    /// Writes this node, then its child (if any), one node per line.
    ///
    /// Each node is emitted as a newline, `indent` single spaces, then a label
    /// for the variant. Variants with an `input` recurse into it with
    /// `indent + 1`. The output therefore begins with a newline, even for the
    /// root node.
    fn fmt_with_indent(&self, f: &mut Formatter, indent: usize) -> Result<(), Error> {
        // Line break plus one space per nesting level.
        writeln!(f)?;
        (0..indent).try_for_each(|_| f.write_str(" "))?;

        // Write the label for this node and remember the child to descend into.
        let child = match self {
            LogicalPlan::EmptyRelation { .. } => {
                write!(f, "EmptyRelation:")?;
                None
            }
            LogicalPlan::TableScan { table_name, projection, .. } => {
                write!(f, "TableScan: {} projection={:?}", table_name, projection)?;
                None
            }
            LogicalPlan::CsvFile { filename, schema, .. } => {
                write!(f, "CsvFile: file={}, schema={:?}", filename, schema)?;
                None
            }
            LogicalPlan::ParquetFile { .. } => {
                write!(f, "ParquetFile:")?;
                None
            }
            LogicalPlan::Projection { input, .. } => {
                write!(f, "Projection:")?;
                Some(input)
            }
            LogicalPlan::Selection { expr, input, .. } => {
                write!(f, "Selection: {:?}", expr)?;
                Some(input)
            }
            LogicalPlan::Aggregate { input, group_expr, aggr_expr, .. } => {
                write!(f, "Aggregate: groupBy=[{:?}], aggr=[{:?}]", group_expr, aggr_expr)?;
                Some(input)
            }
            LogicalPlan::Sort { input, .. } => {
                write!(f, "Sort:")?;
                Some(input)
            }
            LogicalPlan::Limit { input, .. } => {
                write!(f, "Limit:")?;
                Some(input)
            }
        };

        match child {
            Some(input) => input.fmt_with_indent(f, indent + 1),
            None => Ok(()),
        }
    }
}
impl fmt::Debug for LogicalPlan {
fn fmt(&self, f: &mut Formatter) -> Result<(), Error> {
self.fmt_with_indent(f, 0)
}
}