1use crate::parser::*;
2use std::sync::atomic::AtomicUsize;
3
4pub struct ParserParams {
5 pub quote_style: Option<char>,
6 pub as_quote_style: Option<char>,
7}
8
9pub struct Parser {
10 pub params: ParserParams,
11 view_count: AtomicUsize,
12}
13
14impl Default for Parser {
15 fn default() -> Self {
16 Self {
17 params: ParserParams {
18 quote_style: Some('"'),
19 as_quote_style: Some('"'),
20 },
21 view_count: AtomicUsize::new(0),
22 }
23 }
24}
25
26pub struct WorkflowParams {
27 pub(crate) alias: HashMap<String, SelectItem>,
30 pub(crate) group_epxrs: Vec<Expr>,
32 pub(crate) project_exprs: Vec<SelectItem>,
34 pub(crate) sub_project_exprs: Vec<SelectItem>,
36 pub(crate) slecction_exprs: Vec<Option<Expr>>,
38 pub(crate) pre_workflow_type: WorkflowType,
40 pub(crate) table: TableWithJoins,
42 pub(crate) order_by: Vec<OrderByExpr>,
44}
45
46#[derive(PartialEq, Debug)]
47pub(crate) enum WorkflowType {
48 Filter,
50 Transform,
52 View,
54 Sort,
56 None,
58}
59
60impl Parser {
61 pub fn new(params: ParserParams) -> Self {
62 Self {
63 params,
64 view_count: AtomicUsize::new(0),
65 }
66 }
67 pub fn get_quote(&self) -> Option<char> {
68 self.params.quote_style
69 }
70
71 pub fn get_as_quote(&self) -> Option<char> {
72 self.params.as_quote_style
73 }
74
75 pub fn parser_dsl(&self, dsl: DSL, dataset: &model::Dataset) -> Result<Statement> {
76 let table: TableWithJoins = if dsl.dataview.is_none() {
77 self.parse_dataset_define(dataset)?
78 } else {
79 self.parse_dataview_define(dataset, dsl.dataview.clone().unwrap())?
80 };
81 let ast = self.parse_workflow(table, dsl)?;
82 Ok(ast)
83 }
84
85 pub fn parser_view_define(&self, statement: Statement) -> Result<TableWithJoins> {
86 if let sqlparser::ast::Statement::Query(query) = statement {
87 Ok(TableWithJoins {
88 relation: TableFactor::Derived {
89 lateral: false,
90 subquery: query.clone(),
91 alias: Some(sqlparser::ast::TableAlias {
92 name: sqlparser::ast::Ident {
93 value: self.generate_view_name(),
94 quote_style: self.get_as_quote(),
95 },
96 columns: vec![],
97 }),
98 },
99 joins: vec![],
100 })
101 } else {
102 Err(ParserError::DataViewCreateError(
103 "dataview statement must be query".to_string(),
104 ))
105 }
106 }
107
108 fn generate_view_name(&self) -> String {
109 format!(
110 "view_{}",
111 self.view_count
112 .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
113 )
114 }
115
116 pub fn execute_filter(
117 &self,
118 mut params: WorkflowParams,
119 filter: model::Workflow,
120 ) -> Result<WorkflowParams> {
121 if params.pre_workflow_type == WorkflowType::View {
123 let mut slection: Option<Expr> = None;
124 if !params.slecction_exprs.is_empty() {
125 slection = merge_binary_op(params.slecction_exprs);
126 }
127 let from = self.generate_table_with_joins(
128 params.sub_project_exprs.clone(),
129 params.table.clone(),
130 )?;
131 let new_stetement = Statement::Query(
133 QueryBuilder::new()
134 .with_projection(params.project_exprs)
135 .with_from(from)
136 .with_selection(slection)
137 .with_group_by(params.group_epxrs)
138 .with_order_by(params.order_by.clone())
139 .build_query(),
140 );
141 params.table = self.parser_view_define(new_stetement)?;
142 params.alias = HashMap::new();
143 params.group_epxrs = vec![];
144 params.project_exprs = vec![SelectItem::Wildcard(WildcardAdditionalOptions {
145 opt_exclude: None,
146 opt_except: None,
147 opt_rename: None,
148 opt_replace: None,
149 })];
150 params.sub_project_exprs = vec![];
151 params.slecction_exprs = vec![];
152 }
153 match filter {
154 model::Workflow::Filter { filters } => {
155 let mut params = params;
156 for filter in filters {
157 let select_one = get_selection(filter, ¶ms.alias, self.get_quote())?;
158 params.slecction_exprs.push(Some(select_one));
159 }
160 params.pre_workflow_type = WorkflowType::Filter;
161 Ok(params)
162 }
163 _ => Err(ParserError::DSLParserError(
164 "filter workflow must be filter".to_string(),
165 )),
166 }
167 }
168
169 pub fn execute_transform(
170 &self,
171 params: WorkflowParams,
172 transform: model::Workflow,
173 ) -> Result<WorkflowParams> {
174 match transform {
175 model::Workflow::Transform { transform } => {
176 let mut params = params;
177 for transform in transform {
178 let (sub_project, project, append_alias) =
179 self.get_transform_expr(transform.clone(), ¶ms.alias)?;
180 for (k, v) in append_alias {
181 params.alias.insert(k.to_string(), v.clone());
182 }
183 if !project.is_empty() {
184 params.project_exprs.extend(project);
185 }
186 if !sub_project.is_empty() {
187 params.sub_project_exprs.extend(sub_project)
188 }
189 }
190 params.pre_workflow_type = WorkflowType::Transform;
191 Ok(params)
192 }
193 _ => Err(ParserError::DSLParserError(
194 "transform workflow must be transform".to_string(),
195 )),
196 }
197 }
198 pub fn execute_sort(
199 &self,
200 mut params: WorkflowParams,
201 sort: model::Workflow,
202 ) -> Result<WorkflowParams> {
203 match sort {
204 model::Workflow::Sort { by, sort } => {
205 let sort_exprs = get_order_epxr(sort, by, self.get_quote());
206 params.order_by = sort_exprs;
207 params.pre_workflow_type = WorkflowType::Sort;
208 Ok(params)
209 }
210 _ => Err(ParserError::DSLParserError(
211 "sort workflow must be sort".to_string(),
212 )),
213 }
214 }
215
216 pub fn execute_view(
217 &self,
218 mut params: WorkflowParams,
219 view: model::Workflow,
220 ) -> Result<WorkflowParams> {
221 match view {
222 model::Workflow::View { query } => {
223 for query in query {
224 let read_onley_alias = params.alias.clone();
225 let (project, group, append_alias) =
226 self.get_projection_and_group_expr(&query, &read_onley_alias);
227 params.project_exprs.extend(project);
228
229 if !group.is_empty() {
230 params.group_epxrs = group
231 }
232 for (k, v) in append_alias {
233 params.alias.insert(k.to_string(), v.clone());
234 }
235 params.pre_workflow_type = WorkflowType::View;
236 }
237 Ok(params)
238 }
239 _ => Err(ParserError::DSLParserError(
240 "view workflow must be view".to_string(),
241 )),
242 }
243 }
244
245 pub fn parse_dataset_define(&self, dataset: &model::Dataset) -> Result<TableWithJoins> {
246 match dataset {
247 model::Dataset::Table { name } => Ok(TableWithJoins {
248 relation: TableFactor::Table {
249 name: ObjectName(vec![sqlparser::ast::Ident {
250 value: name.clone(),
251 quote_style: self.get_quote(),
252 }]),
253 alias: None,
254 args: None,
255 with_hints: vec![],
256 },
257 joins: vec![],
258 }),
259 model::Dataset::View { sql } => {
260 let sub_query = SqlParser::parse_sql(&DuckDbDialect, sql).unwrap();
261 if sub_query.len() != 1 {
262 return Err(ParserError::DataViewCreateError(
263 "sub query must be one".to_string(),
264 ));
265 }
266 self.parser_view_define(sub_query[0].clone())
267 }
268 }
269 }
270
271 pub fn parse_dataview_define(
273 &self,
274 dataset: &model::Dataset,
275 view: Vec<DataView>,
276 ) -> Result<TableWithJoins> {
277 if view.is_empty() || view.len() > 1 {
278 return Err(ParserError::DataViewCreateError(
279 "view must be one".to_string(),
280 ));
281 }
282 match &view[0] {
283 DataView::Sql { query } => {
284 if query.is_empty() || query.len() > 1 {
285 return Err(ParserError::DataViewCreateError(
286 "query must be one".to_string(),
287 ));
288 }
289 let query = query[0].clone();
290 let origin_ast = SqlParser::parse_sql(&DuckDbDialect, &query.sql).unwrap();
291 let check = matches!(&origin_ast[0], Statement::Query(_));
292 if !check {
293 return Err(ParserError::DataViewCreateError(
294 "query must be select".to_string(),
295 ));
296 }
297 let fromtable = self.parse_dataset_define(dataset)?;
299 let view_project = self.create_data_view_projection(query.fid_map.clone());
300 let rewrite_fid_view = TableWithJoins {
301 relation: TableFactor::Derived {
302 lateral: false,
303 subquery: QueryBuilder::new()
304 .with_projection(view_project)
305 .with_from(vec![fromtable])
306 .build_query(),
307 alias: Some(sqlparser::ast::TableAlias {
308 name: sqlparser::ast::Ident {
309 value: "k_gw_write_view".to_owned(),
310 quote_style: self.get_as_quote(),
311 },
312 columns: vec![],
313 }),
314 },
315 joins: vec![],
316 };
317 let new_ast =
319 self.replace_default_table(origin_ast[0].clone(), rewrite_fid_view)?;
320 self.parser_view_define(new_ast)
321 }
322 }
323 }
324
325 pub fn replace_default_table(
326 &self,
327 origin: Statement,
328 review_table: TableWithJoins,
329 ) -> Result<Statement> {
330 let new_ast: Statement = match origin {
331 Statement::Query(query) => {
332 let mut new_query = query.clone();
333 match *query.body {
334 SetExpr::Select(select) => {
335 if select.from.len() > 1 {
336 return Err(ParserError::DataViewCreateError(
337 "body ast must select one".to_string(),
338 ));
339 }
340 new_query.body = Box::new(SetExpr::Select(Box::new(Select {
341 from: vec![self.rewrite_data_view_table_with_join(
342 select.from[0].clone(),
343 review_table,
344 )?],
345 ..*select
346 })));
347 }
348 _ => {
349 return Err(ParserError::DataViewCreateError(
350 "body ast must select".to_string(),
351 ))
352 }
353 };
354 Statement::Query(new_query)
355 }
356 _ => {
357 return Err(ParserError::DataViewCreateError(
358 "body ast must select".to_string(),
359 ))
360 }
361 };
362
363 Ok(new_ast)
364 }
365
366 pub fn rewrite_data_view_table_with_join(
367 &self,
368 origin_table: TableWithJoins,
369 review_table: TableWithJoins,
370 ) -> Result<TableWithJoins> {
371 match origin_table.relation {
372 TableFactor::Table {
373 name: _,
374 alias: _,
375 args: _,
376 with_hints: _,
377 } => Ok(review_table),
378 TableFactor::Derived {
379 lateral: _,
380 subquery,
381 alias: _,
382 } => {
383 let new_sub_query = match *subquery.body {
384 SetExpr::Select(mut select) => {
385 if select.from.len() > 1 {
386 return Err(ParserError::DSLSerializeError(
387 "body ast must select one".to_string(),
388 ));
389 }
390 select.from[0] = self.rewrite_data_view_table_with_join(
391 select.from[0].clone(),
392 review_table,
393 )?;
394 Query {
395 body: Box::new(SetExpr::Select(select)),
396 ..*subquery
397 }
398 }
399 _ => {
400 return Err(ParserError::DataViewCreateError(
401 "body ast must select".to_string(),
402 ));
403 }
404 };
405 Ok(TableWithJoins {
406 relation: TableFactor::Derived {
407 lateral: false,
408 subquery: Box::new(new_sub_query),
409 alias: Some(sqlparser::ast::TableAlias {
410 name: sqlparser::ast::Ident {
411 value: "k_gw_review_default".to_owned(),
412 quote_style: self.get_as_quote(),
413 },
414 columns: vec![],
415 }),
416 },
417 joins: vec![],
418 })
419 }
420 _ => Err(ParserError::DataViewCreateError("not support".to_string())),
421 }
422 }
423
424 pub fn create_data_view_projection(&self, fid_map: HashMap<String, String>) -> Projection {
425 let mut project_exprs = vec![];
426 let mut vec: Vec<_> = fid_map.iter().collect();
427 vec.sort_by(|&(a, _), &(b, _)| a.cmp(b));
428 for item in vec {
429 let expr = Expr::Identifier(Ident {
430 value: item.1.to_string(),
431 quote_style: self.get_quote(),
432 });
433 let alias = Ident {
434 value: item.0.to_string(),
435 quote_style: self.get_as_quote(),
436 };
437 project_exprs.push(SelectItem::ExprWithAlias { expr, alias });
438 }
439 project_exprs
440 }
441
442 pub fn parse_workflow(&self, table: TableWithJoins, dsl: DSL) -> Result<Statement> {
443 let mut params: WorkflowParams = WorkflowParams {
445 alias: HashMap::new(),
446 group_epxrs: vec![],
447 project_exprs: vec![],
448 sub_project_exprs: vec![],
449 slecction_exprs: vec![],
450 pre_workflow_type: WorkflowType::None,
451 order_by: vec![],
452 table,
453 };
454
455 for ele in dsl.workflow {
457 match ele {
458 model::Workflow::Filter { filters: _ } => {
459 params = self.execute_filter(params, ele)?;
460 }
461
462 model::Workflow::Transform { transform: _ } => {
463 params = self.execute_transform(params, ele)?;
464 }
465
466 model::Workflow::View { query: _ } => {
467 params = self.execute_view(params, ele)?;
468 }
469
470 model::Workflow::Sort { sort: _, by: _ } => {
471 params = self.execute_sort(params, ele)?;
472 }
473 }
474 }
475
476 let (limit, offset) = get_limit_offset(dsl.limit, dsl.offset);
478
479 let mut slection: Option<Expr> = None;
481 if !params.slecction_exprs.is_empty() {
482 slection = merge_binary_op(params.slecction_exprs);
483 }
484 let from =
485 self.generate_table_with_joins(params.sub_project_exprs.clone(), params.table.clone())?;
486 let new_stetement = Statement::Query(
487 QueryBuilder::new()
488 .with_projection(params.project_exprs)
489 .with_from(from)
490 .with_selection(slection)
491 .with_group_by(params.group_epxrs)
492 .with_order_by(params.order_by.clone())
493 .with_limit(limit)
494 .with_offset(offset)
495 .build_query(),
496 );
497 Ok(new_stetement)
498 }
499
500 fn generate_table_with_joins(
501 &self,
502 mut sub_project_exprs: Vec<SelectItem>,
503 table: TableWithJoins,
504 ) -> result::Result<Vec<TableWithJoins>, ParserError> {
505 let from: Vec<TableWithJoins>;
506 if !sub_project_exprs.is_empty() {
508 match table.relation {
509 TableFactor::Table {
510 name: _,
511 alias: _,
512 with_hints: _,
513 args: _,
514 } => {
515 sub_project_exprs.push(SelectItem::Wildcard(WildcardAdditionalOptions {
516 opt_exclude: None,
517 opt_except: None,
518 opt_rename: None,
519 opt_replace: None,
520 }));
521 let statement = sqlparser::ast::Statement::Query(
522 QueryBuilder::new()
523 .with_from(vec![table])
524 .with_projection(sub_project_exprs)
525 .build_query(),
526 );
527 from = vec![self.parser_view_define(statement)?];
528 }
529 TableFactor::Derived {
530 lateral,
531 subquery,
532 alias,
533 } => {
534 let mut sub_query = subquery;
535 if let sqlparser::ast::SetExpr::Select(select) = &mut *sub_query.body {
536 select.projection = sub_project_exprs;
537 }
538 from = vec![TableWithJoins {
539 relation: TableFactor::Derived {
540 lateral,
541 subquery: sub_query,
542 alias,
543 },
544 joins: vec![],
545 }];
546 }
547 _ => todo!("not support"),
548 }
549 } else {
550 from = vec![table];
551 }
552 Ok(from)
553 }
554
555 pub fn get_projection_and_group_expr(
556 &self,
557 query: &model::Query,
558 alias: &Alias,
559 ) -> (Projection, GroupBy, Alias) {
560 let mut new_alias = alias.clone();
561 match query {
562 model::Query::Aggregate { group_by, measures } => {
563 let mut projection = Projection::default();
564 let mut group_by_set: Vec<Expr> = GroupBy::default();
565 for ele in group_by {
566 group_by_set.push(Expr::Identifier(Ident {
567 value: ele.clone(),
568 quote_style: self.get_quote(),
569 }));
570 if let Some(e) = alias.get(ele) {
571 projection.push(e.clone());
572 } else {
573 projection.push(SelectItem::UnnamedExpr(Expr::Identifier(Ident {
574 value: ele.clone(),
575 quote_style: self.get_quote(),
576 })));
577 }
578 }
579 for ele in measures {
580 let mut name_epxr: SelectItem;
581 if ele.field == "*" {
582 name_epxr = SelectItem::Wildcard(WildcardAdditionalOptions {
583 opt_exclude: None,
584 opt_except: None,
585 opt_rename: None,
586 opt_replace: None,
587 });
588 } else if let Some(format) = &ele.format {
589 name_epxr = SelectItem::ExprWithAlias {
590 expr: get_format_time_expr(
591 Expr::Identifier(Ident {
592 value: ele.field.clone(),
593 quote_style: self.get_quote(),
594 }),
595 format.clone(),
596 ),
597 alias: Ident {
598 value: ele.as_field_key.clone(),
599 quote_style: self.get_as_quote(),
600 },
601 };
602 } else {
603 name_epxr = SelectItem::ExprWithAlias {
604 expr: Expr::Identifier(Ident {
605 value: ele.field.clone(),
606 quote_style: self.get_quote(),
607 }),
608 alias: Ident {
609 value: ele.as_field_key.clone(),
610 quote_style: self.get_as_quote(),
611 },
612 };
613 }
614
615 if !new_alias.contains_key(&ele.field) {
616 new_alias.insert(ele.as_field_key.clone(), name_epxr.clone());
617 } else {
618 name_epxr = new_alias.get(&ele.field).unwrap().clone();
619 }
620 let expr = match ele.agg {
621 model::Agg::DistinctCount => Expr::Function(Function {
622 name: ObjectName(vec![Ident {
623 value: "count".to_string(),
624 quote_style: None,
625 }]),
626 args: vec![FunctionArg::Unnamed(get_expr_from_select(name_epxr))],
627 over: None,
628 distinct: true,
629 special: false,
630 order_by: vec![],
631 }),
632 _ => get_function_expr(
633 get_agg_func(ele.agg.clone()),
634 vec![FunctionArg::Unnamed(get_expr_from_select(name_epxr))],
635 ),
636 };
637 projection.push(SelectItem::ExprWithAlias {
638 expr,
639 alias: Ident {
640 value: ele.as_field_key.clone(),
641 quote_style: self.get_as_quote(),
642 },
643 });
644 }
645 (projection, group_by_set, new_alias)
646 }
647 model::Query::Raw { fields } => {
648 let mut projection = Projection::default();
649 let group_by: Vec<Expr> = GroupBy::default();
650 for field in fields {
651 if let Some(f) = alias.get(field) {
652 projection.push(f.clone());
653 } else if field == "*" {
654 projection.push(SelectItem::Wildcard(WildcardAdditionalOptions {
655 opt_exclude: None,
656 opt_except: None,
657 opt_rename: None,
658 opt_replace: None,
659 }));
660 } else {
661 projection.push(SelectItem::UnnamedExpr(Expr::Identifier(Ident {
662 value: field.clone(),
663 quote_style: self.get_quote(),
664 })));
665 }
666 }
667 (projection, group_by, new_alias)
668 }
669 }
670 }
671
672 pub fn get_transform_expr(
673 &self,
674 transform: model::Transform,
675 alias: &Alias,
676 ) -> Result<(Projection, Projection, Alias)> {
677 let mut sub_projection = Projection::default();
678 let mut append_alias = alias.clone();
679 match transform.expression.op {
680 model::TransformOp::One => {
681 let sleect_item =
682 SelectItem::UnnamedExpr(Expr::Value(Value::Number("1".to_string(), false)));
683 append_alias.insert(transform.expression.as_field_key, sleect_item);
684 Ok((sub_projection, vec![], append_alias))
685 }
686 model::TransformOp::Bin => {
687 let num = transform.expression.num.unwrap_or(10);
688 let param = transform.expression.params.unwrap()[0].clone();
690 let value = match param.value {
691 model::ValueParam::String(ref v) => v.clone(),
692 model::ValueParam::Map(_) => {
693 return Err(ParserError::DSLSerializeError(
694 "bin param must be string".to_string(),
695 ))
696 }
697 };
698 let min_expr = SelectItem::ExprWithAlias {
699 expr: get_window_function_expr(
700 "min".to_string(),
701 vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(
702 Expr::Identifier(Ident {
703 value: value.clone(),
704 quote_style: self.get_quote(),
705 }),
706 ))],
707 WindowType::WindowSpec(WindowSpec {
708 partition_by: vec![],
709 order_by: vec![],
710 window_frame: None,
711 }),
712 ),
713 alias: Ident {
714 value: format!("{}{}", "min_", transform.expression.as_field_key),
715 quote_style: self.get_as_quote(),
716 },
717 };
718 let max_expr = SelectItem::ExprWithAlias {
719 expr: get_window_function_expr(
720 "max".to_string(),
721 vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(
722 Expr::Identifier(Ident {
723 value: value.clone(),
724 quote_style: self.get_quote(),
725 }),
726 ))],
727 WindowType::WindowSpec(WindowSpec {
728 partition_by: vec![],
729 order_by: vec![],
730 window_frame: None,
731 }),
732 ),
733 alias: Ident {
734 value: format!("{}{}", "max_", transform.expression.as_field_key),
735 quote_style: self.get_as_quote(),
736 },
737 };
738 sub_projection.push(min_expr);
739 sub_projection.push(max_expr);
740 let min_epxr_indentifier = Expr::Identifier(Ident {
742 value: format!("{}{}", "min_", transform.expression.as_field_key),
743 quote_style: self.get_quote(),
744 });
745 let max_epxr_indentifier = Expr::Identifier(Ident {
746 value: format!("{}{}", "max_", transform.expression.as_field_key),
747 quote_style: self.get_quote(),
748 });
749 let max_minus_min = Expr::Nested(Box::new(get_binary_op_epxr(
751 max_epxr_indentifier,
752 min_epxr_indentifier.clone(),
753 BinaryOperator::Minus,
754 )));
755 let div_10_epxr = Expr::Nested(Box::new(get_binary_op_epxr(
757 max_minus_min,
758 Expr::Value(Value::Number(num.to_string() + ".0", false)),
759 BinaryOperator::Divide,
760 )));
761 let col_minus_min = Expr::Nested(Box::new(get_binary_op_epxr(
763 Expr::Identifier(Ident {
764 value,
765 quote_style: self.get_quote(),
766 }),
767 min_epxr_indentifier.clone(),
768 BinaryOperator::Minus,
769 )));
770 let col_div = Expr::Nested(Box::new(get_binary_op_epxr(
772 col_minus_min,
773 div_10_epxr.clone(),
774 BinaryOperator::Divide,
775 )));
776 let floor_expr = get_function_expr(
778 "floor".to_string(),
779 vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(col_div))],
780 );
781 let least_expr = get_function_expr(
783 "least".to_string(),
784 vec![
785 FunctionArg::Unnamed(FunctionArgExpr::Expr(floor_expr)),
786 FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(Value::Number(
787 (num - 1).to_string(),
788 false,
789 )))),
790 ],
791 );
792 let final_expr = Expr::Nested(Box::new(get_binary_op_epxr(
795 least_expr,
796 div_10_epxr,
797 BinaryOperator::Multiply,
798 )));
799 let final_expr_with_alias = SelectItem::ExprWithAlias {
803 expr: Expr::Nested(Box::new(get_binary_op_epxr(
804 min_epxr_indentifier,
805 final_expr,
806 BinaryOperator::Plus,
807 ))),
808 alias: Ident {
809 value: transform.expression.as_field_key.clone(),
810 quote_style: self.get_as_quote(),
811 },
812 };
813 append_alias.insert(transform.expression.as_field_key, final_expr_with_alias);
814 Ok((sub_projection, vec![], append_alias))
815 }
816 model::TransformOp::Log2 => {
817 let param = transform.expression.params.unwrap()[0].clone();
818 let value = match param.value {
819 model::ValueParam::String(ref v) => v.clone(),
820 _ => return Err(ParserError::DSLSerializeError("value error".to_string())),
821 };
822 let log_2_epxr = get_log_n_func_expr(2, value, self.get_quote());
823 let log_2_expr_with_alias = SelectItem::ExprWithAlias {
824 expr: log_2_epxr,
825 alias: Ident {
826 value: transform.expression.as_field_key.clone(),
827 quote_style: self.get_as_quote(),
828 },
829 };
830 append_alias.insert(transform.expression.as_field_key, log_2_expr_with_alias);
831 Ok((sub_projection, vec![], append_alias))
832 }
833 model::TransformOp::Log10 => {
834 let param = transform.expression.params.unwrap()[0].clone();
835 let value = match param.value {
836 model::ValueParam::String(ref v) => v.clone(),
837 _ => return Err(ParserError::DSLSerializeError("value error".to_string())),
838 };
839 let log_10_epxr = get_log_n_func_expr(10, value, self.get_quote());
840 let log_10_expr_with_alias = SelectItem::ExprWithAlias {
841 expr: log_10_epxr,
842 alias: Ident {
843 value: transform.expression.as_field_key.clone(),
844 quote_style: self.get_as_quote(),
845 },
846 };
847 append_alias.insert(transform.expression.as_field_key, log_10_expr_with_alias);
848 Ok((sub_projection, vec![], append_alias))
849 }
850 model::TransformOp::Log => {
851 let param = transform.expression.params.unwrap()[0].clone();
852 let value = match param.value {
853 model::ValueParam::String(ref v) => v.clone(),
854 _ => return Err(ParserError::DSLSerializeError("value error".to_string())),
855 };
856 let log_epxr = get_log_n_func_expr(
857 transform.expression.num.unwrap_or(10),
858 value,
859 self.get_quote(),
860 );
861 let log_expr_with_alias = SelectItem::ExprWithAlias {
862 expr: log_epxr,
863 alias: Ident {
864 value: transform.expression.as_field_key.clone(),
865 quote_style: self.get_as_quote(),
866 },
867 };
868 append_alias.insert(transform.expression.as_field_key, log_expr_with_alias);
869 Ok((sub_projection, vec![], append_alias))
870 }
871 model::TransformOp::BinCount => {
872 let num = transform.expression.num.unwrap_or(10);
874 let param = transform.expression.params.unwrap()[0].clone();
875 let value = match param.value {
876 model::ValueParam::String(ref v) => v.clone(),
877 _ => return Err(ParserError::DSLSerializeError("value error".to_string())),
878 };
879 let min_expr = SelectItem::ExprWithAlias {
880 expr: get_window_function_expr(
881 "min".to_string(),
882 vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(
883 Expr::Identifier(Ident {
884 value: value.clone(),
885 quote_style: self.get_quote(),
886 }),
887 ))],
888 WindowType::WindowSpec(WindowSpec {
889 partition_by: vec![],
890 order_by: vec![],
891 window_frame: None,
892 }),
893 ),
894 alias: Ident {
895 value: format!("{}{}", "min_", transform.expression.as_field_key),
896 quote_style: self.get_as_quote(),
897 },
898 };
899 let max_expr = SelectItem::ExprWithAlias {
900 expr: get_window_function_expr(
901 "max".to_string(),
902 vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(
903 Expr::Identifier(Ident {
904 value: value.clone(),
905 quote_style: self.get_quote(),
906 }),
907 ))],
908 WindowType::WindowSpec(WindowSpec {
909 partition_by: vec![],
910 order_by: vec![],
911 window_frame: None,
912 }),
913 ),
914 alias: Ident {
915 value: format!("{}{}", "max_", transform.expression.as_field_key),
916 quote_style: self.get_as_quote(),
917 },
918 };
919 sub_projection.push(min_expr);
920 sub_projection.push(max_expr);
921 let min_epxr_indentifier = Expr::Identifier(Ident {
923 value: format!("{}{}", "min_", transform.expression.as_field_key),
924 quote_style: self.get_quote(),
925 });
926 let max_epxr_indentifier = Expr::Identifier(Ident {
927 value: format!("{}{}", "max_", transform.expression.as_field_key),
928 quote_style: self.get_quote(),
929 });
930 let max_minus_min = Expr::Nested(Box::new(get_binary_op_epxr(
932 max_epxr_indentifier,
933 min_epxr_indentifier.clone(),
934 BinaryOperator::Minus,
935 )));
936 let div_10_epxr = Expr::Nested(Box::new(get_binary_op_epxr(
938 max_minus_min,
939 Expr::Value(Value::Number(num.to_string(), false)),
940 BinaryOperator::Divide,
941 )));
942 let col_minus_min = Expr::Nested(Box::new(get_binary_op_epxr(
944 Expr::Identifier(Ident {
945 value,
946 quote_style: self.get_quote(),
947 }),
948 min_epxr_indentifier,
949 BinaryOperator::Minus,
950 )));
951 let col_div = Expr::Nested(Box::new(get_binary_op_epxr(
953 col_minus_min,
954 div_10_epxr,
955 BinaryOperator::Divide,
956 )));
957 let lest_expr = get_function_expr(
959 "least".to_string(),
960 vec![
961 FunctionArg::Unnamed(FunctionArgExpr::Expr(col_div)),
962 FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(Value::Number(
963 (num - 1).to_string(),
964 false,
965 )))),
966 ],
967 );
968 let bin_count_expr_with_alias = SelectItem::ExprWithAlias {
970 expr: get_binary_op_epxr(
971 lest_expr,
972 Expr::Value(Value::Number("1".to_string(), false)),
973 BinaryOperator::Plus,
974 ),
975 alias: Ident {
976 value: transform.expression.as_field_key.clone(),
977 quote_style: self.get_as_quote(),
978 },
979 };
980 append_alias.insert(transform.expression.as_field_key, bin_count_expr_with_alias);
981 Ok((sub_projection, vec![], append_alias))
982 }
983 model::TransformOp::DateTimeDrill => {
984 let mut field: String = "".to_string();
985 let mut value: String = "".to_string();
986 let mut format = "%Y-%m-%d %H:%M:%S".to_string();
987 for ele in transform.expression.params.unwrap() {
988 let v = match ele.value {
989 model::ValueParam::String(ref v) => v.clone(),
990 _ => return Err(ParserError::DSLSerializeError("value error".to_string())),
991 };
992 if ele.r#type == "field" {
993 field = v.clone();
994 }
995 if ele.r#type == "value" {
996 value = v.clone();
997 }
998 if ele.r#type == "format" {
999 format = v.clone();
1000 }
1001 }
1002 if field.is_empty() || value.is_empty() {
1003 return Err(ParserError::DSLSerializeError(
1004 "field & value error".to_string(),
1005 ));
1006 }
1007 let time_drill_expr = SelectItem::ExprWithAlias {
1008 expr: get_strftime_expr(
1009 get_data_trunc_expr(
1010 get_format_time_expr(
1011 Expr::Identifier(Ident {
1012 value: field,
1013 quote_style: self.get_quote(),
1014 }),
1015 format,
1016 ),
1017 value,
1018 ),
1019 "%Y-%m-%d %H:%M:%S".to_string(),
1020 ),
1021 alias: Ident {
1022 value: transform.expression.as_field_key.clone(),
1023 quote_style: self.get_as_quote(),
1024 },
1025 };
1026 append_alias.insert(transform.expression.as_field_key, time_drill_expr);
1027 Ok((sub_projection, vec![], append_alias))
1028 }
1029 model::TransformOp::DateTimeFeature => {
1030 let mut field: String = "".to_string();
1031 let mut value: String = "".to_string();
1032 let mut format = "%Y-%m-%d %H:%M:%S".to_string();
1033 for ele in transform.expression.params.unwrap() {
1034 let v = match ele.value {
1035 model::ValueParam::String(ref v) => v.clone(),
1036 _ => return Err(ParserError::DSLSerializeError("value error".to_string())),
1037 };
1038
1039 if ele.r#type == "field" {
1040 field = v.clone();
1041 }
1042 if ele.r#type == "value" {
1043 value = v.clone();
1044 }
1045 if ele.r#type == "format" {
1046 format = v.clone();
1047 }
1048 }
1049 if field.is_empty() || value.is_empty() {
1050 return Err(ParserError::DSLSerializeError(
1051 "field & value error".to_string(),
1052 ));
1053 }
1054 let strftime_param = get_data_feature_format(&value);
1055 let time_drill_expr = SelectItem::ExprWithAlias {
1056 expr: get_data_part_expr(
1057 get_format_time_expr(
1058 Expr::Identifier(Ident {
1059 value: field,
1060 quote_style: self.get_quote(),
1061 }),
1062 format,
1063 ),
1064 strftime_param,
1065 ),
1066 alias: Ident {
1067 value: transform.expression.as_field_key.clone(),
1068 quote_style: self.get_as_quote(),
1069 },
1070 };
1071 append_alias.insert(transform.expression.as_field_key, time_drill_expr);
1072 Ok((sub_projection, vec![], append_alias))
1073 }
1074 model::TransformOp::Expr => {
1075 let mut sql: String = "".to_string();
1076 for ele in transform.expression.params.unwrap() {
1077 if ele.r#type == "sql" {
1078 let value = match ele.value {
1079 model::ValueParam::String(ref v) => v.clone(),
1080 _ => {
1081 return Err(ParserError::DSLSerializeError(
1082 "value error".to_string(),
1083 ))
1084 }
1085 };
1086 sql = value.clone();
1087 }
1088 }
1089 if sql.is_empty() {
1090 return Err(ParserError::DSLSerializeError("sql empty".to_string()));
1091 }
1092 let expr = SelectItem::ExprWithAlias {
1093 expr: Expr::Identifier(Ident {
1094 value: sql,
1095 quote_style: None,
1096 }),
1097 alias: Ident {
1098 value: transform.expression.as_field_key.clone(),
1099 quote_style: self.get_as_quote(),
1100 },
1101 };
1102 append_alias.insert(transform.expression.as_field_key, expr);
1103 Ok((sub_projection, vec![], append_alias))
1104 }
1105 model::TransformOp::Paint => {
1106 let param = transform.expression.params.unwrap()[0].clone();
1107 let value = match param.value {
1109 model::ValueParam::Map(ref v) => v.clone(),
1110 _ => return Err(ParserError::DSLSerializeError("value error".to_string())),
1111 };
1112
1113 let compressed = match general_purpose::STANDARD.decode(value.map.clone()) {
1114 Ok(v) => v,
1115 Err(_) => {
1116 return Err(ParserError::DSLSerializeError("value error".to_string()))
1117 }
1118 };
1119 let slice: &[u8] = compressed.as_slice();
1120 let mut decoder = flate2::bufread::DeflateDecoder::new(slice);
1121 let mut decompressed: Vec<u8> = vec![];
1122 _ = decoder.read_to_end(&mut decompressed);
1123
1124 let rects = find_matching_rects(&decompressed, value.mapwidth as usize);
1125
1126 let mut results: Vec<Expr> = vec![];
1127 let mut conditions = vec![];
1128 for ele in rects {
1129 let x1 = (value.domain_x[1] - value.domain_x[0]) / value.mapwidth as f64
1131 * ele.x1 as f64
1132 + value.domain_x[0];
1133 let x2 = (value.domain_x[1] - value.domain_x[0]) / value.mapwidth as f64
1134 * (ele.x2 + 1) as f64
1135 + value.domain_x[0];
1136 let left = Expr::Between {
1137 expr: Box::new(Expr::Identifier(Ident {
1138 value: value.x.clone(),
1139 quote_style: self.get_quote(),
1140 })),
1141 negated: false,
1142 low: Box::new(Expr::Value(Value::Number(x1.to_string(), false))),
1143 high: Box::new(Expr::Value(Value::Number(x2.to_string(), false))),
1144 };
1145 let y1 = (value.domain_y[1] - value.domain_y[0]) / value.mapwidth as f64
1147 * ele.y1 as f64
1148 + value.domain_y[0];
1149 let y2 = (value.domain_y[1] - value.domain_y[0]) / value.mapwidth as f64
1150 * (ele.y2 + 1) as f64
1151 + value.domain_y[0];
1152 let right = Expr::Between {
1153 expr: Box::new(Expr::Identifier(Ident {
1154 value: value.y.clone(),
1155 quote_style: self.get_quote(),
1156 })),
1157 negated: false,
1158 low: Box::new(Expr::Value(Value::Number(y1.to_string(), false))),
1159 high: Box::new(Expr::Value(Value::Number(y2.to_string(), false))),
1160 };
1161 let condition = get_binary_op_epxr(left, right, BinaryOperator::And);
1162 conditions.push(condition);
1163 let value_str = &ele.value.to_string();
1165
1166 let name = match value.dict.get(value_str) {
1167 Some(v) => v.clone(),
1168 None => {
1169 let error_msg = format!("value {} not found", value_str);
1170 return Err(ParserError::DSLSerializeError(error_msg));
1171 }
1172 };
1173 results.push(Expr::Value(Value::SingleQuotedString(name.name)))
1174 }
1175
1176 let case_expr = Expr::Case {
1177 operand: None,
1178 conditions,
1179 results,
1180 else_result: None,
1181 };
1182 let expr: SelectItem = SelectItem::ExprWithAlias {
1183 expr: case_expr,
1184 alias: Ident {
1185 value: transform.expression.as_field_key.clone(),
1186 quote_style: self.get_as_quote(),
1187 },
1188 };
1189 append_alias.insert(transform.expression.as_field_key, expr);
1190 Ok((sub_projection, vec![], append_alias))
1191 }
1192 }
1193 }
1194}