use std::collections::HashMap;
use std::string::String;
use super::rel::*;
use super::sql::*;
use arrow::datatypes::*;
pub struct SqlToRel {
schemas: HashMap<String, Schema>,
}
impl SqlToRel {
pub fn new(schemas: HashMap<String, Schema>) -> Self {
SqlToRel {
schemas,
}
}
pub fn sql_to_rel(&self, sql: &ASTNode) -> Result<Box<LogicalPlan>, String> {
match sql {
&ASTNode::SQLSelect {
ref projection,
ref relation,
ref selection,
ref limit,
ref order_by,
ref group_by,
ref having,
..
} => {
let input = match relation {
&Some(ref r) => self.sql_to_rel(r)?,
&None => Box::new(LogicalPlan::EmptyRelation),
};
let input_schema = input.schema();
let expr: Vec<Expr> = projection
.iter()
.map(|e| self.sql_to_rex(&e, &input_schema))
.collect::<Result<Vec<Expr>, String>>()?;
let projection_schema = Schema {
columns: expr.iter()
.map(|e| match e {
&Expr::Column(i) => input_schema.columns[i].clone(),
&Expr::ScalarFunction { ref name, .. } => Field {
name: name.clone(),
data_type: DataType::Float64, nullable: true,
},
_ => unimplemented!(),
})
.collect(),
};
let selection_plan = match selection {
&Some(ref filter_expr) => {
let selection_rel = LogicalPlan::Selection {
expr: self.sql_to_rex(&filter_expr, &input_schema.clone())?,
input: input,
schema: input_schema.clone(),
};
LogicalPlan::Projection {
expr: expr,
input: Box::new(selection_rel),
schema: projection_schema.clone(),
}
}
_ => LogicalPlan::Projection {
expr: expr,
input: input,
schema: projection_schema.clone(),
},
};
if let &Some(_) = group_by {
return Err(String::from("GROUP BY is not implemented yet"));
}
if let &Some(_) = having {
return Err(String::from("HAVING is not implemented yet"));
}
let order_by_plan = match order_by {
&Some(ref order_by_expr) => {
let input_schema = selection_plan.schema();
let order_by_rex: Result<Vec<Expr>, String> = order_by_expr
.iter()
.map(|e| self.sql_to_rex(e, &input_schema))
.collect();
LogicalPlan::Sort {
expr: order_by_rex?,
input: Box::new(selection_plan),
schema: input_schema,
}
}
_ => selection_plan,
};
let limit_plan = match limit {
&Some(ref limit_ast_node) => {
let limit_count = match **limit_ast_node {
ASTNode::SQLLiteralLong(n) => n,
_ => return Err(String::from("LIMIT parameter is not a number")),
};
LogicalPlan::Limit {
limit: limit_count as usize,
schema: order_by_plan.schema(),
input: Box::new(order_by_plan),
}
}
_ => order_by_plan,
};
Ok(Box::new(limit_plan))
}
&ASTNode::SQLIdentifier(ref id) => match self.schemas.get(id) {
Some(schema) => Ok(Box::new(LogicalPlan::TableScan {
schema_name: String::from("default"),
table_name: id.clone(),
schema: schema.clone(),
})),
None => Err(format!("no schema found for table {}", id)),
},
_ => Err(format!(
"sql_to_rel does not support this relation: {:?}",
sql
)),
}
}
pub fn sql_to_rex(&self, sql: &ASTNode, schema: &Schema) -> Result<Expr, String> {
match sql {
&ASTNode::SQLLiteralLong(n) => Ok(Expr::Literal(ScalarValue::Int64(n))),
&ASTNode::SQLLiteralDouble(n) => Ok(Expr::Literal(ScalarValue::Float64(n))),
&ASTNode::SQLIdentifier(ref id) => {
match schema.columns.iter().position(|c| c.name.eq(id)) {
Some(index) => Ok(Expr::Column(index)),
None => Err(format!("Invalid identifier {}", id)),
}
}
&ASTNode::SQLBinaryExpr {
ref left,
ref op,
ref right,
} => {
let operator = match op {
&SQLOperator::Gt => Operator::Gt,
&SQLOperator::GtEq => Operator::GtEq,
&SQLOperator::Lt => Operator::Lt,
&SQLOperator::LtEq => Operator::LtEq,
&SQLOperator::Eq => Operator::Eq,
&SQLOperator::NotEq => Operator::NotEq,
&SQLOperator::Plus => Operator::Plus,
&SQLOperator::Minus => Operator::Minus,
&SQLOperator::Multiply => Operator::Multiply,
&SQLOperator::Divide => Operator::Divide,
&SQLOperator::Modulus => Operator::Modulus,
};
Ok(Expr::BinaryExpr {
left: Box::new(self.sql_to_rex(&left, &schema)?),
op: operator,
right: Box::new(self.sql_to_rex(&right, &schema)?),
})
}
&ASTNode::SQLOrderBy { ref expr, asc } => Ok(Expr::Sort {
expr: Box::new(self.sql_to_rex(&expr, &schema)?),
asc,
}),
&ASTNode::SQLFunction { ref id, ref args } => {
let rex_args = args.iter()
.map(|a| self.sql_to_rex(a, schema))
.collect::<Result<Vec<Expr>, String>>()?;
Ok(Expr::ScalarFunction {
name: id.clone(),
args: rex_args,
})
}
_ => Err(String::from(format!(
"Unsupported ast node {:?} in sqltorel",
sql
))),
}
}
}
pub fn convert_data_type(sql: &SQLType) -> DataType {
match sql {
&SQLType::Varchar(_) => DataType::Utf8,
&SQLType::Int => DataType::Int32,
&SQLType::Long => DataType::Int64,
&SQLType::Float => DataType::Float64,
&SQLType::Double => DataType::Float64,
}
}