use std::str::FromStr;
use std::sync::Arc;
use crate::logical_plan::Expr::Alias;
use crate::logical_plan::{
lit, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, StringifiedPlan,
};
use crate::scalar::ScalarValue;
use crate::{
error::{ExecutionError, Result},
physical_plan::udaf::AggregateUDF,
};
use crate::{
physical_plan::udf::ScalarUDF,
physical_plan::{aggregates, functions},
sql::parser::{CreateExternalTable, FileType, Statement as DFStatement},
};
use arrow::datatypes::*;
use super::parser::ExplainPlan;
use sqlparser::ast::{
BinaryOperator, DataType as SQLDataType, Expr as SQLExpr, Query, Select, SelectItem,
SetExpr, TableFactor, TableWithJoins, UnaryOperator, Value,
};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{OrderByExpr, Statement};
/// Resolves table, scalar-UDF and aggregate-UDF metadata for the planner.
///
/// Implementations back `SqlToRel` with whatever catalog the execution
/// context holds; `None` means "not registered under that name".
pub trait SchemaProvider {
// Look up the schema of a registered table by name.
fn get_table_meta(&self, name: &str) -> Option<SchemaRef>;
// Look up a registered scalar UDF by name.
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>>;
// Look up a registered aggregate UDF by name.
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>>;
}
/// SQL query planner: translates sqlparser AST nodes into `LogicalPlan`s,
/// resolving names through the borrowed `schema_provider`.
pub struct SqlToRel<'a, S: SchemaProvider> {
// Catalog used to resolve tables, UDFs and UDAFs during planning.
schema_provider: &'a S,
}
impl<'a, S: SchemaProvider> SqlToRel<'a, S> {
pub fn new(schema_provider: &'a S) -> Self {
SqlToRel { schema_provider }
}
/// Generate a logical plan from a DataFusion statement: a plain SQL
/// statement, a `CREATE EXTERNAL TABLE`, or an `EXPLAIN`.
pub fn statement_to_plan(&self, statement: &DFStatement) -> Result<LogicalPlan> {
    match statement {
        DFStatement::CreateExternalTable(create) => self.external_table_to_plan(create),
        DFStatement::Statement(stmt) => self.sql_statement_to_plan(stmt),
        DFStatement::Explain(explain) => self.explain_statement_to_plan(explain),
    }
}
/// Generate a logical plan from a plain sqlparser statement.
///
/// Only `SELECT` queries are supported; everything else yields a
/// `NotImplemented` error.
pub fn sql_statement_to_plan(&self, sql: &Statement) -> Result<LogicalPlan> {
    if let Statement::Query(query) = sql {
        self.query_to_plan(query)
    } else {
        Err(ExecutionError::NotImplemented(
            "Only SELECT statements are implemented".to_string(),
        ))
    }
}
/// Generate a logical plan for a SQL query: plan the query body, then
/// layer `ORDER BY` and `LIMIT` on top.
pub fn query_to_plan(&self, query: &Query) -> Result<LogicalPlan> {
    let body_plan = match &query.body {
        SetExpr::Select(select) => self.select_to_plan(select.as_ref())?,
        other => {
            return Err(ExecutionError::NotImplemented(format!(
                "Query {} not implemented yet",
                other
            )))
        }
    };
    let sorted = self.order_by(&body_plan, &query.order_by)?;
    self.limit(&sorted, &query.limit)
}
/// Generate a logical plan for a `CREATE EXTERNAL TABLE` statement.
///
/// Validates the column list against the file type: CSV requires an
/// explicit schema, Parquet forbids one (the file is self-describing),
/// and NDJSON accepts either.
///
/// # Errors
/// Returns `ExecutionError::General` when the column-list rule for the
/// file type is violated, or when a column uses an unsupported SQL type.
pub fn external_table_to_plan(
    &self,
    statement: &CreateExternalTable,
) -> Result<LogicalPlan> {
    let CreateExternalTable {
        name,
        columns,
        file_type,
        has_header,
        location,
    } = statement;

    match file_type {
        FileType::CSV => {
            if columns.is_empty() {
                return Err(ExecutionError::General(
                    "Column definitions required for CSV files. None found".into(),
                ));
            }
        }
        FileType::Parquet => {
            if !columns.is_empty() {
                return Err(ExecutionError::General(
                    "Column definitions can not be specified for PARQUET files."
                        .into(),
                ));
            }
        }
        FileType::NdJson => {}
    };

    let schema = SchemaRef::new(self.build_schema(columns)?);

    Ok(LogicalPlan::CreateExternalTable {
        schema,
        name: name.clone(),
        location: location.clone(),
        file_type: file_type.clone(),
        // `bool` is `Copy` — dereference instead of cloning (clone_on_copy).
        has_header: *has_header,
    })
}
/// Generate a logical plan for an `EXPLAIN [VERBOSE]` statement by
/// planning the inner statement and capturing its stringified form.
pub fn explain_statement_to_plan(
    &self,
    explain_plan: &ExplainPlan,
) -> Result<LogicalPlan> {
    let inner = self.statement_to_plan(&explain_plan.statement)?;
    // The pretty-printed logical plan becomes the EXPLAIN output rows.
    let stringified_plans = vec![StringifiedPlan::new(
        PlanType::LogicalPlan,
        format!("{:#?}", inner),
    )];
    Ok(LogicalPlan::Explain {
        verbose: explain_plan.verbose,
        plan: Arc::new(inner),
        stringified_plans,
        schema: LogicalPlan::explain_schema(),
    })
}
/// Build an Arrow [`Schema`] from SQL column definitions.
///
/// A column is nullable only when it carries an explicit `NULL` column
/// option.
///
/// # Errors
/// Returns an error when a column's SQL type has no Arrow mapping.
fn build_schema(&self, columns: &[SQLColumnDef]) -> Result<Schema> {
    // One Arrow field per SQL column; pre-size the vector.
    let mut fields = Vec::with_capacity(columns.len());
    for column in columns {
        let data_type = self.make_data_type(&column.data_type)?;
        let allow_null = column
            .options
            .iter()
            .any(|x| x.option == ColumnOption::Null);
        fields.push(Field::new(&column.name.value, data_type, allow_null));
    }
    Ok(Schema::new(fields))
}
/// Map a SQL column type (from a CREATE EXTERNAL TABLE column list) to
/// the Arrow type stored in the table schema.
///
/// NOTE(review): `Timestamp` maps to `Date64(Millisecond)` here but to
/// `Timestamp(Nanosecond, None)` in the free function `convert_data_type`
/// used for CAST — confirm this divergence is intentional before relying
/// on either mapping.
fn make_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
match sql_type {
SQLDataType::BigInt => Ok(DataType::Int64),
SQLDataType::Int => Ok(DataType::Int32),
SQLDataType::SmallInt => Ok(DataType::Int16),
SQLDataType::Char(_) | SQLDataType::Varchar(_) | SQLDataType::Text => {
Ok(DataType::Utf8)
}
// DECIMAL precision/scale are discarded; values are planned as f64.
SQLDataType::Decimal(_, _) => Ok(DataType::Float64),
SQLDataType::Float(_) => Ok(DataType::Float32),
SQLDataType::Real | SQLDataType::Double => Ok(DataType::Float64),
SQLDataType::Boolean => Ok(DataType::Boolean),
SQLDataType::Date => Ok(DataType::Date64(DateUnit::Day)),
SQLDataType::Time => Ok(DataType::Time64(TimeUnit::Millisecond)),
SQLDataType::Timestamp => Ok(DataType::Date64(DateUnit::Millisecond)),
_ => Err(ExecutionError::General(format!(
"Unsupported data type: {:?}.",
sql_type
))),
}
}
/// Resolve the `FROM` clause into a table scan, or an empty relation
/// when the clause is absent (e.g. `SELECT 1`).
///
/// Joins, multiple tables and subqueries are not supported yet.
///
/// # Errors
/// Returns `NotImplemented` for unsupported relation shapes and
/// `General` when the named table is not registered.
fn from_join_to_plan(&self, from: &[TableWithJoins]) -> Result<LogicalPlan> {
    if from.is_empty() {
        // No FROM clause: plan over an empty relation.
        return LogicalPlanBuilder::empty().build();
    }
    if from.len() != 1 {
        return Err(ExecutionError::NotImplemented(
            "FROM with multiple tables is still not implemented".to_string(),
        ));
    }
    match &from[0].relation {
        TableFactor::Table { name, .. } => {
            let name = name.to_string();
            match self.schema_provider.get_table_meta(&name) {
                Some(schema) => {
                    LogicalPlanBuilder::scan("default", &name, schema.as_ref(), None)?
                        .build()
                }
                None => Err(ExecutionError::General(format!(
                    "no schema found for table {}",
                    name
                ))),
            }
        }
        _ => Err(ExecutionError::NotImplemented(
            "Subqueries are still not supported".to_string(),
        )),
    }
}
/// Generate a logical plan for a `SELECT` body: FROM, WHERE, then either
/// an aggregate (when GROUP BY or aggregate expressions are present) or a
/// plain projection.
///
/// # Errors
/// `HAVING` is rejected with `NotImplemented`; other errors bubble up
/// from expression planning.
fn select_to_plan(&self, select: &Select) -> Result<LogicalPlan> {
    if select.having.is_some() {
        return Err(ExecutionError::NotImplemented(
            "HAVING is not implemented yet".to_string(),
        ));
    }
    let plan = self.from_join_to_plan(&select.from)?;
    let plan = self.filter(&plan, &select.selection)?;

    // Plan every SELECT item against the filtered input's schema.
    let projection_expr: Vec<Expr> = select
        .projection
        .iter()
        .map(|e| self.sql_select_to_rex(e, &plan.schema()))
        .collect::<Result<Vec<Expr>>>()?;

    // Pull out the aggregate expressions (looking through aliases).
    let aggr_expr: Vec<Expr> = projection_expr
        .iter()
        .filter(|e| is_aggregate_expr(e))
        .cloned()
        .collect();

    // Use short-circuiting `||` (the original used bitwise `|`, which
    // evaluates both sides and is not idiomatic for booleans).
    if !select.group_by.is_empty() || !aggr_expr.is_empty() {
        self.aggregate(&plan, projection_expr, &select.group_by, aggr_expr)
    } else {
        self.project(&plan, projection_expr)
    }
}
/// Apply an optional `WHERE` predicate on top of `plan`; when absent the
/// input plan is returned unchanged.
fn filter(
    &self,
    plan: &LogicalPlan,
    predicate: &Option<SQLExpr>,
) -> Result<LogicalPlan> {
    if let Some(predicate_expr) = predicate {
        let filter_expr = self.sql_to_rex(predicate_expr, &plan.schema())?;
        LogicalPlanBuilder::from(plan).filter(filter_expr)?.build()
    } else {
        Ok(plan.clone())
    }
}
/// Wrap the input plan in a projection over the given expressions.
fn project(&self, input: &LogicalPlan, expr: Vec<Expr>) -> Result<LogicalPlan> {
LogicalPlanBuilder::from(input).project(expr)?.build()
}
/// Wrap the input plan in an aggregate node.
///
/// `projection_expr` is the full SELECT list, `group_by` the raw SQL
/// `GROUP BY` expressions, and `aggr_expr` the aggregates extracted from
/// the SELECT list. When the aggregate's natural output order (group keys
/// first, then aggregates) differs from the SELECT list order, a
/// reordering projection is added on top.
///
/// # Errors
/// Returns `General` when a projected expression is neither a grouping
/// key nor an aggregate (detected by a count check only).
fn aggregate(
    &self,
    input: &LogicalPlan,
    projection_expr: Vec<Expr>,
    group_by: &[SQLExpr],
    aggr_expr: Vec<Expr>,
) -> Result<LogicalPlan> {
    let group_expr: Vec<Expr> = group_by
        .iter()
        .map(|e| self.sql_to_rex(e, &input.schema()))
        .collect::<Result<Vec<Expr>>>()?;

    // Count check: every SELECT item must be accounted for by exactly one
    // grouping key or aggregate expression.
    let group_by_count = group_expr.len();
    let aggr_count = aggr_expr.len();
    if group_by_count + aggr_count != projection_expr.len() {
        return Err(ExecutionError::General(
            "Projection references non-aggregate values".to_owned(),
        ));
    }

    let plan = LogicalPlanBuilder::from(input)
        .aggregate(group_expr, aggr_expr)?
        .build()?;

    // Restore the SELECT list order with a projection if the aggregate's
    // output columns come out in a different order.
    let expected_columns: Vec<String> = projection_expr
        .iter()
        .map(|e| e.name(input.schema()))
        .collect::<Result<Vec<_>>>()?;
    let columns: Vec<String> = plan
        .schema()
        .fields()
        .iter()
        .map(|f| f.name().clone())
        .collect();
    if expected_columns == columns {
        Ok(plan)
    } else {
        self.project(
            &plan,
            expected_columns
                .iter()
                .map(|c| Expr::Column(c.clone()))
                .collect(),
        )
    }
}
/// Apply an optional `LIMIT` on top of `input`.
///
/// Only a literal non-negative integer is accepted.
///
/// # Errors
/// Returns `General` for any other expression — including a negative
/// count, which previously wrapped around to a huge value when cast with
/// `as usize`.
fn limit(&self, input: &LogicalPlan, limit: &Option<SQLExpr>) -> Result<LogicalPlan> {
    match limit {
        Some(limit_expr) => {
            let n = match self.sql_to_rex(limit_expr, &input.schema())? {
                // Guard `n >= 0`: `n as usize` on a negative i64 silently
                // wraps instead of failing.
                Expr::Literal(ScalarValue::Int64(Some(n))) if n >= 0 => Ok(n as usize),
                _ => Err(ExecutionError::General(
                    "Unexpected expression for LIMIT clause".to_string(),
                )),
            }?;
            LogicalPlanBuilder::from(input).limit(n)?.build()
        }
        None => Ok(input.clone()),
    }
}
/// Apply an optional `ORDER BY` on top of `plan`; an empty clause
/// returns the input plan unchanged.
///
/// Defaults match the original behavior: ascending, nulls first.
///
/// # Errors
/// Errors from planning a sort expression are propagated with `?`
/// (the original called `.unwrap()` inside the map and panicked).
fn order_by(
    &self,
    plan: &LogicalPlan,
    order_by: &[OrderByExpr],
) -> Result<LogicalPlan> {
    if order_by.is_empty() {
        return Ok(plan.clone());
    }
    let input_schema = plan.schema();
    let order_by_rex = order_by
        .iter()
        .map(|e| {
            Ok(Expr::Sort {
                // Propagate planning errors instead of panicking.
                expr: Box::new(self.sql_to_rex(&e.expr, &input_schema)?),
                asc: e.asc.unwrap_or(true),
                nulls_first: e.nulls_first.unwrap_or(true),
            })
        })
        .collect::<Result<Vec<Expr>>>()?;
    LogicalPlanBuilder::from(plan).sort(order_by_rex)?.build()
}
/// Generate a relational expression from a single SELECT list item.
fn sql_select_to_rex(&self, sql: &SelectItem, schema: &Schema) -> Result<Expr> {
    match sql {
        SelectItem::UnnamedExpr(expr) => self.sql_to_rex(expr, schema),
        SelectItem::ExprWithAlias { expr, alias } => {
            let inner = self.sql_to_rex(expr, schema)?;
            Ok(Alias(Box::new(inner), alias.value.clone()))
        }
        SelectItem::Wildcard => Ok(Expr::Wildcard),
        SelectItem::QualifiedWildcard(_) => Err(ExecutionError::NotImplemented(
            "Qualified wildcards are not supported".to_string(),
        )),
    }
}
pub fn sql_to_rex(&self, sql: &SQLExpr, schema: &Schema) -> Result<Expr> {
match sql {
SQLExpr::Value(Value::Number(n)) => match n.parse::<i64>() {
Ok(n) => Ok(lit(n)),
Err(_) => Ok(lit(n.parse::<f64>().unwrap())),
},
SQLExpr::Value(Value::SingleQuotedString(ref s)) => Ok(lit(s.clone())),
SQLExpr::Identifier(ref id) => {
if &id.value[0..1] == "@" {
let var_names = vec![id.value.clone()];
Ok(Expr::ScalarVariable(var_names))
} else {
match schema.field_with_name(&id.value) {
Ok(field) => Ok(Expr::Column(field.name().clone())),
Err(_) => Err(ExecutionError::ExecutionError(format!(
"Invalid identifier '{}' for schema {}",
id,
schema.to_string()
))),
}
}
}
SQLExpr::CompoundIdentifier(ids) => {
let mut var_names = vec![];
for i in 0..ids.len() {
let id = ids[i].clone();
var_names.push(id.value);
}
if &var_names[0][0..1] == "@" {
Ok(Expr::ScalarVariable(var_names))
} else {
Err(ExecutionError::ExecutionError(format!(
"Invalid compound identifier '{:?}' for schema {}",
var_names,
schema.to_string()
)))
}
}
SQLExpr::Wildcard => Ok(Expr::Wildcard),
SQLExpr::Cast {
ref expr,
ref data_type,
} => Ok(Expr::Cast {
expr: Box::new(self.sql_to_rex(&expr, schema)?),
data_type: convert_data_type(data_type)?,
}),
SQLExpr::IsNull(ref expr) => {
Ok(Expr::IsNull(Box::new(self.sql_to_rex(expr, schema)?)))
}
SQLExpr::IsNotNull(ref expr) => {
Ok(Expr::IsNotNull(Box::new(self.sql_to_rex(expr, schema)?)))
}
SQLExpr::UnaryOp { ref op, ref expr } => match *op {
UnaryOperator::Not => {
Ok(Expr::Not(Box::new(self.sql_to_rex(expr, schema)?)))
}
_ => Err(ExecutionError::InternalError(format!(
"SQL binary operator cannot be interpreted as a unary operator"
))),
},
SQLExpr::BinaryOp {
ref left,
ref op,
ref right,
} => {
let operator = match *op {
BinaryOperator::Gt => Ok(Operator::Gt),
BinaryOperator::GtEq => Ok(Operator::GtEq),
BinaryOperator::Lt => Ok(Operator::Lt),
BinaryOperator::LtEq => Ok(Operator::LtEq),
BinaryOperator::Eq => Ok(Operator::Eq),
BinaryOperator::NotEq => Ok(Operator::NotEq),
BinaryOperator::Plus => Ok(Operator::Plus),
BinaryOperator::Minus => Ok(Operator::Minus),
BinaryOperator::Multiply => Ok(Operator::Multiply),
BinaryOperator::Divide => Ok(Operator::Divide),
BinaryOperator::Modulus => Ok(Operator::Modulus),
BinaryOperator::And => Ok(Operator::And),
BinaryOperator::Or => Ok(Operator::Or),
BinaryOperator::Like => Ok(Operator::Like),
BinaryOperator::NotLike => Ok(Operator::NotLike),
_ => Err(ExecutionError::NotImplemented(format!(
"Unsupported SQL binary operator {:?}",
op
))),
}?;
Ok(Expr::BinaryExpr {
left: Box::new(self.sql_to_rex(&left, &schema)?),
op: operator,
right: Box::new(self.sql_to_rex(&right, &schema)?),
})
}
SQLExpr::Function(function) => {
let name: String = function.name.to_string();
if let Ok(fun) = functions::BuiltinScalarFunction::from_str(&name) {
let args = function
.args
.iter()
.map(|a| self.sql_to_rex(a, schema))
.collect::<Result<Vec<Expr>>>()?;
return Ok(Expr::ScalarFunction { fun, args });
};
if let Ok(fun) = aggregates::AggregateFunction::from_str(&name) {
let args = if fun == aggregates::AggregateFunction::Count {
function
.args
.iter()
.map(|a| match a {
SQLExpr::Value(Value::Number(_)) => Ok(lit(1_u8)),
SQLExpr::Wildcard => Ok(lit(1_u8)),
_ => self.sql_to_rex(a, schema),
})
.collect::<Result<Vec<Expr>>>()?
} else {
function
.args
.iter()
.map(|a| self.sql_to_rex(a, schema))
.collect::<Result<Vec<Expr>>>()?
};
return Ok(Expr::AggregateFunction {
fun,
distinct: function.distinct,
args,
});
};
match self.schema_provider.get_function_meta(&name) {
Some(fm) => {
let args = function
.args
.iter()
.map(|a| self.sql_to_rex(a, schema))
.collect::<Result<Vec<Expr>>>()?;
Ok(Expr::ScalarUDF {
fun: fm.clone(),
args,
})
}
None => match self.schema_provider.get_aggregate_meta(&name) {
Some(fm) => {
let args = function
.args
.iter()
.map(|a| self.sql_to_rex(a, schema))
.collect::<Result<Vec<Expr>>>()?;
Ok(Expr::AggregateUDF {
fun: fm.clone(),
args,
})
}
_ => Err(ExecutionError::General(format!(
"Invalid function '{}'",
name
))),
},
}
}
SQLExpr::Nested(e) => self.sql_to_rex(&e, &schema),
_ => Err(ExecutionError::General(format!(
"Unsupported ast node {:?} in sqltorel",
sql
))),
}
}
}
/// Determine whether an expression is an aggregate, looking through any
/// aliases wrapped around it.
fn is_aggregate_expr(e: &Expr) -> bool {
    match e {
        Expr::AggregateFunction { .. } => true,
        Expr::AggregateUDF { .. } => true,
        // An aliased aggregate is still an aggregate.
        Expr::Alias(inner, _) => is_aggregate_expr(inner),
        _ => false,
    }
}
/// Convert a SQL data type (as used in `CAST`) to the corresponding
/// Arrow type.
///
/// # Errors
/// Returns `NotImplemented` for SQL types without an Arrow mapping.
pub fn convert_data_type(sql: &SQLDataType) -> Result<DataType> {
    match sql {
        SQLDataType::Boolean => Ok(DataType::Boolean),
        SQLDataType::SmallInt => Ok(DataType::Int16),
        SQLDataType::Int => Ok(DataType::Int32),
        SQLDataType::BigInt => Ok(DataType::Int64),
        // SQL FLOAT/REAL/DOUBLE are all planned as 64-bit floats.
        SQLDataType::Float(_) | SQLDataType::Real => Ok(DataType::Float64),
        SQLDataType::Double => Ok(DataType::Float64),
        // `Text` added for consistency with `make_data_type`, which
        // already maps CHAR/VARCHAR/TEXT to Utf8.
        SQLDataType::Char(_) | SQLDataType::Varchar(_) | SQLDataType::Text => {
            Ok(DataType::Utf8)
        }
        SQLDataType::Timestamp => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
        other => Err(ExecutionError::NotImplemented(format!(
            "Unsupported SQL type {:?}",
            other
        ))),
    }
}
#[cfg(test)]
mod tests {
// Planner unit tests: each test parses a SQL string with DFParser,
// plans it against `MockSchemaProvider`, and asserts the exact debug
// rendering of the resulting logical plan.
use super::*;
use crate::{logical_plan::create_udf, sql::parser::DFParser};
use functions::ScalarFunctionImplementation;
#[test]
fn select_no_relation() {
quick_test(
"SELECT 1",
"Projection: Int64(1)\
\n EmptyRelation",
);
}
#[test]
fn select_scalar_func_with_literal_no_relation() {
quick_test(
"SELECT sqrt(9)",
"Projection: sqrt(Int64(9))\
\n EmptyRelation",
);
}
#[test]
fn select_simple_filter() {
let sql = "SELECT id, first_name, last_name \
FROM person WHERE state = 'CO'";
let expected = "Projection: #id, #first_name, #last_name\
\n Filter: #state Eq Utf8(\"CO\")\
\n TableScan: person projection=None";
quick_test(sql, expected);
}
#[test]
fn select_neg_filter() {
let sql = "SELECT id, first_name, last_name \
FROM person WHERE NOT state";
let expected = "Projection: #id, #first_name, #last_name\
\n Filter: NOT #state\
\n TableScan: person projection=None";
quick_test(sql, expected);
}
#[test]
fn select_compound_filter() {
let sql = "SELECT id, first_name, last_name \
FROM person WHERE state = 'CO' AND age >= 21 AND age <= 65";
let expected = "Projection: #id, #first_name, #last_name\
\n Filter: #state Eq Utf8(\"CO\") And #age GtEq Int64(21) And #age LtEq Int64(65)\
\n TableScan: person projection=None";
quick_test(sql, expected);
}
#[test]
fn test_timestamp_filter() {
let sql = "SELECT state FROM person WHERE birth_date < CAST (158412331400600000 as timestamp)";
let expected = "Projection: #state\
\n Filter: #birth_date Lt CAST(Int64(158412331400600000) AS Timestamp(Nanosecond, None))\
\n TableScan: person projection=None";
quick_test(sql, expected);
}
#[test]
fn select_all_boolean_operators() {
let sql = "SELECT age, first_name, last_name \
FROM person \
WHERE age = 21 \
AND age != 21 \
AND age > 21 \
AND age >= 21 \
AND age < 65 \
AND age <= 65";
let expected = "Projection: #age, #first_name, #last_name\
\n Filter: #age Eq Int64(21) \
And #age NotEq Int64(21) \
And #age Gt Int64(21) \
And #age GtEq Int64(21) \
And #age Lt Int64(65) \
And #age LtEq Int64(65)\
\n TableScan: person projection=None";
quick_test(sql, expected);
}
#[test]
fn select_binary_expr() {
let sql = "SELECT age + salary from person";
let expected = "Projection: #age Plus #salary\
\n TableScan: person projection=None";
quick_test(sql, expected);
}
#[test]
fn select_binary_expr_nested() {
let sql = "SELECT (age + salary)/2 from person";
let expected = "Projection: #age Plus #salary Divide Int64(2)\
\n TableScan: person projection=None";
quick_test(sql, expected);
}
#[test]
fn select_simple_aggregate() {
quick_test(
"SELECT MIN(age) FROM person",
"Aggregate: groupBy=[[]], aggr=[[MIN(#age)]]\
\n TableScan: person projection=None",
);
}
#[test]
fn test_sum_aggregate() {
quick_test(
"SELECT SUM(age) from person",
"Aggregate: groupBy=[[]], aggr=[[SUM(#age)]]\
\n TableScan: person projection=None",
);
}
#[test]
fn select_simple_aggregate_with_groupby() {
quick_test(
"SELECT state, MIN(age), MAX(age) FROM person GROUP BY state",
"Aggregate: groupBy=[[#state]], aggr=[[MIN(#age), MAX(#age)]]\
\n TableScan: person projection=None",
);
}
#[test]
fn test_wildcard() {
quick_test(
"SELECT * from person",
"Projection: #id, #first_name, #last_name, #age, #state, #salary, #birth_date\
\n TableScan: person projection=None",
);
}
#[test]
fn select_count_one() {
let sql = "SELECT COUNT(1) FROM person";
let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
\n TableScan: person projection=None";
quick_test(sql, expected);
}
#[test]
fn select_count_column() {
let sql = "SELECT COUNT(id) FROM person";
let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(#id)]]\
\n TableScan: person projection=None";
quick_test(sql, expected);
}
#[test]
fn select_scalar_func() {
let sql = "SELECT sqrt(age) FROM person";
let expected = "Projection: sqrt(#age)\
\n TableScan: person projection=None";
quick_test(sql, expected);
}
#[test]
fn select_aliased_scalar_func() {
let sql = "SELECT sqrt(age) AS square_people FROM person";
let expected = "Projection: sqrt(#age) AS square_people\
\n TableScan: person projection=None";
quick_test(sql, expected);
}
#[test]
fn select_order_by() {
let sql = "SELECT id FROM person ORDER BY id";
let expected = "Sort: #id ASC NULLS FIRST\
\n Projection: #id\
\n TableScan: person projection=None";
quick_test(sql, expected);
}
#[test]
fn select_order_by_desc() {
let sql = "SELECT id FROM person ORDER BY id DESC";
let expected = "Sort: #id DESC NULLS FIRST\
\n Projection: #id\
\n TableScan: person projection=None";
quick_test(sql, expected);
}
#[test]
fn select_order_by_nulls_last() {
quick_test(
"SELECT id FROM person ORDER BY id DESC NULLS LAST",
"Sort: #id DESC NULLS LAST\
\n Projection: #id\
\n TableScan: person projection=None",
);
quick_test(
"SELECT id FROM person ORDER BY id NULLS LAST",
"Sort: #id ASC NULLS LAST\
\n Projection: #id\
\n TableScan: person projection=None",
);
}
#[test]
fn select_group_by() {
let sql = "SELECT state FROM person GROUP BY state";
let expected = "Aggregate: groupBy=[[#state]], aggr=[[]]\
\n TableScan: person projection=None";
quick_test(sql, expected);
}
#[test]
fn select_group_by_needs_projection() {
let sql = "SELECT COUNT(state), state FROM person GROUP BY state";
let expected = "\
Projection: #COUNT(state), #state\
\n Aggregate: groupBy=[[#state]], aggr=[[COUNT(#state)]]\
\n TableScan: person projection=None";
quick_test(sql, expected);
}
#[test]
fn select_7480_1() {
let sql = "SELECT c1, MIN(c12) FROM aggregate_test_100 GROUP BY c1, c13";
let err = logical_plan(sql).expect_err("query should have failed");
assert_eq!(
"General(\"Projection references non-aggregate values\")",
format!("{:?}", err)
);
}
#[test]
fn select_7480_2() {
let sql = "SELECT c1, c13, MIN(c12) FROM aggregate_test_100 GROUP BY c1";
let err = logical_plan(sql).expect_err("query should have failed");
assert_eq!(
"General(\"Projection references non-aggregate values\")",
format!("{:?}", err)
);
}
#[test]
fn create_external_table_csv() {
let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv'";
let expected = "CreateExternalTable: \"t\"";
quick_test(sql, expected);
}
#[test]
fn create_external_table_csv_no_schema() {
let sql = "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION 'foo.csv'";
let err = logical_plan(sql).expect_err("query should have failed");
assert_eq!(
"General(\"Column definitions required for CSV files. None found\")",
format!("{:?}", err)
);
}
#[test]
fn create_external_table_parquet() {
let sql =
"CREATE EXTERNAL TABLE t(c1 int) STORED AS PARQUET LOCATION 'foo.parquet'";
let err = logical_plan(sql).expect_err("query should have failed");
assert_eq!(
"General(\"Column definitions can not be specified for PARQUET files.\")",
format!("{:?}", err)
);
}
#[test]
fn create_external_table_parquet_no_schema() {
let sql = "CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION 'foo.parquet'";
let expected = "CreateExternalTable: \"t\"";
quick_test(sql, expected);
}
// Parse `sql` and plan the first statement against the mock provider.
fn logical_plan(sql: &str) -> Result<LogicalPlan> {
let planner = SqlToRel::new(&MockSchemaProvider {});
let ast = DFParser::parse_sql(&sql).unwrap();
planner.statement_to_plan(&ast[0])
}
// Assert that planning `sql` yields exactly the `expected` debug output.
fn quick_test(sql: &str, expected: &str) {
let plan = logical_plan(sql).unwrap();
assert_eq!(expected, format!("{:?}", plan));
}
// Schema provider exposing the `person` and `aggregate_test_100` test
// tables plus a `my_sqrt` scalar UDF.
struct MockSchemaProvider {}
impl SchemaProvider for MockSchemaProvider {
fn get_table_meta(&self, name: &str) -> Option<SchemaRef> {
match name {
"person" => Some(Arc::new(Schema::new(vec![
Field::new("id", DataType::UInt32, false),
Field::new("first_name", DataType::Utf8, false),
Field::new("last_name", DataType::Utf8, false),
Field::new("age", DataType::Int32, false),
Field::new("state", DataType::Utf8, false),
Field::new("salary", DataType::Float64, false),
Field::new(
"birth_date",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
]))),
"aggregate_test_100" => Some(Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8, false),
Field::new("c2", DataType::UInt32, false),
Field::new("c3", DataType::Int8, false),
Field::new("c4", DataType::Int16, false),
Field::new("c5", DataType::Int32, false),
Field::new("c6", DataType::Int64, false),
Field::new("c7", DataType::UInt8, false),
Field::new("c8", DataType::UInt16, false),
Field::new("c9", DataType::UInt32, false),
Field::new("c10", DataType::UInt64, false),
Field::new("c11", DataType::Float32, false),
Field::new("c12", DataType::Float64, false),
Field::new("c13", DataType::Utf8, false),
]))),
_ => None,
}
}
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
// The UDF body is never executed by these tests; only its signature
// matters for planning.
let f: ScalarFunctionImplementation =
Arc::new(|_| Err(ExecutionError::NotImplemented("".to_string())));
match name {
"my_sqrt" => Some(Arc::new(create_udf(
"my_sqrt",
vec![DataType::Float64],
Arc::new(DataType::Float64),
f,
))),
_ => None,
}
}
fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
// No test exercises aggregate UDF lookup.
unimplemented!()
}
}
}