use std::clone::Clone;
use std::collections::HashMap;
use std::convert::*;
use std::fs::File;
use std::io::prelude::*;
use std::io::{BufWriter, Error};
use std::iter::Iterator;
use std::rc::Rc;
use std::str;
use std::string::String;
use arrow::array::*;
use arrow::datatypes::*;
use super::api::*;
use super::datasource::csv::CsvRelation;
use super::parser::*;
use super::rel::*;
use super::sql::ASTNode::*;
use super::sqltorel::*;
use super::dataframe::*;
use super::functions::geospatial::*;
use super::functions::math::*;
/// Where query execution happens: on this process against local CSV files,
/// or delegated to a remote cluster discovered via etcd.
#[derive(Debug, Clone)]
pub enum DFConfig {
    // Execute locally; tables resolve to CSV files under `data_dir`
    Local { data_dir: String },
    // Execute remotely; `etcd` is the cluster discovery endpoint
    Remote { etcd: String },
}
/// Errors that can occur while planning or executing a query.
#[derive(Debug)]
pub enum ExecutionError {
    // Underlying I/O failure (file open/read/write)
    IoError(Error),
    // SQL could not be parsed
    ParserError(ParserError),
    // Catch-all for engine-specific failures, described by a message
    Custom(String),
}
impl From<Error> for ExecutionError {
    /// Wrap a std I/O error so `?` works on file operations in execution code.
    fn from(err: Error) -> Self {
        ExecutionError::IoError(err)
    }
}
impl From<String> for ExecutionError {
    /// Wrap a plain message string as a custom execution error.
    fn from(msg: String) -> Self {
        ExecutionError::Custom(msg)
    }
}
impl From<ParserError> for ExecutionError {
    /// Wrap a SQL parser failure so `?` works on parse results.
    fn from(err: ParserError) -> Self {
        ExecutionError::ParserError(err)
    }
}
/// A batch of rows stored column-wise, the unit of data flowing between
/// relational operators.
pub trait Batch {
    /// Number of columns in the batch.
    fn col_count(&self) -> usize;
    /// Number of rows in the batch.
    fn row_count(&self) -> usize;
    /// Borrow the column at `index` (panics if out of range).
    fn column(&self, index: usize) -> &Rc<Value>;
    /// Materialize row `index` as one scalar per column.
    fn row_slice(&self, index: usize) -> Vec<ScalarValue>;
}
/// Columnar batch implementation backed by a vector of `Value`s
/// (Arrow arrays or scalars), one per column.
pub struct ColumnBatch {
    // Number of rows represented by this batch
    pub row_count: usize,
    // One entry per column; scalars represent a constant for every row
    pub columns: Vec<Rc<Value>>,
}
impl Batch for ColumnBatch {
    /// Number of columns held by this batch.
    fn col_count(&self) -> usize {
        self.columns.len()
    }

    /// Number of rows represented by this batch.
    fn row_count(&self) -> usize {
        self.row_count
    }

    /// Borrow the column at `index` (panics on out-of-range index).
    fn column(&self, index: usize) -> &Rc<Value> {
        &self.columns[index]
    }

    /// Assemble row `index` by taking one value from each column; scalar
    /// columns contribute their constant regardless of the row index.
    fn row_slice(&self, index: usize) -> Vec<ScalarValue> {
        let mut row = Vec::with_capacity(self.columns.len());
        for col in &self.columns {
            let cell = match col.as_ref() {
                &Value::Scalar(_, ref s) => s.clone(),
                &Value::Column(_, ref arr) => get_value(arr, index),
            };
            row.push(cell);
        }
        row
    }
}
/// The result of evaluating an expression against a batch: either a whole
/// Arrow array (one entry per row) or a single scalar constant, each
/// paired with the field metadata (name/type) describing it.
pub enum Value {
    Column(Rc<Field>, Rc<Array>),
    Scalar(Rc<Field>, ScalarValue),
}
/// Zip two Arrow arrays of the same primitive type and apply comparator `$F`
/// element-wise, producing `Ok(Vec<bool>)`. Errs when the two arrays are not
/// the same supported primitive type (floats and signed ints only).
macro_rules! compare_arrays_inner {
    ($V1:ident, $V2:ident, $F:expr) => {
        match ($V1.data(), $V2.data()) {
            (&ArrayData::Float32(ref a), &ArrayData::Float32(ref b)) =>
                Ok(a.iter().zip(b.iter()).map($F).collect::<Vec<bool>>()),
            (&ArrayData::Float64(ref a), &ArrayData::Float64(ref b)) =>
                Ok(a.iter().zip(b.iter()).map($F).collect::<Vec<bool>>()),
            (&ArrayData::Int8(ref a), &ArrayData::Int8(ref b)) =>
                Ok(a.iter().zip(b.iter()).map($F).collect::<Vec<bool>>()),
            (&ArrayData::Int16(ref a), &ArrayData::Int16(ref b)) =>
                Ok(a.iter().zip(b.iter()).map($F).collect::<Vec<bool>>()),
            (&ArrayData::Int32(ref a), &ArrayData::Int32(ref b)) =>
                Ok(a.iter().zip(b.iter()).map($F).collect::<Vec<bool>>()),
            (&ArrayData::Int64(ref a), &ArrayData::Int64(ref b)) =>
                Ok(a.iter().zip(b.iter()).map($F).collect::<Vec<bool>>()),
            // NOTE(review): unsigned int and Utf8 comparisons are unsupported
            _ => Err(ExecutionError::Custom("Unsupported types in compare_arrays_inner".to_string()))
        }
    }
}
/// Compare two array `Value`s element-wise with `$F` and wrap the resulting
/// boolean vector in a `Value::Column` named `$NAME`. `$F1` supplies the
/// data type recorded on the result field; `$F2` is accepted for symmetry
/// but unused.
macro_rules! compare_arrays {
    ($NAME:expr, $F1:ident, $V1:ident, $F2:ident, $V2:ident, $F:expr) => {
        Ok(Rc::new(Value::Column(
            Rc::new(Field::new($NAME, $F1.data_type.clone(), false)),
            Rc::new(Array::from(compare_arrays_inner!($V1, $V2, $F)?)),
        )))
    };
}
/// Pair every element of array `$V1` with scalar `$V2` and apply comparator
/// `$F`, producing `Ok(Vec<bool>)`. The closure receives `(array_elem, scalar)`
/// tuples, so the array is always the left operand.
///
/// Supports the same signed-integer and float types as `compare_arrays_inner!`
/// (the integer arms were previously missing, so e.g. `int_col > 5` failed).
macro_rules! compare_array_with_scalar_inner {
    ($V1:ident, $V2:ident, $F:expr) => {
        match ($V1.data(), $V2) {
            (&ArrayData::Float32(ref a), &ScalarValue::Float32(b)) => {
                Ok(a.iter().map(|aa| (aa, b)).map($F).collect::<Vec<bool>>())
            }
            (&ArrayData::Float64(ref a), &ScalarValue::Float64(b)) => {
                Ok(a.iter().map(|aa| (aa, b)).map($F).collect::<Vec<bool>>())
            }
            (&ArrayData::Int8(ref a), &ScalarValue::Int8(b)) => {
                Ok(a.iter().map(|aa| (aa, b)).map($F).collect::<Vec<bool>>())
            }
            (&ArrayData::Int16(ref a), &ScalarValue::Int16(b)) => {
                Ok(a.iter().map(|aa| (aa, b)).map($F).collect::<Vec<bool>>())
            }
            (&ArrayData::Int32(ref a), &ScalarValue::Int32(b)) => {
                Ok(a.iter().map(|aa| (aa, b)).map($F).collect::<Vec<bool>>())
            }
            (&ArrayData::Int64(ref a), &ScalarValue::Int64(b)) => {
                Ok(a.iter().map(|aa| (aa, b)).map($F).collect::<Vec<bool>>())
            }
            _ => Err(ExecutionError::Custom(
                "Unsupported types in compare_array_with_scalar_inner".to_string(),
            )),
        }
    };
}
/// Compare array `$V1` against scalar `$V2` with `$F` and wrap the resulting
/// boolean vector in a `Value::Column` named `$NAME`. `$F1` supplies the data
/// type recorded on the result field; `$F2` is accepted for symmetry but
/// unused. Callers must pass the ARRAY as `$V1` and the SCALAR as `$V2`.
macro_rules! compare_array_with_scalar {
    ($NAME:expr, $F1:ident, $V1:ident, $F2:ident, $V2:ident, $F:expr) => {
        Ok(Rc::new(Value::Column(
            Rc::new(Field::new($NAME, $F1.data_type.clone(), false)),
            Rc::new(Array::from(compare_array_with_scalar_inner!($V1, $V2, $F)?)),
        )))
    };
}
impl Value {
pub fn eq(&self, other: &Value) -> Result<Rc<Value>, ExecutionError> {
match (self, other) {
(&Value::Column(ref f1, ref v1), &Value::Column(ref _f2, ref v2)) => {
compare_arrays!("==", f1, v1, _f2, v2, |(aa, bb)| aa == bb)
}
(&Value::Column(ref f1, ref v1), &Value::Scalar(ref _f2, ref v2)) => {
compare_array_with_scalar!("==", f1, v1, _f2, v2, |(aa, bb)| aa == bb)
}
(&Value::Scalar(ref f1, ref v1), &Value::Column(ref _f2, ref v2)) => {
compare_array_with_scalar!("==", f1, v2, _f2, v1, |(aa, bb)| aa == bb)
}
(&Value::Scalar(ref _f1, ref _v1), &Value::Scalar(ref _f2, ref _v2)) => {
unimplemented!()
}
}
}
pub fn not_eq(&self, other: &Value) -> Result<Rc<Value>, ExecutionError> {
match (self, other) {
(&Value::Column(ref f1, ref v1), &Value::Column(ref _f2, ref v2)) => {
compare_arrays!("!=", f1, v1, _f2, v2, |(aa, bb)| aa != bb)
}
(&Value::Column(ref f1, ref v1), &Value::Scalar(ref _f2, ref v2)) => {
compare_array_with_scalar!("!=", f1, v1, _f2, v2, |(aa, bb)| aa != bb)
}
(&Value::Scalar(ref f1, ref v1), &Value::Column(ref _f2, ref v2)) => {
compare_array_with_scalar!("==", f1, v2, _f2, v1, |(aa, bb)| aa != bb)
}
(&Value::Scalar(ref _f1, ref _v1), &Value::Scalar(ref _f2, ref _v2)) => {
unimplemented!()
}
}
}
pub fn lt(&self, other: &Value) -> Result<Rc<Value>, ExecutionError> {
match (self, other) {
(&Value::Column(ref f1, ref v1), &Value::Column(ref _f2, ref v2)) => {
compare_arrays!("<", f1, v1, _f2, v2, |(aa, bb)| aa < bb)
}
(&Value::Column(ref f1, ref v1), &Value::Scalar(ref _f2, ref v2)) => {
compare_array_with_scalar!("<", f1, v1, _f2, v2, |(aa, bb)| aa < bb)
}
(&Value::Scalar(ref f1, ref v1), &Value::Column(ref _f2, ref v2)) => {
compare_array_with_scalar!("==", f1, v2, _f2, v1, |(aa, bb)| aa < bb)
}
(&Value::Scalar(ref _f1, ref _v1), &Value::Scalar(ref _f2, ref _v2)) => {
unimplemented!()
}
}
}
pub fn lt_eq(&self, other: &Value) -> Result<Rc<Value>, ExecutionError> {
match (self, other) {
(&Value::Column(ref f1, ref v1), &Value::Column(ref _f2, ref v2)) => {
compare_arrays!("<", f1, v1, _f2, v2, |(aa, bb)| aa <= bb)
}
(&Value::Column(ref f1, ref v1), &Value::Scalar(ref _f2, ref v2)) => {
compare_array_with_scalar!("<", f1, v1, _f2, v2, |(aa, bb)| aa <= bb)
}
(&Value::Scalar(ref f1, ref v1), &Value::Column(ref _f2, ref v2)) => {
compare_array_with_scalar!("==", f1, v2, _f2, v1, |(aa, bb)| aa <= bb)
}
(&Value::Scalar(ref _f1, ref _v1), &Value::Scalar(ref _f2, ref _v2)) => {
unimplemented!()
}
}
}
pub fn gt(&self, other: &Value) -> Result<Rc<Value>, ExecutionError> {
match (self, other) {
(&Value::Column(ref f1, ref v1), &Value::Column(ref _f2, ref v2)) => {
compare_arrays!(">", f1, v1, _f2, v2, |(aa, bb)| aa >= bb)
}
(&Value::Column(ref f1, ref v1), &Value::Scalar(ref _f2, ref v2)) => {
compare_array_with_scalar!(">", f1, v1, _f2, v2, |(aa, bb)| aa >= bb)
}
(&Value::Scalar(ref f1, ref v1), &Value::Column(ref _f2, ref v2)) => {
compare_array_with_scalar!("==", f1, v2, _f2, v1, |(aa, bb)| aa >= bb)
}
(&Value::Scalar(ref _f1, ref _v1), &Value::Scalar(ref _f2, ref _v2)) => {
unimplemented!()
}
}
}
pub fn gt_eq(&self, other: &Value) -> Result<Rc<Value>, ExecutionError> {
match (self, other) {
(&Value::Column(ref f1, ref v1), &Value::Column(ref _f2, ref v2)) => {
compare_arrays!(">", f1, v1, _f2, v2, |(aa, bb)| aa > bb)
}
(&Value::Column(ref f1, ref v1), &Value::Scalar(ref _f2, ref v2)) => {
compare_array_with_scalar!(">", f1, v1, _f2, v2, |(aa, bb)| aa > bb)
}
(&Value::Scalar(ref f1, ref v1), &Value::Column(ref _f2, ref v2)) => {
compare_array_with_scalar!("==", f1, v2, _f2, v1, |(aa, bb)| aa > bb)
}
(&Value::Scalar(ref _f1, ref _v1), &Value::Scalar(ref _f2, ref _v2)) => {
unimplemented!()
}
}
}
pub fn add(&self, _other: &Value) -> Result<Rc<Value>, ExecutionError> {
unimplemented!()
}
pub fn subtract(&self, _other: &Value) -> Result<Rc<Value>, ExecutionError> {
unimplemented!()
}
pub fn divide(&self, _other: &Value) -> Result<Rc<Value>, ExecutionError> {
unimplemented!()
}
pub fn multiply(&self, _other: &Value) -> Result<Rc<Value>, ExecutionError> {
unimplemented!()
}
}
pub type CompiledExpr = Box<Fn(&Batch) -> Result<Rc<Value>, ExecutionError>>;
/// Compile a logical expression into a closure that can be evaluated
/// repeatedly against batches of data.
///
/// Returns an error for unsupported binary operators or unknown functions.
pub fn compile_expr(ctx: &ExecutionContext, expr: &Expr) -> Result<CompiledExpr, ExecutionError> {
    match expr {
        &Expr::Literal(ref lit) => {
            let literal_value = lit.clone();
            // TODO(review): the result field type is hard-coded to Int64 even
            // for float/string literals; derive it from the ScalarValue variant.
            Ok(Box::new(move |_| {
                Ok(Rc::new(Value::Scalar(
                    Rc::new(Field::new("lit", DataType::Int64, false)),
                    literal_value.clone(),
                )))
            }))
        }
        &Expr::Column(index) => Ok(Box::new(move |batch: &Batch| {
            // Cheap: clones only the Rc, not the underlying array.
            Ok((*batch.column(index)).clone())
        })),
        &Expr::BinaryExpr {
            ref left,
            ref op,
            ref right,
        } => {
            let left_expr = compile_expr(ctx, left)?;
            let right_expr = compile_expr(ctx, right)?;
            // Resolve the operator to its Value method once at compile time
            // instead of duplicating an identical closure per operator.
            let func: fn(&Value, &Value) -> Result<Rc<Value>, ExecutionError> = match op {
                &Operator::Eq => Value::eq,
                &Operator::NotEq => Value::not_eq,
                &Operator::Lt => Value::lt,
                &Operator::LtEq => Value::lt_eq,
                &Operator::Gt => Value::gt,
                &Operator::GtEq => Value::gt_eq,
                &Operator::Plus => Value::add,
                &Operator::Minus => Value::subtract,
                &Operator::Divide => Value::divide,
                &Operator::Multiply => Value::multiply,
                _ => {
                    return Err(ExecutionError::Custom(format!(
                        "Unsupported binary operator '{:?}'",
                        op
                    )))
                }
            };
            Ok(Box::new(move |batch: &Batch| {
                let left_values = left_expr(batch)?;
                let right_values = right_expr(batch)?;
                func(&left_values, &right_values)
            }))
        }
        // Sort keys are compiled like ordinary expressions; direction is
        // handled by the sort operator itself.
        &Expr::Sort { ref expr, .. } => compile_expr(ctx, expr),
        &Expr::ScalarFunction { ref name, ref args } => {
            // Compile all argument expressions up front; any failure aborts.
            let compiled_args: Result<Vec<CompiledExpr>, ExecutionError> =
                args.iter().map(|e| compile_expr(ctx, e)).collect();
            let compiled_args_ok = compiled_args?;
            let func = ctx.load_function_impl(name.as_ref())?;
            Ok(Box::new(move |batch| {
                let arg_values: Result<Vec<Rc<Value>>, ExecutionError> = compiled_args_ok
                    .iter()
                    .map(|expr| expr(batch))
                    .collect();
                func.execute(arg_values?)
            }))
        }
    }
}
/// Relational operator that drops rows for which the compiled predicate
/// expression evaluates to false.
pub struct FilterRelation {
    // Output schema (same as the input's)
    schema: Schema,
    // Upstream operator supplying batches
    input: Box<SimpleRelation>,
    // Compiled boolean predicate evaluated per batch
    expr: CompiledExpr,
}
/// Relational operator that evaluates a list of expressions against each
/// input batch, producing one output column per expression.
pub struct ProjectRelation {
    // Schema describing the projected columns
    schema: Schema,
    // Upstream operator supplying batches
    input: Box<SimpleRelation>,
    // One compiled expression per output column
    expr: Vec<CompiledExpr>,
}
/// Relational operator that truncates its input (see `scan` for the
/// batch-vs-row caveat).
pub struct LimitRelation {
    schema: Schema,
    input: Box<SimpleRelation>,
    // Maximum number of items to let through
    limit: usize,
}
/// A physical relational operator: something that can be scanned to produce
/// a stream of batches, and that knows its output schema.
pub trait SimpleRelation {
    /// Produce an iterator over result batches; takes `&mut self` because
    /// scanning may advance internal state (e.g. a CSV reader).
    fn scan<'a>(
        &'a mut self,
        ctx: &'a ExecutionContext,
    ) -> Box<Iterator<Item = Result<Box<Batch>, ExecutionError>> + 'a>;
    /// The schema of the batches this relation produces.
    fn schema<'a>(&'a self) -> &'a Schema;
}
impl SimpleRelation for FilterRelation {
    /// Evaluate the predicate once per input batch and keep only the rows
    /// whose predicate entry is true.
    fn scan<'a>(
        &'a mut self,
        ctx: &'a ExecutionContext,
    ) -> Box<Iterator<Item = Result<Box<Batch>, ExecutionError>> + 'a> {
        let filter_expr = &self.expr;
        Box::new(self.input.scan(ctx).map(move |b| {
            match b {
                Ok(ref batch) => {
                    // The predicate must evaluate to a boolean column with
                    // one entry per input row.
                    let x = (*filter_expr)(batch.as_ref())?;
                    match x.as_ref() {
                        &Value::Column(_, ref filter_eval) => {
                            let filtered_columns: Vec<Rc<Value>> = (0..batch.col_count())
                                .map(move |column_index| {
                                    let column = batch.column(column_index);
                                    // Keep the column's real field metadata;
                                    // previously every filtered column was
                                    // re-labelled as "filter"/Int64, losing
                                    // its name and type.
                                    let field = match column.as_ref() {
                                        &Value::Column(ref f, _) => f.clone(),
                                        &Value::Scalar(ref f, _) => f.clone(),
                                    };
                                    Rc::new(Value::Column(
                                        field,
                                        Rc::new(filter(column, &filter_eval)),
                                    ))
                                })
                                .collect();
                            // Row count of the output is the longest column
                            // (scalars count as a single row).
                            let row_count_opt: Option<usize> = filtered_columns
                                .iter()
                                .map(|c| match c.as_ref() {
                                    &Value::Scalar(_, _) => 1,
                                    &Value::Column(_, ref v) => v.len(),
                                })
                                .max();
                            let row_count = match row_count_opt {
                                None => 0,
                                Some(n) => n,
                            };
                            let filtered_batch: Box<Batch> = Box::new(ColumnBatch {
                                row_count,
                                columns: filtered_columns,
                            });
                            Ok(filtered_batch)
                        }
                        // A scalar predicate result is not supported.
                        _ => panic!(),
                    }
                }
                // NOTE(review): upstream errors currently abort via panic
                // instead of being propagated as Err.
                _ => panic!(),
            }
        }))
    }

    fn schema<'a>(&'a self) -> &'a Schema {
        &self.schema
    }
}
impl SimpleRelation for ProjectRelation {
    /// Evaluate every projection expression against each input batch,
    /// producing one output column per expression.
    fn scan<'a>(
        &'a mut self,
        ctx: &'a ExecutionContext,
    ) -> Box<Iterator<Item = Result<Box<Batch>, ExecutionError>> + 'a> {
        let project_expr = &self.expr;
        let projection_iter = self.input.scan(ctx).map(move |r| match r {
            Ok(ref batch) => {
                // Evaluate all expressions; the first failure aborts the batch.
                let projected_columns: Result<Vec<Rc<Value>>, ExecutionError> =
                    project_expr.iter().map(|e| (*e)(batch.as_ref())).collect();
                let projected_batch: Box<Batch> = Box::new(ColumnBatch {
                    row_count: batch.row_count(),
                    // `?` already yields an owned Vec; the previous `.clone()`
                    // duplicated it for nothing.
                    columns: projected_columns?,
                });
                Ok(projected_batch)
            }
            // Pass upstream errors through untouched.
            Err(_) => r,
        });
        Box::new(projection_iter)
    }

    fn schema<'a>(&'a self) -> &'a Schema {
        &self.schema
    }
}
impl SimpleRelation for LimitRelation {
    fn scan<'a>(
        &'a mut self,
        ctx: &'a ExecutionContext,
    ) -> Box<Iterator<Item = Result<Box<Batch>, ExecutionError>> + 'a> {
        // NOTE(review): `take` limits the number of *batches* yielded by the
        // input, not the number of rows, so LIMIT is only correct when each
        // batch contains a single row. A proper implementation must count
        // rows and slice the final batch — confirm intended semantics.
        Box::new(self.input.scan(ctx).take(self.limit))
    }
    fn schema<'a>(&'a self) -> &'a Schema {
        &self.schema
    }
}
/// Top-level execution request: either run a query for interactive results
/// or run it and write the output to a file.
#[derive(Debug, Clone)]
pub enum PhysicalPlan {
    // Run the plan and return results to the caller (not implemented yet)
    Interactive { plan: Box<LogicalPlan> },
    // Run the plan and write the results to `filename` as CSV
    Write {
        plan: Box<LogicalPlan>,
        filename: String,
    },
}
/// Outcome of executing a physical plan.
#[derive(Debug, Clone)]
pub enum ExecutionResult {
    // Completed with no value (e.g. DDL)
    Unit,
    // Number of rows produced/written
    Count(usize),
}
/// Holds everything needed to plan and run queries: registered table
/// schemas, registered scalar functions, and the execution mode.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    // Table name -> schema, used by the SQL planner
    schemas: HashMap<String, Schema>,
    // Lower-cased function name -> metadata for registered UDFs
    functions: HashMap<String, FunctionMeta>,
    // Local or remote execution mode
    config: DFConfig,
}
impl ExecutionContext {
    /// Create a context that executes queries against CSV files under `data_dir`.
    pub fn local(data_dir: String) -> Self {
        ExecutionContext {
            schemas: HashMap::new(),
            functions: HashMap::new(),
            config: DFConfig::Local { data_dir },
        }
    }

    /// Create a context that would delegate execution to a remote cluster
    /// registered in etcd (see `execute_remote`; not implemented yet).
    pub fn remote(etcd: String) -> Self {
        ExecutionContext {
            schemas: HashMap::new(),
            functions: HashMap::new(),
            config: DFConfig::Remote { etcd },
        }
    }

    /// Register the schema for a named table so SQL can reference it.
    pub fn define_schema(&mut self, name: &str, schema: &Schema) {
        self.schemas.insert(name.to_string(), schema.clone());
    }

    /// Register a scalar UDF; lookup is case-insensitive (keyed on the
    /// lower-cased function name).
    pub fn define_function(&mut self, func: &ScalarFunction) {
        let fm = FunctionMeta {
            name: func.name(),
            args: func.args(),
            return_type: func.return_type(),
        };
        self.functions.insert(fm.name.to_lowercase(), fm);
    }

    /// Parse a SQL string and translate it into a logical query plan.
    pub fn create_logical_plan(&self, sql: &str) -> Result<Box<LogicalPlan>, ExecutionError> {
        let ast = Parser::parse_sql(String::from(sql))?;
        let query_planner = SqlToRel::new(self.schemas.clone());
        Ok(query_planner.sql_to_rel(&ast)?)
    }

    /// Execute a SQL statement. `CREATE EXTERNAL TABLE` registers a schema
    /// and returns an empty relation; any other statement is planned into a
    /// lazily-evaluated `DataFrame`.
    pub fn sql(&mut self, sql: &str) -> Result<Box<DataFrame>, ExecutionError> {
        let ast = Parser::parse_sql(String::from(sql))?;
        match ast {
            SQLCreateTable { name, columns } => {
                let fields: Vec<Field> = columns
                    .iter()
                    .map(|c| Field::new(&c.name, convert_data_type(&c.data_type), c.allow_null))
                    .collect();
                let schema = Schema::new(fields);
                self.define_schema(&name, &schema);
                // DDL produces no result set.
                Ok(Box::new(DF {
                    plan: Box::new(LogicalPlan::EmptyRelation),
                }))
            }
            _ => {
                let query_planner = SqlToRel::new(self.schemas.clone());
                let plan = query_planner.sql_to_rel(&ast)?;
                Ok(Box::new(DF { plan }))
            }
        }
    }

    /// Create a DataFrame over a CSV file with an explicit schema.
    pub fn load(&self, filename: &str, schema: &Schema) -> Result<Box<DataFrame>, ExecutionError> {
        let plan = LogicalPlan::CsvFile {
            filename: filename.to_string(),
            schema: schema.clone(),
        };
        Ok(Box::new(DF {
            plan: Box::new(plan),
        }))
    }

    /// Register a schema under a table name, taking ownership of both.
    pub fn register_table(&mut self, name: String, schema: Schema) {
        self.schemas.insert(name, schema);
    }

    /// Translate a logical plan into a physical, scannable relation tree.
    pub fn create_execution_plan(
        &self,
        data_dir: String,
        plan: &LogicalPlan,
    ) -> Result<Box<SimpleRelation>, ExecutionError> {
        match *plan {
            LogicalPlan::EmptyRelation => Err(ExecutionError::Custom(String::from(
                "empty relation is not implemented yet",
            ))),
            LogicalPlan::TableScan {
                ref table_name,
                ref schema,
                ..
            } => {
                // Registered tables resolve to `{data_dir}/{table}.csv`.
                let filename = format!("{}/{}.csv", data_dir, table_name);
                let file = File::open(filename)?;
                let rel = CsvRelation::open(file, schema.clone())?;
                Ok(Box::new(rel))
            }
            LogicalPlan::CsvFile {
                ref filename,
                ref schema,
            } => {
                let file = File::open(filename)?;
                let rel = CsvRelation::open(file, schema.clone())?;
                Ok(Box::new(rel))
            }
            LogicalPlan::Selection {
                ref expr,
                ref input,
                ref schema,
            } => {
                let input_rel = self.create_execution_plan(data_dir, input)?;
                let rel = FilterRelation {
                    input: input_rel,
                    expr: compile_expr(&self, expr)?,
                    schema: schema.clone(),
                };
                Ok(Box::new(rel))
            }
            LogicalPlan::Projection {
                ref expr,
                ref input,
                ..
            } => {
                let input_rel = self.create_execution_plan(data_dir, &input)?;
                let input_schema = input_rel.schema().clone();
                // Derive the output schema from the projection expressions.
                let project_columns: Vec<Field> = expr.iter()
                    .map(|e| {
                        match e {
                            &Expr::Column(i) => input_schema.columns[i].clone(),
                            // TODO(review): UDF return types are hard-coded
                            // to Float64 here even though FunctionMeta carries
                            // the real return type.
                            &Expr::ScalarFunction { ref name, .. } => Field {
                                name: name.clone(),
                                data_type: DataType::Float64,
                                nullable: true,
                            },
                            _ => unimplemented!("Unsupported projection expression"),
                        }
                    })
                    .collect();
                let project_schema = Schema {
                    columns: project_columns,
                };
                let compiled_expr: Result<Vec<CompiledExpr>, ExecutionError> =
                    expr.iter().map(|e| compile_expr(&self, e)).collect();
                let rel = ProjectRelation {
                    input: input_rel,
                    expr: compiled_expr?,
                    schema: project_schema,
                };
                Ok(Box::new(rel))
            }
            LogicalPlan::Limit {
                limit,
                ref input,
                ref schema,
                ..
            } => {
                let input_rel = self.create_execution_plan(data_dir, input)?;
                let rel = LimitRelation {
                    input: input_rel,
                    limit,
                    schema: schema.clone(),
                };
                Ok(Box::new(rel))
            }
            _ => unimplemented!(),
        }
    }

    /// Resolve a function name (case-insensitive) to its implementation.
    fn load_function_impl(
        &self,
        function_name: &str,
    ) -> Result<Box<ScalarFunction>, ExecutionError> {
        match function_name.to_lowercase().as_ref() {
            "sqrt" => Ok(Box::new(SqrtFunction {})),
            "st_point" => Ok(Box::new(STPointFunc {})),
            "st_astext" => Ok(Box::new(STAsText {})),
            _ => Err(ExecutionError::Custom(format!(
                "Unknown function {}",
                function_name
            ))),
        }
    }

    /// Build a scalar-function expression invoking a UDF by name.
    pub fn udf(&self, name: &str, args: Vec<Expr>) -> Expr {
        Expr::ScalarFunction {
            name: name.to_string(),
            // `args` is already owned; the previous `.clone()` was redundant.
            args,
        }
    }

    /// Execute the DataFrame's plan and write the results to `filename` as
    /// CSV, returning the number of rows written.
    pub fn write(&self, df: Box<DataFrame>, filename: &str) -> Result<usize, DataFrameError> {
        let physical_plan = PhysicalPlan::Write {
            plan: df.plan().clone(),
            filename: filename.to_string(),
        };
        match self.execute(&physical_plan)? {
            ExecutionResult::Count(count) => Ok(count),
            _ => Err(DataFrameError::NotImplemented),
        }
    }

    /// Dispatch execution based on the configured mode.
    pub fn execute(&self, physical_plan: &PhysicalPlan) -> Result<ExecutionResult, ExecutionError> {
        match &self.config {
            &DFConfig::Local { ref data_dir } => {
                self.execute_local(physical_plan, data_dir.clone())
            }
            &DFConfig::Remote { ref etcd } => self.execute_remote(physical_plan, etcd.clone()),
        }
    }

    /// Run a physical plan in this process, streaming output rows to disk.
    fn execute_local(
        &self,
        physical_plan: &PhysicalPlan,
        data_dir: String,
    ) -> Result<ExecutionResult, ExecutionError> {
        match physical_plan {
            &PhysicalPlan::Interactive { .. } => {
                Err(ExecutionError::Custom("not implemented".to_string()))
            }
            &PhysicalPlan::Write {
                ref plan,
                ref filename,
            } => {
                let file = File::create(filename)?;
                let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, file);
                let mut execution_plan = self.create_execution_plan(data_dir, plan)?;
                let it = execution_plan.scan(self);
                let mut count: usize = 0;
                it.for_each(|t| {
                    match t {
                        Ok(ref batch) => {
                            for i in 0..batch.row_count() {
                                let row = batch.row_slice(i);
                                // NOTE(review): values are not CSV-escaped,
                                // so strings containing commas or newlines
                                // will corrupt the output.
                                let csv = row.into_iter()
                                    .map(|v| v.to_string())
                                    .collect::<Vec<String>>()
                                    .join(",");
                                // `write_all` guarantees the whole buffer is
                                // written; plain `write` may write a prefix.
                                writer.write_all(&csv.into_bytes()).unwrap();
                                writer.write_all(b"\n").unwrap();
                                count += 1;
                            }
                        }
                        Err(e) => panic!("Error processing row: {:?}", e),
                    }
                });
                // Flush explicitly: Drop also flushes but swallows errors.
                writer.flush()?;
                Ok(ExecutionResult::Count(count))
            }
        }
    }

    /// Remote execution has not been re-implemented since the Arrow migration.
    fn execute_remote(
        &self,
        _physical_plan: &PhysicalPlan,
        _etcd: String,
    ) -> Result<ExecutionResult, ExecutionError> {
        Err(ExecutionError::Custom(String::from(
            "Remote execution needs re-implementing since moving to Arrow",
        )))
    }
}
/// Default `DataFrame` implementation: a thin wrapper around a logical plan
/// that is extended lazily by each transformation.
pub struct DF {
    pub plan: Box<LogicalPlan>,
}
impl DataFrame for DF {
    /// Add a projection step on top of the current plan.
    fn select(&self, expr: Vec<Expr>) -> Result<Box<DataFrame>, DataFrameError> {
        let schema = self.plan.schema().clone();
        let projection = LogicalPlan::Projection {
            expr,
            input: self.plan.clone(),
            schema,
        };
        Ok(Box::new(DF {
            plan: Box::new(projection),
        }))
    }

    /// Add a sort step on top of the current plan.
    fn sort(&self, expr: Vec<Expr>) -> Result<Box<DataFrame>, DataFrameError> {
        let schema = self.plan.schema().clone();
        let sort_plan = LogicalPlan::Sort {
            expr,
            input: self.plan.clone(),
            schema,
        };
        Ok(Box::new(DF {
            plan: Box::new(sort_plan),
        }))
    }

    /// Add a row-filtering (selection) step on top of the current plan.
    fn filter(&self, expr: Expr) -> Result<Box<DataFrame>, DataFrameError> {
        let schema = self.plan.schema().clone();
        let selection = LogicalPlan::Selection {
            expr,
            input: self.plan.clone(),
            schema,
        };
        Ok(Box::new(DF {
            plan: Box::new(selection),
        }))
    }

    /// Resolve a column name to a positional column expression.
    fn col(&self, column_name: &str) -> Result<Expr, DataFrameError> {
        self.plan
            .schema()
            .column(column_name)
            .map(|(index, _)| Expr::Column(index))
            .ok_or_else(|| DataFrameError::InvalidColumn(column_name.to_string()))
    }

    /// The schema describing this DataFrame's output.
    fn schema(&self) -> Schema {
        self.plan.schema().clone()
    }

    fn repartition(&self, _n: u32) -> Result<Box<DataFrame>, DataFrameError> {
        unimplemented!()
    }

    /// Clone of the underlying logical plan.
    fn plan(&self) -> Box<LogicalPlan> {
        self.plan.clone()
    }
}
/// Extract the element at `index` from an Arrow array as a `ScalarValue`,
/// recursing into struct arrays. Panics on invalid UTF-8 or out-of-range
/// indices (via the underlying accessors).
pub fn get_value(column: &Array, index: usize) -> ScalarValue {
    match column.data() {
        &ArrayData::Boolean(ref v) => ScalarValue::Boolean(*v.get(index)),
        &ArrayData::Float32(ref v) => ScalarValue::Float32(*v.get(index)),
        &ArrayData::Float64(ref v) => ScalarValue::Float64(*v.get(index)),
        &ArrayData::Int8(ref v) => ScalarValue::Int8(*v.get(index)),
        &ArrayData::Int16(ref v) => ScalarValue::Int16(*v.get(index)),
        &ArrayData::Int32(ref v) => ScalarValue::Int32(*v.get(index)),
        &ArrayData::Int64(ref v) => ScalarValue::Int64(*v.get(index)),
        &ArrayData::UInt8(ref v) => ScalarValue::UInt8(*v.get(index)),
        &ArrayData::UInt16(ref v) => ScalarValue::UInt16(*v.get(index)),
        &ArrayData::UInt32(ref v) => ScalarValue::UInt32(*v.get(index)),
        &ArrayData::UInt64(ref v) => ScalarValue::UInt64(*v.get(index)),
        &ArrayData::Utf8(ref data) => {
            // Decode the variable-length byte slice for this row as UTF-8.
            let bytes = data.slice(index);
            ScalarValue::Utf8(str::from_utf8(bytes).unwrap().to_string())
        }
        &ArrayData::Struct(ref arrays) => {
            // A struct row is the tuple of this index's value in every child.
            ScalarValue::Struct(arrays.iter().map(|arr| get_value(&arr, index)).collect())
        }
    }
}
pub fn filter(column: &Rc<Value>, bools: &Array) -> Array {
match column.as_ref() {
&Value::Scalar(_, _) => unimplemented!(),
&Value::Column(_, ref arr) => match bools.data() {
&ArrayData::Boolean(ref b) => match arr.as_ref().data() {
&ArrayData::Boolean(ref v) => Array::from(
v.iter()
.zip(b.iter())
.filter(|&(_, f)| f)
.map(|(v, _)| v)
.collect::<Vec<bool>>(),
),
&ArrayData::Float32(ref v) => Array::from(
v.iter()
.zip(b.iter())
.filter(|&(_, f)| f)
.map(|(v, _)| v)
.collect::<Vec<f32>>(),
),
&ArrayData::Float64(ref v) => Array::from(
v.iter()
.zip(b.iter())
.filter(|&(_, f)| f)
.map(|(v, _)| v)
.collect::<Vec<f64>>(),
),
&ArrayData::UInt16(ref v) => Array::from(
v.iter()
.zip(b.iter())
.filter(|&(_, f)| f)
.map(|(v, _)| v)
.collect::<Vec<u16>>(),
),
&ArrayData::UInt32(ref v) => Array::from(
v.iter()
.zip(b.iter())
.filter(|&(_, f)| f)
.map(|(v, _)| v)
.collect::<Vec<u32>>(),
),
&ArrayData::UInt64(ref v) => Array::from(
v.iter()
.zip(b.iter())
.filter(|&(_, f)| f)
.map(|(v, _)| v)
.collect::<Vec<u64>>(),
),
&ArrayData::Int8(ref v) => Array::from(
v.iter()
.zip(b.iter())
.filter(|&(_, f)| f)
.map(|(v, _)| v)
.collect::<Vec<i8>>(),
),
&ArrayData::Int16(ref v) => Array::from(
v.iter()
.zip(b.iter())
.filter(|&(_, f)| f)
.map(|(v, _)| v)
.collect::<Vec<i16>>(),
),
&ArrayData::Int32(ref v) => Array::from(
v.iter()
.zip(b.iter())
.filter(|&(_, f)| f)
.map(|(v, _)| v)
.collect::<Vec<i32>>(),
),
&ArrayData::Int64(ref v) => Array::from(
v.iter()
.zip(b.iter())
.filter(|&(_, f)| f)
.map(|(v, _)| v)
.collect::<Vec<i64>>(),
),
&ArrayData::Utf8(ref v) => {
let mut x: Vec<String> = Vec::with_capacity(b.len() as usize);
for i in 0..b.len() as usize {
if *b.get(i) {
x.push(String::from_utf8(v.slice(i as usize).to_vec()).unwrap());
}
}
Array::from(x)
}
_ => unimplemented!(),
},
_ => panic!(),
},
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE(review): these are end-to-end tests that read CSV fixtures from
    // ./test/data and write output files into the working directory; they
    // assert only that execution completes without error.

    // Built-in math UDF applied through SQL.
    #[test]
    fn test_sqrt() {
        let mut ctx = create_context();
        ctx.define_function(&SqrtFunction {});
        let df = ctx.sql(&"SELECT id, sqrt(id) FROM people").unwrap();
        ctx.write(df, "_sqrt_out.csv").unwrap();
    }

    // Geospatial UDF returning a user-defined (struct) type, via SQL.
    #[test]
    fn test_sql_udf_udt() {
        let mut ctx = create_context();
        ctx.define_function(&STPointFunc {});
        let df = ctx.sql(&"SELECT ST_Point(lat, lng) FROM uk_cities")
            .unwrap();
        ctx.write(df, "_uk_cities_sql.csv").unwrap();
    }

    // Same UDF invoked through the DataFrame API instead of SQL.
    #[test]
    fn test_df_udf_udt() {
        let mut ctx = create_context();
        ctx.define_function(&STPointFunc {});
        let schema = Schema::new(vec![
            Field::new("city", DataType::Utf8, false),
            Field::new("lat", DataType::Float64, false),
            Field::new("lng", DataType::Float64, false),
        ]);
        let df = ctx.load("test/data/uk_cities.csv", &schema).unwrap();
        let func_expr = ctx.udf(
            "ST_Point",
            vec![df.col("lat").unwrap(), df.col("lng").unwrap()],
        );
        let df2 = df.select(vec![func_expr]).unwrap();
        ctx.write(df2, "_uk_cities_df.csv").unwrap();
    }

    // Row filtering with a column > literal predicate via the DataFrame API.
    #[test]
    fn test_filter() {
        let mut ctx = create_context();
        ctx.define_function(&STPointFunc {});
        let schema = Schema::new(vec![
            Field::new("city", DataType::Utf8, false),
            Field::new("lat", DataType::Float64, false),
            Field::new("lng", DataType::Float64, false),
        ]);
        let df = ctx.load("test/data/uk_cities.csv", &schema).unwrap();
        let df2 = df.filter(Expr::BinaryExpr {
            left: Box::new(Expr::Column(1)),
            op: Operator::Gt,
            right: Box::new(Expr::Literal(ScalarValue::Float64(52.0))),
        }).unwrap();
        ctx.write(df2, "_uk_cities_filtered_gt_52.csv").unwrap();
    }

    // Nested UDF calls: the output of one function feeds another.
    #[test]
    fn test_chaining_functions() {
        let mut ctx = create_context();
        ctx.define_function(&STPointFunc {});
        let df = ctx.sql(&"SELECT ST_AsText(ST_Point(lat, lng)) FROM uk_cities")
            .unwrap();
        ctx.write(df, "_uk_cities_wkt.csv").unwrap();
    }

    // Shared fixture: a local context with the `people` and `uk_cities`
    // table schemas pre-registered.
    fn create_context() -> ExecutionContext {
        let mut ctx = ExecutionContext::local("./test/data".to_string());
        ctx.define_schema(
            "people",
            &Schema::new(vec![
                Field::new("id", DataType::Int32, false),
                Field::new("name", DataType::Utf8, false),
            ]),
        );
        ctx.define_schema(
            "uk_cities",
            &Schema::new(vec![
                Field::new("city", DataType::Utf8, false),
                Field::new("lat", DataType::Float64, false),
                Field::new("lng", DataType::Float64, false),
            ]),
        );
        ctx
    }

    // DDL via SQL followed by a query with a WHERE clause.
    #[test]
    fn sql_query_example() {
        let mut ctx = ExecutionContext::local("./test/data".to_string());
        ctx.sql(
            "CREATE EXTERNAL TABLE uk_cities (\
             city VARCHAR(100), \
             lat DOUBLE, \
             lng DOUBLE)",
        ).unwrap();
        let sql = "SELECT ST_AsText(ST_Point(lat, lng)) FROM uk_cities WHERE lat < 53.0";
        let df1 = ctx.sql(&sql).unwrap();
        ctx.write(df1, "_southern_cities.csv").unwrap();
    }
}