pub mod error;
pub mod model;
mod expr;
use self::error::ParserError;
use base64::{engine::general_purpose, Engine as _};
use expr::*;
use sqlparser::ast::*;
use sqlparser::dialect::DuckDbDialect;
use sqlparser::parser;
use sqlparser::parser::Parser as SqlParser;
use std::result;
use std::{collections::HashMap, io::Read};
use self::model::{DataView, Sort, DSL};
pub type Result<T> = result::Result<T, ParserError>;
/// Quoting options applied to the identifiers that [`Parser`] emits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParserParams {
    /// Quote character wrapped around column and table identifiers;
    /// `None` emits them unquoted.
    pub quote_style: Option<char>,
    /// Quote character wrapped around `AS` aliases (column aliases and
    /// derived-table aliases); `None` emits them unquoted.
    pub as_quote_style: Option<char>,
}

/// Translates the JSON workflow DSL into a `sqlparser` SQL AST, quoting
/// identifiers according to `params`.
#[derive(Debug, Clone)]
pub struct Parser {
    pub params: ParserParams,
}
/// Translates the JSON-encoded DSL `query` over `dataset` into one SQL `SELECT`
/// string.
///
/// The return type is a plain `String`, so failures are reported as fixed
/// messages instead of SQL:
/// - `"dsl json praser error"`: `query` is not valid DSL JSON;
/// - `"dsp parser error"`: the DSL's dataview could not be built;
/// - `"dsl parser error"`: the workflow could not be turned into a statement;
/// - `"check sql error"`: the generated SQL did not re-parse as exactly one query.
///
/// The existing messages keep their original spelling because callers may compare
/// against the exact text.
///
/// # Panics
/// The parser helpers still `panic!` on some malformed datasets and workflows
/// (e.g. invalid view SQL in `dataset.source`).
pub fn parse_dsl(
    dataset: &model::Dataset,
    query: &str,
    params: ParserParams,
) -> std::string::String {
    let dsl: DSL = match serde_json::from_str(query) {
        Ok(dsl) => dsl,
        Err(_) => return "dsl json praser error".to_owned(),
    };
    let parser = Parser { params };
    let table = if dsl.dataview.is_empty() {
        parser.parser_table_define(dataset)
    } else {
        match parser.parser_dataview(dataset, dsl.dataview.clone()) {
            Ok(table) => table,
            Err(_) => return "dsp parser error".to_owned(),
        }
    };
    // Report a workflow error as a message instead of unwrapping it into a panic.
    let sql = match parser.parser_dsl_wtih_table(table, dsl) {
        Ok(statement) => statement.to_string(),
        Err(_) => return "dsl parser error".to_owned(),
    };
    if !check_dsl(sql.clone()) {
        return "check sql error".to_owned();
    }
    sql
}
/// Returns `true` only when `sql` parses under the DuckDB dialect into exactly
/// one statement and that statement is a query.
///
/// A parse error, zero statements, several statements, or any non-query
/// statement all yield `false`.
pub fn check_dsl(sql: String) -> bool {
    let parsed = SqlParser::parse_sql(&DuckDbDialect {}, &sql);
    matches!(parsed.as_deref(), Ok([Statement::Query(_)]))
}
impl Parser {
/// Builds the `FROM` item that reads `dataset` directly.
///
/// * `DatasetType::Table`: a plain table reference named `dataset.source`,
///   quoted with the identifier quote style.
/// * `DatasetType::View`: `dataset.source` is parsed as DuckDB SQL and wrapped
///   as a derived table aliased `k_gw_temp_view`.
///
/// # Panics
/// For `View` datasets, panics if `source` does not parse, or if it is not
/// exactly one query statement.
pub fn parser_table_define(&self, dataset: &model::Dataset) -> TableWithJoins {
    let relation = match dataset.r#type {
        model::DatasetType::Table => TableFactor::Table {
            name: ObjectName(vec![sqlparser::ast::Ident {
                value: dataset.source.clone(),
                quote_style: self.get_quote(),
            }]),
            alias: None,
            args: None,
            with_hints: vec![],
        },
        model::DatasetType::View => {
            let statements =
                parser::Parser::parse_sql(&DuckDbDialect, dataset.source.as_str()).unwrap();
            // A wrong statement count and a non-query statement share one panic message.
            let subquery = match statements.as_slice() {
                [sqlparser::ast::Statement::Query(query)] => query.clone(),
                _ => panic!("sub query must be one"),
            };
            TableFactor::Derived {
                lateral: false,
                subquery,
                alias: Some(sqlparser::ast::TableAlias {
                    name: sqlparser::ast::Ident {
                        value: "k_gw_temp_view".to_owned(),
                        quote_style: self.get_as_quote(),
                    },
                    columns: vec![],
                }),
            }
        }
    };
    TableWithJoins {
        relation,
        joins: vec![],
    }
}
pub fn parser_dataview(
&self,
dataset: &model::Dataset,
view: Vec<DataView>,
) -> Result<TableWithJoins> {
if view.is_empty() || view.len() > 1 {
return Err(ParserError::DataViewCreateError(
"view must be one".to_string(),
));
}
if view[0].query.is_empty() || view[0].query.len() > 1 {
return Err(ParserError::DataViewCreateError(
"query must be one".to_string(),
));
}
let query = &view[0].query[0];
let origin_ast = parser::Parser::parse_sql(&DuckDbDialect, &query.sql).unwrap();
let check = matches!(&origin_ast[0], Statement::Query(_));
if !check {
return Err(ParserError::DataViewCreateError(
"query must be select".to_string(),
));
}
let fromtable = self.parser_table_define(dataset);
let view_project = self.create_data_view_projection(query.fid_pap.clone());
let select_body = Select {
distinct: None,
top: None,
projection: view_project,
into: None,
from: vec![fromtable],
lateral_views: vec![],
selection: None,
group_by: vec![],
cluster_by: vec![],
distribute_by: vec![],
sort_by: vec![],
having: None,
named_window: vec![],
qualify: None,
};
let rewrite_fid_view = TableWithJoins {
relation: TableFactor::Derived {
lateral: false,
subquery: Box::new(Query {
with: None,
order_by: vec![],
body: Box::new(SetExpr::Select(Box::new(select_body))),
limit: None,
offset: None,
fetch: None,
locks: vec![],
}),
alias: Some(sqlparser::ast::TableAlias {
name: sqlparser::ast::Ident {
value: "k_gw_write_view".to_owned(),
quote_style: self.get_as_quote(),
},
columns: vec![],
}),
},
joins: vec![],
};
let new_ast = self.replace_default_table(origin_ast[0].clone(), rewrite_fid_view);
if let sqlparser::ast::Statement::Query(query) = &new_ast {
Ok(TableWithJoins {
relation: TableFactor::Derived {
lateral: false,
subquery: query.clone(),
alias: Some(sqlparser::ast::TableAlias {
name: sqlparser::ast::Ident {
value: "k_gw_temp_view".to_owned(),
quote_style: self.get_as_quote(),
},
columns: vec![],
}),
},
joins: vec![],
})
} else {
panic!("sub query must be one");
}
}
/// Returns `origin` with the single table in its top-level `FROM` clause replaced
/// by `review_table`.
///
/// If that table is itself a derived table, the rewrite descends into it; see
/// [`Parser::rewrite_data_view_table_with_join`]. Every other part of the query
/// (`WITH`, `ORDER BY`, `LIMIT`, the select list, ...) is kept as is.
///
/// # Panics
/// Panics unless `origin` is a query whose body is a plain `SELECT` with exactly
/// one `FROM` item.
pub fn replace_default_table(
    &self,
    origin: Statement,
    review_table: TableWithJoins,
) -> Statement {
    let mut query = match origin {
        Statement::Query(query) => query,
        _ => panic!("body ast must select"),
    };
    // Rewrite in place instead of cloning the whole query.
    match query.body.as_mut() {
        SetExpr::Select(select) => {
            // `!= 1` rather than `> 1`: a `SELECT` without `FROM` used to panic with an
            // opaque index-out-of-bounds error.
            if select.from.len() != 1 {
                panic!("body ast must select one");
            }
            let origin_table = select.from.remove(0);
            select
                .from
                .push(self.rewrite_data_view_table_with_join(origin_table, review_table));
        }
        _ => panic!("body ast must select"),
    }
    Statement::Query(query)
}
/// Recursively swaps the innermost base table of `origin_table` for `review_table`.
///
/// * A plain table reference is replaced by `review_table` outright.
/// * A derived table `(SELECT ... FROM x) AS a` keeps its query. Its single
///   `FROM` item is rewritten recursively, its alias becomes
///   `k_gw_review_default`, and `lateral` is reset to `false`.
/// * Joins attached to `origin_table` are not carried over.
///
/// # Panics
/// Panics in these cases:
/// - any other kind of table factor;
/// - a derived query whose body is not a plain `SELECT`;
/// - a nested `SELECT` without exactly one `FROM` item.
pub fn rewrite_data_view_table_with_join(
    &self,
    origin_table: TableWithJoins,
    review_table: TableWithJoins,
) -> TableWithJoins {
    match origin_table.relation {
        TableFactor::Table { .. } => review_table,
        TableFactor::Derived { mut subquery, .. } => {
            match subquery.body.as_mut() {
                SetExpr::Select(select) => {
                    // An empty `FROM` used to panic with index-out-of-bounds.
                    if select.from.len() != 1 {
                        panic!("body ast must select one");
                    }
                    let inner = select.from.remove(0);
                    select
                        .from
                        .push(self.rewrite_data_view_table_with_join(inner, review_table));
                }
                _ => panic!("body ast must select"),
            }
            TableWithJoins {
                relation: TableFactor::Derived {
                    lateral: false,
                    subquery,
                    alias: Some(sqlparser::ast::TableAlias {
                        name: sqlparser::ast::Ident {
                            value: "k_gw_review_default".to_owned(),
                            quote_style: self.get_as_quote(),
                        },
                        columns: vec![],
                    }),
                },
                joins: vec![],
            }
        }
        _ => panic!("rewrite_data_view_table_with_join failed"),
    }
}
/// Builds a projection that renames raw columns.
///
/// Each `(alias, column)` entry of `fid_map` becomes `<column> AS <alias>`.
/// Items are ordered by alias, so the generated SQL does not depend on
/// `HashMap` iteration order.
pub fn create_data_view_projection(&self, fid_map: HashMap<String, String>) -> Projection {
    let mut entries: Vec<(String, String)> = fid_map.into_iter().collect();
    entries.sort_by(|left, right| left.0.cmp(&right.0));
    entries
        .into_iter()
        .map(|(alias_name, column)| SelectItem::ExprWithAlias {
            expr: Expr::Identifier(Ident {
                value: column,
                quote_style: self.get_quote(),
            }),
            alias: Ident {
                value: alias_name,
                quote_style: self.get_as_quote(),
            },
        })
        .collect()
}
pub fn parser_dsl_wtih_table(&self, table: TableWithJoins, dsl: DSL) -> Result<Statement> {
let mut alias: HashMap<String, SelectItem> = HashMap::new();
let mut project_exprs = vec![];
let mut group_epxrs = vec![];
let mut sort_exprs = vec![];
let mut sub_project_exprs = vec![];
let mut slection: Option<Expr> = None;
let mut slecction_exprs = vec![];
for ele in dsl.workflow {
match &ele.workflow_type {
model::WorkflowType::View => {
for query in &ele.query {
let read_onley_alias = alias.clone();
let (project, group, append_alias) =
self.get_projection_and_group_expr(query, &read_onley_alias);
project_exprs.extend(project);
if !group.is_empty() {
group_epxrs = group
}
for (k, v) in append_alias {
alias.insert(k.to_string(), v.clone());
}
}
}
model::WorkflowType::Filter => {
for filter in &ele.filters {
let read_onley_alias = alias.clone();
let select_one = self.get_selection(filter.clone(), &read_onley_alias);
slecction_exprs.push(select_one);
}
}
model::WorkflowType::Transform => {
for transform in &ele.transform {
let (sub_project, project, append_alias) =
self.get_transform_expr(transform.clone(), &alias);
for (k, v) in append_alias {
alias.insert(k.to_string(), v.clone());
}
if !project.is_empty() {
project_exprs.extend(project);
}
if !sub_project.is_empty() {
sub_project_exprs.extend(sub_project)
}
}
}
model::WorkflowType::Sort => sort_exprs = self.get_order_epxr(ele.sort, ele.by),
}
}
if !slecction_exprs.is_empty() {
slection = merge_binary_op(slecction_exprs);
}
let (limit, offset) = get_limit_offset(dsl.limit, dsl.offset);
let from: Vec<TableWithJoins>;
if !sub_project_exprs.is_empty() {
match table.relation {
TableFactor::Table {
name: _,
alias: _,
with_hints: _,
args: _,
} => {
sub_project_exprs.push(SelectItem::Wildcard(WildcardAdditionalOptions {
opt_exclude: None,
opt_except: None,
opt_rename: None,
opt_replace: None,
}));
from = vec![TableWithJoins {
relation: TableFactor::Derived {
lateral: false,
alias: Some(TableAlias {
name: Ident {
value: "kanaries_sub_query".to_string(),
quote_style: None,
},
columns: vec![],
}),
subquery: Box::new(Query {
with: None,
body: Box::new(SetExpr::Select(Box::new(Select {
distinct: None,
top: None,
projection: sub_project_exprs,
into: None,
from: vec![table],
lateral_views: vec![],
selection: None,
group_by: vec![],
cluster_by: vec![],
distribute_by: vec![],
sort_by: vec![],
having: None,
named_window: vec![],
qualify: None,
}))),
order_by: vec![],
limit: None,
offset: None,
fetch: None,
locks: vec![],
}),
},
joins: vec![],
}];
}
TableFactor::Derived {
lateral,
subquery,
alias,
} => {
let mut sub_query = subquery;
if let sqlparser::ast::SetExpr::Select(select) = &mut *sub_query.body {
select.projection = sub_project_exprs;
}
from = vec![TableWithJoins {
relation: TableFactor::Derived {
lateral,
subquery: sub_query,
alias,
},
joins: vec![],
}];
}
_ => todo!("not support"),
}
} else {
from = vec![table];
}
Ok(Statement::Query(Box::new(Query {
with: None,
body: Box::new(SetExpr::Select(Box::new(Select {
distinct: None,
top: None,
projection: project_exprs,
into: None,
from,
lateral_views: vec![],
selection: slection,
group_by: group_epxrs,
cluster_by: vec![],
distribute_by: vec![],
sort_by: vec![],
having: None,
named_window: vec![],
qualify: None,
}))),
order_by: sort_exprs,
limit,
offset,
fetch: None,
locks: vec![],
})))
}
pub fn get_quote(&self) -> Option<char> {
self.params.quote_style
}
pub fn get_as_quote(&self) -> Option<char> {
self.params.as_quote_style
}
pub fn get_projection_and_group_expr(
&self,
query: &model::Query,
alias: &Alias,
) -> (Projection, GroupBy, Alias) {
let mut new_alias = alias.clone();
match query.op {
model::QueryOperation::Aggregate => {
let mut projection = Projection::default();
let mut group_by: Vec<Expr> = GroupBy::default();
for ele in &query.group_by {
group_by.push(Expr::Identifier(Ident {
value: ele.clone(),
quote_style: self.get_quote(),
}));
if let Some(e) = alias.get(ele) {
projection.push(e.clone());
} else {
projection.push(SelectItem::UnnamedExpr(Expr::Identifier(Ident {
value: ele.clone(),
quote_style: self.get_quote(),
})));
}
}
for ele in &query.measures {
let mut name_epxr: SelectItem;
if ele.field == "*" {
name_epxr = SelectItem::Wildcard(WildcardAdditionalOptions {
opt_exclude: None,
opt_except: None,
opt_rename: None,
opt_replace: None,
});
} else if let Some(format) = &ele.format {
name_epxr = SelectItem::ExprWithAlias {
expr: get_format_time_expr(
Expr::Identifier(Ident {
value: ele.field.clone(),
quote_style: self.get_quote(),
}),
format.clone(),
),
alias: Ident {
value: ele.as_field_key.clone(),
quote_style: self.get_as_quote(),
},
};
} else {
name_epxr = SelectItem::ExprWithAlias {
expr: Expr::Identifier(Ident {
value: ele.field.clone(),
quote_style: self.get_quote(),
}),
alias: Ident {
value: ele.as_field_key.clone(),
quote_style: self.get_as_quote(),
},
};
}
if !new_alias.contains_key(&ele.field) {
new_alias.insert(ele.as_field_key.clone(), name_epxr.clone());
} else {
name_epxr = new_alias.get(&ele.field).unwrap().clone();
}
let expr = match ele.agg {
model::Agg::DistinctCount => Expr::Function(Function {
name: ObjectName(vec![Ident {
value: "count".to_string(),
quote_style: None,
}]),
args: vec![FunctionArg::Unnamed(get_expr_from_select(name_epxr))],
over: None,
distinct: true,
special: false,
order_by: vec![],
}),
_ => get_function_expr(
get_agg_func(ele.agg.clone()),
vec![FunctionArg::Unnamed(get_expr_from_select(name_epxr))],
),
};
projection.push(SelectItem::ExprWithAlias {
expr,
alias: Ident {
value: ele.as_field_key.clone(),
quote_style: self.get_as_quote(),
},
});
}
(projection, group_by, new_alias)
}
model::QueryOperation::Raw => {
let mut projection = Projection::default();
let group_by: Vec<Expr> = GroupBy::default();
for field in &query.fields {
if let Some(f) = alias.get(field) {
projection.push(f.clone());
} else if field == "*" {
projection.push(SelectItem::Wildcard(WildcardAdditionalOptions {
opt_exclude: None,
opt_except: None,
opt_rename: None,
opt_replace: None,
}));
} else {
projection.push(SelectItem::UnnamedExpr(Expr::Identifier(Ident {
value: field.clone(),
quote_style: self.get_quote(),
})));
}
}
(projection, group_by, new_alias)
}
}
}
pub fn get_transform_expr(
&self,
transform: model::Transform,
alias: &Alias,
) -> (Projection, Projection, Alias) {
let mut sub_projection = Projection::default();
let mut append_alias = alias.clone();
match transform.expression.op {
model::TransformOp::One => {
let sleect_item =
SelectItem::UnnamedExpr(Expr::Value(Value::Number("1".to_string(), false)));
append_alias.insert(transform.expression.as_field_key, sleect_item);
(sub_projection, vec![], append_alias)
}
model::TransformOp::Bin => {
let num = transform.expression.num.unwrap_or(10);
let param = transform.expression.params.unwrap()[0].clone();
let value = match param.value {
model::ValueParam::String(ref v) => v.clone(),
_ => panic!("value error"),
};
let min_expr = SelectItem::ExprWithAlias {
expr: get_window_function_expr(
"min".to_string(),
vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(
Expr::Identifier(Ident {
value: value.clone(),
quote_style: self.get_quote(),
}),
))],
WindowType::WindowSpec(WindowSpec {
partition_by: vec![],
order_by: vec![],
window_frame: None,
}),
),
alias: Ident {
value: format!("{}{}", "min_", transform.expression.as_field_key),
quote_style: self.get_as_quote(),
},
};
let max_expr = SelectItem::ExprWithAlias {
expr: get_window_function_expr(
"max".to_string(),
vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(
Expr::Identifier(Ident {
value: value.clone(),
quote_style: self.get_quote(),
}),
))],
WindowType::WindowSpec(WindowSpec {
partition_by: vec![],
order_by: vec![],
window_frame: None,
}),
),
alias: Ident {
value: format!("{}{}", "max_", transform.expression.as_field_key),
quote_style: self.get_as_quote(),
},
};
sub_projection.push(min_expr);
sub_projection.push(max_expr);
let min_epxr_indentifier = Expr::Identifier(Ident {
value: format!("{}{}", "min_", transform.expression.as_field_key),
quote_style: self.get_quote(),
});
let max_epxr_indentifier = Expr::Identifier(Ident {
value: format!("{}{}", "max_", transform.expression.as_field_key),
quote_style: self.get_quote(),
});
let max_minus_min = Expr::Nested(Box::new(get_binary_op_epxr(
max_epxr_indentifier,
min_epxr_indentifier.clone(),
BinaryOperator::Minus,
)));
let div_10_epxr = Expr::Nested(Box::new(get_binary_op_epxr(
max_minus_min,
Expr::Value(Value::Number(num.to_string() + ".0", false)),
BinaryOperator::Divide,
)));
let col_minus_min = Expr::Nested(Box::new(get_binary_op_epxr(
Expr::Identifier(Ident {
value,
quote_style: self.get_quote(),
}),
min_epxr_indentifier.clone(),
BinaryOperator::Minus,
)));
let col_div = Expr::Nested(Box::new(get_binary_op_epxr(
col_minus_min,
div_10_epxr.clone(),
BinaryOperator::Divide,
)));
let floor_expr = get_function_expr(
"floor".to_string(),
vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(col_div))],
);
let least_expr = get_function_expr(
"least".to_string(),
vec![
FunctionArg::Unnamed(FunctionArgExpr::Expr(floor_expr)),
FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(Value::Number(
(num - 1).to_string(),
false,
)))),
],
);
let final_expr = Expr::Nested(Box::new(get_binary_op_epxr(
least_expr,
div_10_epxr,
BinaryOperator::Multiply,
)));
let final_expr_with_alias = SelectItem::ExprWithAlias {
expr: Expr::Nested(Box::new(get_binary_op_epxr(
min_epxr_indentifier,
final_expr,
BinaryOperator::Plus,
))),
alias: Ident {
value: transform.expression.as_field_key.clone(),
quote_style: self.get_as_quote(),
},
};
append_alias.insert(transform.expression.as_field_key, final_expr_with_alias);
(sub_projection, vec![], append_alias)
}
model::TransformOp::Log2 => {
let param = transform.expression.params.unwrap()[0].clone();
let value = match param.value {
model::ValueParam::String(ref v) => v.clone(),
_ => panic!("value error"),
};
let log_2_epxr = self.get_log_n_func_expr(2, value);
let log_2_expr_with_alias = SelectItem::ExprWithAlias {
expr: log_2_epxr,
alias: Ident {
value: transform.expression.as_field_key.clone(),
quote_style: self.get_as_quote(),
},
};
append_alias.insert(transform.expression.as_field_key, log_2_expr_with_alias);
(sub_projection, vec![], append_alias)
}
model::TransformOp::Log10 => {
let param = transform.expression.params.unwrap()[0].clone();
let value = match param.value {
model::ValueParam::String(ref v) => v.clone(),
_ => panic!("value error"),
};
let log_10_epxr = self.get_log_n_func_expr(10, value);
let log_10_expr_with_alias = SelectItem::ExprWithAlias {
expr: log_10_epxr,
alias: Ident {
value: transform.expression.as_field_key.clone(),
quote_style: self.get_as_quote(),
},
};
append_alias.insert(transform.expression.as_field_key, log_10_expr_with_alias);
(sub_projection, vec![], append_alias)
}
model::TransformOp::Log => {
let param = transform.expression.params.unwrap()[0].clone();
let value = match param.value {
model::ValueParam::String(ref v) => v.clone(),
_ => panic!("value error"),
};
let log_epxr =
self.get_log_n_func_expr(transform.expression.num.unwrap_or(10), value);
let log_expr_with_alias = SelectItem::ExprWithAlias {
expr: log_epxr,
alias: Ident {
value: transform.expression.as_field_key.clone(),
quote_style: self.get_as_quote(),
},
};
append_alias.insert(transform.expression.as_field_key, log_expr_with_alias);
(sub_projection, vec![], append_alias)
}
model::TransformOp::BinCount => {
let num = transform.expression.num.unwrap_or(10);
let param = transform.expression.params.unwrap()[0].clone();
let value = match param.value {
model::ValueParam::String(ref v) => v.clone(),
_ => panic!("value error"),
};
let min_expr = SelectItem::ExprWithAlias {
expr: get_window_function_expr(
"min".to_string(),
vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(
Expr::Identifier(Ident {
value: value.clone(),
quote_style: self.get_quote(),
}),
))],
WindowType::WindowSpec(WindowSpec {
partition_by: vec![],
order_by: vec![],
window_frame: None,
}),
),
alias: Ident {
value: format!("{}{}", "min_", transform.expression.as_field_key),
quote_style: self.get_as_quote(),
},
};
let max_expr = SelectItem::ExprWithAlias {
expr: get_window_function_expr(
"max".to_string(),
vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(
Expr::Identifier(Ident {
value: value.clone(),
quote_style: self.get_quote(),
}),
))],
WindowType::WindowSpec(WindowSpec {
partition_by: vec![],
order_by: vec![],
window_frame: None,
}),
),
alias: Ident {
value: format!("{}{}", "max_", transform.expression.as_field_key),
quote_style: self.get_as_quote(),
},
};
sub_projection.push(min_expr);
sub_projection.push(max_expr);
let min_epxr_indentifier = Expr::Identifier(Ident {
value: format!("{}{}", "min_", transform.expression.as_field_key),
quote_style: self.get_quote(),
});
let max_epxr_indentifier = Expr::Identifier(Ident {
value: format!("{}{}", "max_", transform.expression.as_field_key),
quote_style: self.get_quote(),
});
let max_minus_min = Expr::Nested(Box::new(get_binary_op_epxr(
max_epxr_indentifier,
min_epxr_indentifier.clone(),
BinaryOperator::Minus,
)));
let div_10_epxr = Expr::Nested(Box::new(get_binary_op_epxr(
max_minus_min,
Expr::Value(Value::Number(num.to_string(), false)),
BinaryOperator::Divide,
)));
let col_minus_min = Expr::Nested(Box::new(get_binary_op_epxr(
Expr::Identifier(Ident {
value,
quote_style: self.get_quote(),
}),
min_epxr_indentifier,
BinaryOperator::Minus,
)));
let col_div = Expr::Nested(Box::new(get_binary_op_epxr(
col_minus_min,
div_10_epxr,
BinaryOperator::Divide,
)));
let lest_expr = get_function_expr(
"least".to_string(),
vec![
FunctionArg::Unnamed(FunctionArgExpr::Expr(col_div)),
FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(Value::Number(
(num - 1).to_string(),
false,
)))),
],
);
let bin_count_expr_with_alias = SelectItem::ExprWithAlias {
expr: get_binary_op_epxr(
lest_expr,
Expr::Value(Value::Number("1".to_string(), false)),
BinaryOperator::Plus,
),
alias: Ident {
value: transform.expression.as_field_key.clone(),
quote_style: self.get_as_quote(),
},
};
append_alias.insert(transform.expression.as_field_key, bin_count_expr_with_alias);
(sub_projection, vec![], append_alias)
}
model::TransformOp::DateTimeDrill => {
let mut field: String = "".to_string();
let mut value: String = "".to_string();
let mut format = "%Y-%m-%d %H:%M:%S".to_string();
for ele in transform.expression.params.unwrap() {
let v = match ele.value {
model::ValueParam::String(ref v) => v.clone(),
_ => panic!("value error"),
};
if ele.r#type == "field" {
field = v.clone();
}
if ele.r#type == "value" {
value = v.clone();
}
if ele.r#type == "format" {
format = v.clone();
}
}
if field.is_empty() || value.is_empty() {
panic!("field & value error")
}
let time_drill_expr = SelectItem::ExprWithAlias {
expr: get_strftime_expr(
get_data_trunc_expr(
get_format_time_expr(
Expr::Identifier(Ident {
value: field,
quote_style: self.get_quote(),
}),
format,
),
value,
),
"%Y-%m-%d %H:%M:%S".to_string(),
),
alias: Ident {
value: transform.expression.as_field_key.clone(),
quote_style: self.get_as_quote(),
},
};
append_alias.insert(transform.expression.as_field_key, time_drill_expr);
(sub_projection, vec![], append_alias)
}
model::TransformOp::DateTimeFeature => {
let mut field: String = "".to_string();
let mut value: String = "".to_string();
let mut format = "%Y-%m-%d %H:%M:%S".to_string();
for ele in transform.expression.params.unwrap() {
let v = match ele.value {
model::ValueParam::String(ref v) => v.clone(),
_ => panic!("value error"),
};
if ele.r#type == "field" {
field = v.clone();
}
if ele.r#type == "value" {
value = v.clone();
}
if ele.r#type == "format" {
format = v.clone();
}
}
if field.is_empty() || value.is_empty() {
panic!("field & value error")
}
let strftime_param = get_data_feature_format(&value);
let time_drill_expr = SelectItem::ExprWithAlias {
expr: get_data_part_expr(
get_format_time_expr(
Expr::Identifier(Ident {
value: field,
quote_style: self.get_quote(),
}),
format,
),
strftime_param,
),
alias: Ident {
value: transform.expression.as_field_key.clone(),
quote_style: self.get_as_quote(),
},
};
append_alias.insert(transform.expression.as_field_key, time_drill_expr);
(sub_projection, vec![], append_alias)
}
model::TransformOp::Expr => {
let mut sql: String = "".to_string();
for ele in transform.expression.params.unwrap() {
if ele.r#type == "sql" {
let value = match ele.value {
model::ValueParam::String(ref v) => v.clone(),
_ => panic!("value error"),
};
sql = value.clone();
}
}
if sql.is_empty() {
panic!("sql is empty")
}
let expr = SelectItem::ExprWithAlias {
expr: Expr::Identifier(Ident {
value: sql,
quote_style: None,
}),
alias: Ident {
value: transform.expression.as_field_key.clone(),
quote_style: self.get_as_quote(),
},
};
append_alias.insert(transform.expression.as_field_key, expr);
(sub_projection, vec![], append_alias)
}
model::TransformOp::Paint => {
let param = transform.expression.params.unwrap()[0].clone();
let value = match param.value {
model::ValueParam::Map(ref v) => v.clone(),
_ => panic!("value error"),
};
let compressed = match general_purpose::STANDARD.decode(value.map.clone()) {
Ok(v) => v,
Err(_) => panic!("value error"),
};
let slice: &[u8] = compressed.as_slice();
let mut decoder = flate2::bufread::DeflateDecoder::new(slice);
let mut decompressed: Vec<u8> = vec![];
_ = decoder.read_to_end(&mut decompressed);
let rects = find_matching_rects(&decompressed, value.mapwidth as usize);
dbg!(&rects);
let mut results: Vec<Expr> = vec![];
let mut conditions = vec![];
for ele in rects {
let x1 = (value.domain_x[1] - value.domain_x[0]) / value.mapwidth as f64
* ele.x1 as f64
+ value.domain_x[0];
let x2 = (value.domain_x[1] - value.domain_x[0]) / value.mapwidth as f64
* (ele.x2 + 1) as f64
+ value.domain_x[0];
let left = Expr::Between {
expr: Box::new(Expr::Identifier(Ident {
value: value.x.clone(),
quote_style: self.get_quote(),
})),
negated: false,
low: Box::new(Expr::Value(Value::Number(x1.to_string(), false))),
high: Box::new(Expr::Value(Value::Number(x2.to_string(), false))),
};
let y1 = (value.domain_y[1] - value.domain_y[0]) / value.mapwidth as f64
* ele.y1 as f64
+ value.domain_y[0];
let y2 = (value.domain_y[1] - value.domain_y[0]) / value.mapwidth as f64
* (ele.y2 + 1) as f64
+ value.domain_y[0];
let right = Expr::Between {
expr: Box::new(Expr::Identifier(Ident {
value: value.x.clone(),
quote_style: self.get_quote(),
})),
negated: false,
low: Box::new(Expr::Value(Value::Number(y1.to_string(), false))),
high: Box::new(Expr::Value(Value::Number(y2.to_string(), false))),
};
let condition = get_binary_op_epxr(left, right, BinaryOperator::And);
conditions.push(condition);
let value_str = &ele.value.to_string();
let name = match value.dict.get(value_str) {
Some(v) => v.clone(),
None => {
let error_msg = format!("value {} not found", value_str);
panic!("{}", error_msg);
}
};
results.push(Expr::Value(Value::SingleQuotedString(name.name)))
}
let case_expr = Expr::Case {
operand: None,
conditions,
results,
else_result: None,
};
let expr: SelectItem = SelectItem::ExprWithAlias {
expr: case_expr,
alias: Ident {
value: transform.expression.as_field_key.clone(),
quote_style: self.get_as_quote(),
},
};
append_alias.insert(transform.expression.as_field_key, expr);
(sub_projection, vec![], append_alias)
}
}
}
/// Returns `log(field) / log(num)`: the logarithm of column `field` in base `num`.
///
/// This uses the change-of-base identity. Both calls use the same SQL `log`
/// function, so that function's own base cancels out.
pub fn get_log_n_func_expr(&self, num: i64, field: String) -> Expr {
    let log_of = |argument: Expr| {
        get_function_expr(
            "log".to_string(),
            vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(argument))],
        )
    };
    let numerator = log_of(Expr::Identifier(Ident {
        value: field,
        quote_style: self.get_quote(),
    }));
    let denominator = log_of(Expr::Value(Value::Number(num.to_string(), false)));
    get_binary_op_epxr(numerator, denominator, BinaryOperator::Divide)
}
pub fn get_selection(&self, filter: model::Filter, alias: &Alias) -> Option<Expr> {
let mut name = filter.fid;
if name.is_empty() {
name = filter.key;
}
let mut name_expr = Expr::Identifier(Ident {
value: name.clone(),
quote_style: self.get_quote(),
});
if alias.contains_key(&name) {
let slection = alias.get(&name).unwrap();
match slection {
SelectItem::UnnamedExpr(epxr) => name_expr = epxr.clone(),
SelectItem::ExprWithAlias { expr, alias: _ } => name_expr = expr.clone(),
SelectItem::QualifiedWildcard(_, _) => panic!("not support"),
SelectItem::Wildcard(_) => panic!("not support"),
}
}
match filter.rule.rule_type {
model::RuleType::Range => Some(Expr::Between {
expr: Box::new(name_expr),
negated: false,
low: Box::new(get_sql_value(filter.rule.value[0].clone())),
high: Box::new(get_sql_value(filter.rule.value[1].clone())),
}),
model::RuleType::Temporal => {
let left = Expr::TypedString {
data_type: DataType::Timestamp(None, TimezoneInfo::None),
value: "epoch".to_string(),
};
let right = get_binary_op_epxr(
Expr::Cast {
expr: Box::new(name_expr),
data_type: DataType::BigInt(None),
},
Expr::Interval(Interval {
value: Box::new(Expr::Value(Value::SingleQuotedString(
"1 millisecond".to_string(),
))),
leading_field: None,
leading_precision: None,
last_field: None,
fractional_seconds_precision: None,
}),
BinaryOperator::Multiply,
);
Some(Expr::Between {
expr: Box::new(get_binary_op_epxr(left, right, BinaryOperator::Plus)),
negated: false,
low: Box::new(get_function_expr(
"epoch_ms".to_string(),
vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(get_sql_value(
filter.rule.value[0].clone(),
)))],
)),
high: Box::new(get_function_expr(
"epoch_ms".to_string(),
vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(get_sql_value(
filter.rule.value[1].clone(),
)))],
)),
})
}
model::RuleType::OneOf => {
let mut values = Vec::new();
for v in filter.rule.value {
values.push(get_sql_value(v));
}
Some(Expr::InList {
expr: Box::new(name_expr),
list: values,
negated: false,
})
}
}
}
/// Builds `ORDER BY` items for the columns in `by`, all in the same direction.
///
/// `Sort::Asc` sorts ascending; any other value except `Sort::None` sorts
/// descending. Returns an empty list when `sort` is `Sort::None`.
pub fn get_order_epxr(&self, sort: Sort, by: Vec<String>) -> Vec<OrderByExpr> {
    if sort == Sort::None {
        return Vec::new();
    }
    let ascending = sort == Sort::Asc;
    by.into_iter()
        .map(|field| OrderByExpr {
            expr: Expr::Identifier(Ident {
                value: field,
                quote_style: self.get_quote(),
            }),
            asc: Some(ascending),
            nulls_first: None,
        })
        .collect()
}
}
/// Debug helper: paints `rects` onto a fixed 128×128 grid and prints it to stdout.
///
/// Each covered cell receives its rectangle's `value`; later rectangles
/// overwrite earlier ones. The grid is printed one row per line, with each cell
/// followed by a space.
///
/// Cells are addressed as `y * 128 + x`, so an `x` of 128 or more spills into
/// the next row. An index past the end of the grid panics.
#[allow(dead_code)]
fn visualize_rects(rects: Vec<model::Rect>) {
    const SIDE: usize = 128;
    let mut grid = vec![0; SIDE * SIDE];
    for rect in &rects {
        for x in rect.x1..=rect.x2 {
            for y in rect.y1..=rect.y2 {
                grid[y * SIDE + x] = rect.value;
            }
        }
    }
    for row in grid.chunks(SIDE) {
        for cell in row {
            print!("{} ", cell);
        }
        println!();
    }
}
/// Debug helper: prints the first 128×128 bytes of `data` as a grid.
///
/// Output has one row per line. Each line starts with its row number, followed
/// directly by that row's cell values with no separators.
///
/// Panics if `data` holds fewer than 128 × 128 bytes.
#[allow(dead_code)]
fn print_rects_debug(data: &[u8]) {
    const SIDE: usize = 128;
    let mut output = String::new();
    for row in 0..SIDE {
        let cells = &data[row * SIDE..(row + 1) * SIDE];
        output.push_str(&format!("{:1}", row));
        cells
            .iter()
            .for_each(|cell| output.push_str(&format!("{:1}", cell)));
        output.push('\n');
    }
    println!("{}", output);
}
/// Greedily partitions a row-major grid of `values` into rectangles of equal value.
///
/// The grid has `width` cells per row. Cells are visited in row-major order, and
/// each cell not yet covered anchors a new rectangle, grown in three steps:
/// 1. the largest square whose cells all equal the anchor value and are uncovered;
/// 2. extended to the right, one whole column at a time;
/// 3. only if it did not grow to the right, extended downwards one whole row at
///    a time.
///
/// Every returned rectangle is homogeneous and no two overlap. Together they
/// cover the first `values.len() / width` full rows; a trailing partial row is
/// ignored. Coordinates are inclusive cell indices. Returns an empty list when
/// `width` is 0.
#[allow(clippy::needless_range_loop)]
pub fn find_matching_rects(values: &[u8], width: usize) -> Vec<model::Rect> {
    if width == 0 {
        return Vec::new();
    }
    let height = values.len() / width;
    let rows: Vec<&[u8]> = values.chunks(width).take(height).collect();
    let mut used: Vec<Vec<bool>> = vec![vec![false; width]; height];
    let mut result: Vec<model::Rect> = Vec::new();
    for y in 0..height {
        for x in 0..width {
            if used[y][x] {
                continue;
            }
            let value = rows[y][x];
            // A cell may join the rectangle when it holds `value` and is still uncovered.
            let free = |used: &Vec<Vec<bool>>, row: usize, col: usize| {
                rows[row][col] == value && !used[row][col]
            };

            // 1. Square: each step adds a new bottom row and a new right column.
            let mut side = 0;
            while x + side + 1 < width
                && y + side + 1 < height
                && (0..=side + 1).all(|k| {
                    free(&used, y + side + 1, x + k) && free(&used, y + k, x + side + 1)
                })
            {
                side += 1;
            }

            // 2. Extend right. The *whole* new column (rows y..=y+side) must qualify;
            //    checking only the bottom row let foreign cells slip in.
            let mut right = side;
            while x + right + 1 < width
                && (y..=y + side).all(|row| free(&used, row, x + right + 1))
            {
                right += 1;
            }

            // 3. Extend down, only when step 2 did not widen the square. The whole new
            //    row (columns x..=x+side) must qualify.
            let mut bottom = side;
            if right == side {
                while y + bottom + 1 < height
                    && (x..=x + side).all(|col| free(&used, y + bottom + 1, col))
                {
                    bottom += 1;
                }
            }

            for row in &mut used[y..=y + bottom] {
                for cell in &mut row[x..=x + right] {
                    *cell = true;
                }
            }
            result.push(model::Rect {
                x1: x,
                y1: y,
                x2: x + right,
                y2: y + bottom,
                value,
            });
        }
    }
    result
}