graphic_walker_parser/parser/workflow.rs

1use crate::parser::*;
2use std::sync::atomic::AtomicUsize;
3
/// Identifier quoting configuration used when the parser emits SQL.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParserParams {
    /// Quote character put around table and column identifiers (`None`: no quote character).
    pub quote_style: Option<char>,
    /// Quote character put around aliases introduced with `AS` (`None`: no quote character).
    pub as_quote_style: Option<char>,
}
8
9pub struct Parser {
10    pub params: ParserParams,
11    view_count: AtomicUsize,
12}
13
14impl Default for Parser {
15    fn default() -> Self {
16        Self {
17            params: ParserParams {
18                quote_style: Some('"'),
19                as_quote_style: Some('"'),
20            },
21            view_count: AtomicUsize::new(0),
22        }
23    }
24}
25
26pub struct WorkflowParams {
27    // key:already exist column name, value: column epxr
28    // use: use to replace column name to expr
29    pub(crate) alias: HashMap<String, SelectItem>,
30    // already group by expr
31    pub(crate) group_epxrs: Vec<Expr>,
32    // already project expr
33    pub(crate) project_exprs: Vec<SelectItem>,
34    // already sub project expr
35    pub(crate) sub_project_exprs: Vec<SelectItem>,
36    // already selections expr
37    pub(crate) slecction_exprs: Vec<Option<Expr>>,
38    // excute
39    pub(crate) pre_workflow_type: WorkflowType,
40    // table
41    pub(crate) table: TableWithJoins,
42    // order by
43    pub(crate) order_by: Vec<OrderByExpr>,
44}
45
/// Kind of workflow step most recently applied while building a query.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum WorkflowType {
    /// A `Filter` step: predicates added to the WHERE clause.
    Filter,
    /// A `Transform` step: computed columns added to the projection and/or to the
    /// helper sub-projection.
    Transform,
    /// A `View` step: projection and GROUP BY of the view query.
    View,
    /// A `Sort` step: ORDER BY expressions.
    Sort,
    /// No step has been executed yet.
    None,
}
59
60impl Parser {
61    pub fn new(params: ParserParams) -> Self {
62        Self {
63            params,
64            view_count: AtomicUsize::new(0),
65        }
66    }
67    pub fn get_quote(&self) -> Option<char> {
68        self.params.quote_style
69    }
70
71    pub fn get_as_quote(&self) -> Option<char> {
72        self.params.as_quote_style
73    }
74
75    pub fn parser_dsl(&self, dsl: DSL, dataset: &model::Dataset) -> Result<Statement> {
76        let table: TableWithJoins = if dsl.dataview.is_none() {
77            self.parse_dataset_define(dataset)?
78        } else {
79            self.parse_dataview_define(dataset, dsl.dataview.clone().unwrap())?
80        };
81        let ast = self.parse_workflow(table, dsl)?;
82        Ok(ast)
83    }
84
85    pub fn parser_view_define(&self, statement: Statement) -> Result<TableWithJoins> {
86        if let sqlparser::ast::Statement::Query(query) = statement {
87            Ok(TableWithJoins {
88                relation: TableFactor::Derived {
89                    lateral: false,
90                    subquery: query.clone(),
91                    alias: Some(sqlparser::ast::TableAlias {
92                        name: sqlparser::ast::Ident {
93                            value: self.generate_view_name(),
94                            quote_style: self.get_as_quote(),
95                        },
96                        columns: vec![],
97                    }),
98                },
99                joins: vec![],
100            })
101        } else {
102            Err(ParserError::DataViewCreateError(
103                "dataview statement must be query".to_string(),
104            ))
105        }
106    }
107
108    fn generate_view_name(&self) -> String {
109        format!(
110            "view_{}",
111            self.view_count
112                .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
113        )
114    }
115
116    pub fn execute_filter(
117        &self,
118        mut params: WorkflowParams,
119        filter: model::Workflow,
120    ) -> Result<WorkflowParams> {
121        // if pre workflow is transform, then merge transform
122        if params.pre_workflow_type == WorkflowType::View {
123            let mut slection: Option<Expr> = None;
124            if !params.slecction_exprs.is_empty() {
125                slection = merge_binary_op(params.slecction_exprs);
126            }
127            let from = self.generate_table_with_joins(
128                params.sub_project_exprs.clone(),
129                params.table.clone(),
130            )?;
131            // select sum(id) from id -> select * from (select sum(id) from id) as view where ...
132            let new_stetement = Statement::Query(
133                QueryBuilder::new()
134                    .with_projection(params.project_exprs)
135                    .with_from(from)
136                    .with_selection(slection)
137                    .with_group_by(params.group_epxrs)
138                    .with_order_by(params.order_by.clone())
139                    .build_query(),
140            );
141            params.table = self.parser_view_define(new_stetement)?;
142            params.alias = HashMap::new();
143            params.group_epxrs = vec![];
144            params.project_exprs = vec![SelectItem::Wildcard(WildcardAdditionalOptions {
145                opt_exclude: None,
146                opt_except: None,
147                opt_rename: None,
148                opt_replace: None,
149            })];
150            params.sub_project_exprs = vec![];
151            params.slecction_exprs = vec![];
152        }
153        match filter {
154            model::Workflow::Filter { filters } => {
155                let mut params = params;
156                for filter in filters {
157                    let select_one = get_selection(filter, &params.alias, self.get_quote())?;
158                    params.slecction_exprs.push(Some(select_one));
159                }
160                params.pre_workflow_type = WorkflowType::Filter;
161                Ok(params)
162            }
163            _ => Err(ParserError::DSLParserError(
164                "filter workflow must be filter".to_string(),
165            )),
166        }
167    }
168
169    pub fn execute_transform(
170        &self,
171        params: WorkflowParams,
172        transform: model::Workflow,
173    ) -> Result<WorkflowParams> {
174        match transform {
175            model::Workflow::Transform { transform } => {
176                let mut params = params;
177                for transform in transform {
178                    let (sub_project, project, append_alias) =
179                        self.get_transform_expr(transform.clone(), &params.alias)?;
180                    for (k, v) in append_alias {
181                        params.alias.insert(k.to_string(), v.clone());
182                    }
183                    if !project.is_empty() {
184                        params.project_exprs.extend(project);
185                    }
186                    if !sub_project.is_empty() {
187                        params.sub_project_exprs.extend(sub_project)
188                    }
189                }
190                params.pre_workflow_type = WorkflowType::Transform;
191                Ok(params)
192            }
193            _ => Err(ParserError::DSLParserError(
194                "transform workflow must be transform".to_string(),
195            )),
196        }
197    }
198    pub fn execute_sort(
199        &self,
200        mut params: WorkflowParams,
201        sort: model::Workflow,
202    ) -> Result<WorkflowParams> {
203        match sort {
204            model::Workflow::Sort { by, sort } => {
205                let sort_exprs = get_order_epxr(sort, by, self.get_quote());
206                params.order_by = sort_exprs;
207                params.pre_workflow_type = WorkflowType::Sort;
208                Ok(params)
209            }
210            _ => Err(ParserError::DSLParserError(
211                "sort workflow must be sort".to_string(),
212            )),
213        }
214    }
215
216    pub fn execute_view(
217        &self,
218        mut params: WorkflowParams,
219        view: model::Workflow,
220    ) -> Result<WorkflowParams> {
221        match view {
222            model::Workflow::View { query } => {
223                for query in query {
224                    let read_onley_alias = params.alias.clone();
225                    let (project, group, append_alias) =
226                        self.get_projection_and_group_expr(&query, &read_onley_alias);
227                    params.project_exprs.extend(project);
228
229                    if !group.is_empty() {
230                        params.group_epxrs = group
231                    }
232                    for (k, v) in append_alias {
233                        params.alias.insert(k.to_string(), v.clone());
234                    }
235                    params.pre_workflow_type = WorkflowType::View;
236                }
237                Ok(params)
238            }
239            _ => Err(ParserError::DSLParserError(
240                "view workflow must be view".to_string(),
241            )),
242        }
243    }
244
245    pub fn parse_dataset_define(&self, dataset: &model::Dataset) -> Result<TableWithJoins> {
246        match dataset {
247            model::Dataset::Table { name } => Ok(TableWithJoins {
248                relation: TableFactor::Table {
249                    name: ObjectName(vec![sqlparser::ast::Ident {
250                        value: name.clone(),
251                        quote_style: self.get_quote(),
252                    }]),
253                    alias: None,
254                    args: None,
255                    with_hints: vec![],
256                },
257                joins: vec![],
258            }),
259            model::Dataset::View { sql } => {
260                let sub_query = SqlParser::parse_sql(&DuckDbDialect, sql).unwrap();
261                if sub_query.len() != 1 {
262                    return Err(ParserError::DataViewCreateError(
263                        "sub query must be one".to_string(),
264                    ));
265                }
266                self.parser_view_define(sub_query[0].clone())
267            }
268        }
269    }
270
271    // parser fid mapping
272    pub fn parse_dataview_define(
273        &self,
274        dataset: &model::Dataset,
275        view: Vec<DataView>,
276    ) -> Result<TableWithJoins> {
277        if view.is_empty() || view.len() > 1 {
278            return Err(ParserError::DataViewCreateError(
279                "view must be one".to_string(),
280            ));
281        }
282        match &view[0] {
283            DataView::Sql { query } => {
284                if query.is_empty() || query.len() > 1 {
285                    return Err(ParserError::DataViewCreateError(
286                        "query must be one".to_string(),
287                    ));
288                }
289                let query = query[0].clone();
290                let origin_ast = SqlParser::parse_sql(&DuckDbDialect, &query.sql).unwrap();
291                let check = matches!(&origin_ast[0], Statement::Query(_));
292                if !check {
293                    return Err(ParserError::DataViewCreateError(
294                        "query must be select".to_string(),
295                    ));
296                }
297                //select fid as name from table
298                let fromtable = self.parse_dataset_define(dataset)?;
299                let view_project = self.create_data_view_projection(query.fid_map.clone());
300                let rewrite_fid_view = TableWithJoins {
301                    relation: TableFactor::Derived {
302                        lateral: false,
303                        subquery: QueryBuilder::new()
304                            .with_projection(view_project)
305                            .with_from(vec![fromtable])
306                            .build_query(),
307                        alias: Some(sqlparser::ast::TableAlias {
308                            name: sqlparser::ast::Ident {
309                                value: "k_gw_write_view".to_owned(),
310                                quote_style: self.get_as_quote(),
311                            },
312                            columns: vec![],
313                        }),
314                    },
315                    joins: vec![],
316                };
317                // select xxx from default -> select xxx from (select fid as name from table)
318                let new_ast =
319                    self.replace_default_table(origin_ast[0].clone(), rewrite_fid_view)?;
320                self.parser_view_define(new_ast)
321            }
322        }
323    }
324
325    pub fn replace_default_table(
326        &self,
327        origin: Statement,
328        review_table: TableWithJoins,
329    ) -> Result<Statement> {
330        let new_ast: Statement = match origin {
331            Statement::Query(query) => {
332                let mut new_query = query.clone();
333                match *query.body {
334                    SetExpr::Select(select) => {
335                        if select.from.len() > 1 {
336                            return Err(ParserError::DataViewCreateError(
337                                "body ast must select one".to_string(),
338                            ));
339                        }
340                        new_query.body = Box::new(SetExpr::Select(Box::new(Select {
341                            from: vec![self.rewrite_data_view_table_with_join(
342                                select.from[0].clone(),
343                                review_table,
344                            )?],
345                            ..*select
346                        })));
347                    }
348                    _ => {
349                        return Err(ParserError::DataViewCreateError(
350                            "body ast must select".to_string(),
351                        ))
352                    }
353                };
354                Statement::Query(new_query)
355            }
356            _ => {
357                return Err(ParserError::DataViewCreateError(
358                    "body ast must select".to_string(),
359                ))
360            }
361        };
362
363        Ok(new_ast)
364    }
365
366    pub fn rewrite_data_view_table_with_join(
367        &self,
368        origin_table: TableWithJoins,
369        review_table: TableWithJoins,
370    ) -> Result<TableWithJoins> {
371        match origin_table.relation {
372            TableFactor::Table {
373                name: _,
374                alias: _,
375                args: _,
376                with_hints: _,
377            } => Ok(review_table),
378            TableFactor::Derived {
379                lateral: _,
380                subquery,
381                alias: _,
382            } => {
383                let new_sub_query = match *subquery.body {
384                    SetExpr::Select(mut select) => {
385                        if select.from.len() > 1 {
386                            return Err(ParserError::DSLSerializeError(
387                                "body ast must select one".to_string(),
388                            ));
389                        }
390                        select.from[0] = self.rewrite_data_view_table_with_join(
391                            select.from[0].clone(),
392                            review_table,
393                        )?;
394                        Query {
395                            body: Box::new(SetExpr::Select(select)),
396                            ..*subquery
397                        }
398                    }
399                    _ => {
400                        return Err(ParserError::DataViewCreateError(
401                            "body ast must select".to_string(),
402                        ));
403                    }
404                };
405                Ok(TableWithJoins {
406                    relation: TableFactor::Derived {
407                        lateral: false,
408                        subquery: Box::new(new_sub_query),
409                        alias: Some(sqlparser::ast::TableAlias {
410                            name: sqlparser::ast::Ident {
411                                value: "k_gw_review_default".to_owned(),
412                                quote_style: self.get_as_quote(),
413                            },
414                            columns: vec![],
415                        }),
416                    },
417                    joins: vec![],
418                })
419            }
420            _ => Err(ParserError::DataViewCreateError("not support".to_string())),
421        }
422    }
423
424    pub fn create_data_view_projection(&self, fid_map: HashMap<String, String>) -> Projection {
425        let mut project_exprs = vec![];
426        let mut vec: Vec<_> = fid_map.iter().collect();
427        vec.sort_by(|&(a, _), &(b, _)| a.cmp(b));
428        for item in vec {
429            let expr = Expr::Identifier(Ident {
430                value: item.1.to_string(),
431                quote_style: self.get_quote(),
432            });
433            let alias = Ident {
434                value: item.0.to_string(),
435                quote_style: self.get_as_quote(),
436            };
437            project_exprs.push(SelectItem::ExprWithAlias { expr, alias });
438        }
439        project_exprs
440    }
441
442    pub fn parse_workflow(&self, table: TableWithJoins, dsl: DSL) -> Result<Statement> {
443        // init workflow params
444        let mut params: WorkflowParams = WorkflowParams {
445            alias: HashMap::new(),
446            group_epxrs: vec![],
447            project_exprs: vec![],
448            sub_project_exprs: vec![],
449            slecction_exprs: vec![],
450            pre_workflow_type: WorkflowType::None,
451            order_by: vec![],
452            table,
453        };
454
455        // parse workflow
456        for ele in dsl.workflow {
457            match ele {
458                model::Workflow::Filter { filters: _ } => {
459                    params = self.execute_filter(params, ele)?;
460                }
461
462                model::Workflow::Transform { transform: _ } => {
463                    params = self.execute_transform(params, ele)?;
464                }
465
466                model::Workflow::View { query: _ } => {
467                    params = self.execute_view(params, ele)?;
468                }
469
470                model::Workflow::Sort { sort: _, by: _ } => {
471                    params = self.execute_sort(params, ele)?;
472                }
473            }
474        }
475
476        // set limit and offset
477        let (limit, offset) = get_limit_offset(dsl.limit, dsl.offset);
478
479        // if pre workflow is view, then merge view
480        let mut slection: Option<Expr> = None;
481        if !params.slecction_exprs.is_empty() {
482            slection = merge_binary_op(params.slecction_exprs);
483        }
484        let from =
485            self.generate_table_with_joins(params.sub_project_exprs.clone(), params.table.clone())?;
486        let new_stetement = Statement::Query(
487            QueryBuilder::new()
488                .with_projection(params.project_exprs)
489                .with_from(from)
490                .with_selection(slection)
491                .with_group_by(params.group_epxrs)
492                .with_order_by(params.order_by.clone())
493                .with_limit(limit)
494                .with_offset(offset)
495                .build_query(),
496        );
497        Ok(new_stetement)
498    }
499
500    fn generate_table_with_joins(
501        &self,
502        mut sub_project_exprs: Vec<SelectItem>,
503        table: TableWithJoins,
504    ) -> result::Result<Vec<TableWithJoins>, ParserError> {
505        let from: Vec<TableWithJoins>;
506        //write table to view
507        if !sub_project_exprs.is_empty() {
508            match table.relation {
509                TableFactor::Table {
510                    name: _,
511                    alias: _,
512                    with_hints: _,
513                    args: _,
514                } => {
515                    sub_project_exprs.push(SelectItem::Wildcard(WildcardAdditionalOptions {
516                        opt_exclude: None,
517                        opt_except: None,
518                        opt_rename: None,
519                        opt_replace: None,
520                    }));
521                    let statement = sqlparser::ast::Statement::Query(
522                        QueryBuilder::new()
523                            .with_from(vec![table])
524                            .with_projection(sub_project_exprs)
525                            .build_query(),
526                    );
527                    from = vec![self.parser_view_define(statement)?];
528                }
529                TableFactor::Derived {
530                    lateral,
531                    subquery,
532                    alias,
533                } => {
534                    let mut sub_query = subquery;
535                    if let sqlparser::ast::SetExpr::Select(select) = &mut *sub_query.body {
536                        select.projection = sub_project_exprs;
537                    }
538                    from = vec![TableWithJoins {
539                        relation: TableFactor::Derived {
540                            lateral,
541                            subquery: sub_query,
542                            alias,
543                        },
544                        joins: vec![],
545                    }];
546                }
547                _ => todo!("not support"),
548            }
549        } else {
550            from = vec![table];
551        }
552        Ok(from)
553    }
554
555    pub fn get_projection_and_group_expr(
556        &self,
557        query: &model::Query,
558        alias: &Alias,
559    ) -> (Projection, GroupBy, Alias) {
560        let mut new_alias = alias.clone();
561        match query {
562            model::Query::Aggregate { group_by, measures } => {
563                let mut projection = Projection::default();
564                let mut group_by_set: Vec<Expr> = GroupBy::default();
565                for ele in group_by {
566                    group_by_set.push(Expr::Identifier(Ident {
567                        value: ele.clone(),
568                        quote_style: self.get_quote(),
569                    }));
570                    if let Some(e) = alias.get(ele) {
571                        projection.push(e.clone());
572                    } else {
573                        projection.push(SelectItem::UnnamedExpr(Expr::Identifier(Ident {
574                            value: ele.clone(),
575                            quote_style: self.get_quote(),
576                        })));
577                    }
578                }
579                for ele in measures {
580                    let mut name_epxr: SelectItem;
581                    if ele.field == "*" {
582                        name_epxr = SelectItem::Wildcard(WildcardAdditionalOptions {
583                            opt_exclude: None,
584                            opt_except: None,
585                            opt_rename: None,
586                            opt_replace: None,
587                        });
588                    } else if let Some(format) = &ele.format {
589                        name_epxr = SelectItem::ExprWithAlias {
590                            expr: get_format_time_expr(
591                                Expr::Identifier(Ident {
592                                    value: ele.field.clone(),
593                                    quote_style: self.get_quote(),
594                                }),
595                                format.clone(),
596                            ),
597                            alias: Ident {
598                                value: ele.as_field_key.clone(),
599                                quote_style: self.get_as_quote(),
600                            },
601                        };
602                    } else {
603                        name_epxr = SelectItem::ExprWithAlias {
604                            expr: Expr::Identifier(Ident {
605                                value: ele.field.clone(),
606                                quote_style: self.get_quote(),
607                            }),
608                            alias: Ident {
609                                value: ele.as_field_key.clone(),
610                                quote_style: self.get_as_quote(),
611                            },
612                        };
613                    }
614
615                    if !new_alias.contains_key(&ele.field) {
616                        new_alias.insert(ele.as_field_key.clone(), name_epxr.clone());
617                    } else {
618                        name_epxr = new_alias.get(&ele.field).unwrap().clone();
619                    }
620                    let expr = match ele.agg {
621                        model::Agg::DistinctCount => Expr::Function(Function {
622                            name: ObjectName(vec![Ident {
623                                value: "count".to_string(),
624                                quote_style: None,
625                            }]),
626                            args: vec![FunctionArg::Unnamed(get_expr_from_select(name_epxr))],
627                            over: None,
628                            distinct: true,
629                            special: false,
630                            order_by: vec![],
631                        }),
632                        _ => get_function_expr(
633                            get_agg_func(ele.agg.clone()),
634                            vec![FunctionArg::Unnamed(get_expr_from_select(name_epxr))],
635                        ),
636                    };
637                    projection.push(SelectItem::ExprWithAlias {
638                        expr,
639                        alias: Ident {
640                            value: ele.as_field_key.clone(),
641                            quote_style: self.get_as_quote(),
642                        },
643                    });
644                }
645                (projection, group_by_set, new_alias)
646            }
647            model::Query::Raw { fields } => {
648                let mut projection = Projection::default();
649                let group_by: Vec<Expr> = GroupBy::default();
650                for field in fields {
651                    if let Some(f) = alias.get(field) {
652                        projection.push(f.clone());
653                    } else if field == "*" {
654                        projection.push(SelectItem::Wildcard(WildcardAdditionalOptions {
655                            opt_exclude: None,
656                            opt_except: None,
657                            opt_rename: None,
658                            opt_replace: None,
659                        }));
660                    } else {
661                        projection.push(SelectItem::UnnamedExpr(Expr::Identifier(Ident {
662                            value: field.clone(),
663                            quote_style: self.get_quote(),
664                        })));
665                    }
666                }
667                (projection, group_by, new_alias)
668            }
669        }
670    }
671
672    pub fn get_transform_expr(
673        &self,
674        transform: model::Transform,
675        alias: &Alias,
676    ) -> Result<(Projection, Projection, Alias)> {
677        let mut sub_projection = Projection::default();
678        let mut append_alias = alias.clone();
679        match transform.expression.op {
680            model::TransformOp::One => {
681                let sleect_item =
682                    SelectItem::UnnamedExpr(Expr::Value(Value::Number("1".to_string(), false)));
683                append_alias.insert(transform.expression.as_field_key, sleect_item);
684                Ok((sub_projection, vec![], append_alias))
685            }
686            model::TransformOp::Bin => {
687                let num = transform.expression.num.unwrap_or(10);
688                //process the sub expression
689                let param = transform.expression.params.unwrap()[0].clone();
690                let value = match param.value {
691                    model::ValueParam::String(ref v) => v.clone(),
692                    model::ValueParam::Map(_) => {
693                        return Err(ParserError::DSLSerializeError(
694                            "bin param must be string".to_string(),
695                        ))
696                    }
697                };
698                let min_expr = SelectItem::ExprWithAlias {
699                    expr: get_window_function_expr(
700                        "min".to_string(),
701                        vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(
702                            Expr::Identifier(Ident {
703                                value: value.clone(),
704                                quote_style: self.get_quote(),
705                            }),
706                        ))],
707                        WindowType::WindowSpec(WindowSpec {
708                            partition_by: vec![],
709                            order_by: vec![],
710                            window_frame: None,
711                        }),
712                    ),
713                    alias: Ident {
714                        value: format!("{}{}", "min_", transform.expression.as_field_key),
715                        quote_style: self.get_as_quote(),
716                    },
717                };
718                let max_expr = SelectItem::ExprWithAlias {
719                    expr: get_window_function_expr(
720                        "max".to_string(),
721                        vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(
722                            Expr::Identifier(Ident {
723                                value: value.clone(),
724                                quote_style: self.get_quote(),
725                            }),
726                        ))],
727                        WindowType::WindowSpec(WindowSpec {
728                            partition_by: vec![],
729                            order_by: vec![],
730                            window_frame: None,
731                        }),
732                    ),
733                    alias: Ident {
734                        value: format!("{}{}", "max_", transform.expression.as_field_key),
735                        quote_style: self.get_as_quote(),
736                    },
737                };
738                sub_projection.push(min_expr);
739                sub_projection.push(max_expr);
740                // process the main expression
741                let min_epxr_indentifier = Expr::Identifier(Ident {
742                    value: format!("{}{}", "min_", transform.expression.as_field_key),
743                    quote_style: self.get_quote(),
744                });
745                let max_epxr_indentifier = Expr::Identifier(Ident {
746                    value: format!("{}{}", "max_", transform.expression.as_field_key),
747                    quote_style: self.get_quote(),
748                });
749                //("max_expr" - "min_expr")
750                let max_minus_min = Expr::Nested(Box::new(get_binary_op_epxr(
751                    max_epxr_indentifier,
752                    min_epxr_indentifier.clone(),
753                    BinaryOperator::Minus,
754                )));
755                // (("max_expr" - "min_epxr") / 10.0)
756                let div_10_epxr = Expr::Nested(Box::new(get_binary_op_epxr(
757                    max_minus_min,
758                    Expr::Value(Value::Number(num.to_string() + ".0", false)),
759                    BinaryOperator::Divide,
760                )));
761                // (col - "min_gw_Kr7j")
762                let col_minus_min = Expr::Nested(Box::new(get_binary_op_epxr(
763                    Expr::Identifier(Ident {
764                        value,
765                        quote_style: self.get_quote(),
766                    }),
767                    min_epxr_indentifier.clone(),
768                    BinaryOperator::Minus,
769                )));
770                // (col_1 - "min_gw_Kr7j") / (("max_gw_Kr7j" - "min_gw_Kr7j") / 10.0)
771                let col_div = Expr::Nested(Box::new(get_binary_op_epxr(
772                    col_minus_min,
773                    div_10_epxr.clone(),
774                    BinaryOperator::Divide,
775                )));
776                // floor((col_1 - "min_gw_Kr7j") / (("max_gw_Kr7j" - "min_gw_Kr7j") / 10.0)), 9)
777                let floor_expr = get_function_expr(
778                    "floor".to_string(),
779                    vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(col_div))],
780                );
781                //least(floor((col_1 - "min_gw_Kr7j") / (("max_gw_Kr7j" - "min_gw_Kr7j") / 10.0)), 9)
782                let least_expr = get_function_expr(
783                    "least".to_string(),
784                    vec![
785                        FunctionArg::Unnamed(FunctionArgExpr::Expr(floor_expr)),
786                        FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(Value::Number(
787                            (num - 1).to_string(),
788                            false,
789                        )))),
790                    ],
791                );
792                //(least(floor((col_1 - "min_gw_Kr7j") / (("max_gw_Kr7j" - "min_gw_Kr7j") / 10.0)), 9) *
793                //(("max_gw_Kr7j" - "min_gw_Kr7j") / 10.0))
794                let final_expr = Expr::Nested(Box::new(get_binary_op_epxr(
795                    least_expr,
796                    div_10_epxr,
797                    BinaryOperator::Multiply,
798                )));
799                //"min_gw_Kr7j" + (
800                //least(floor((col_1 - "min_gw_Kr7j") / (("max_gw_Kr7j" - "min_gw_Kr7j") / 10.0)), 9) *
801                //(("max_gw_Kr7j" - "min_gw_Kr7j") / 10.0)) AS "gw_Kr7j",
802                let final_expr_with_alias = SelectItem::ExprWithAlias {
803                    expr: Expr::Nested(Box::new(get_binary_op_epxr(
804                        min_epxr_indentifier,
805                        final_expr,
806                        BinaryOperator::Plus,
807                    ))),
808                    alias: Ident {
809                        value: transform.expression.as_field_key.clone(),
810                        quote_style: self.get_as_quote(),
811                    },
812                };
813                append_alias.insert(transform.expression.as_field_key, final_expr_with_alias);
814                Ok((sub_projection, vec![], append_alias))
815            }
816            model::TransformOp::Log2 => {
817                let param = transform.expression.params.unwrap()[0].clone();
818                let value = match param.value {
819                    model::ValueParam::String(ref v) => v.clone(),
820                    _ => return Err(ParserError::DSLSerializeError("value error".to_string())),
821                };
822                let log_2_epxr = get_log_n_func_expr(2, value, self.get_quote());
823                let log_2_expr_with_alias = SelectItem::ExprWithAlias {
824                    expr: log_2_epxr,
825                    alias: Ident {
826                        value: transform.expression.as_field_key.clone(),
827                        quote_style: self.get_as_quote(),
828                    },
829                };
830                append_alias.insert(transform.expression.as_field_key, log_2_expr_with_alias);
831                Ok((sub_projection, vec![], append_alias))
832            }
833            model::TransformOp::Log10 => {
834                let param = transform.expression.params.unwrap()[0].clone();
835                let value = match param.value {
836                    model::ValueParam::String(ref v) => v.clone(),
837                    _ => return Err(ParserError::DSLSerializeError("value error".to_string())),
838                };
839                let log_10_epxr = get_log_n_func_expr(10, value, self.get_quote());
840                let log_10_expr_with_alias = SelectItem::ExprWithAlias {
841                    expr: log_10_epxr,
842                    alias: Ident {
843                        value: transform.expression.as_field_key.clone(),
844                        quote_style: self.get_as_quote(),
845                    },
846                };
847                append_alias.insert(transform.expression.as_field_key, log_10_expr_with_alias);
848                Ok((sub_projection, vec![], append_alias))
849            }
850            model::TransformOp::Log => {
851                let param = transform.expression.params.unwrap()[0].clone();
852                let value = match param.value {
853                    model::ValueParam::String(ref v) => v.clone(),
854                    _ => return Err(ParserError::DSLSerializeError("value error".to_string())),
855                };
856                let log_epxr = get_log_n_func_expr(
857                    transform.expression.num.unwrap_or(10),
858                    value,
859                    self.get_quote(),
860                );
861                let log_expr_with_alias = SelectItem::ExprWithAlias {
862                    expr: log_epxr,
863                    alias: Ident {
864                        value: transform.expression.as_field_key.clone(),
865                        quote_style: self.get_as_quote(),
866                    },
867                };
868                append_alias.insert(transform.expression.as_field_key, log_expr_with_alias);
869                Ok((sub_projection, vec![], append_alias))
870            }
871            model::TransformOp::BinCount => {
872                //process the sub expression
873                let num = transform.expression.num.unwrap_or(10);
874                let param = transform.expression.params.unwrap()[0].clone();
875                let value = match param.value {
876                    model::ValueParam::String(ref v) => v.clone(),
877                    _ => return Err(ParserError::DSLSerializeError("value error".to_string())),
878                };
879                let min_expr = SelectItem::ExprWithAlias {
880                    expr: get_window_function_expr(
881                        "min".to_string(),
882                        vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(
883                            Expr::Identifier(Ident {
884                                value: value.clone(),
885                                quote_style: self.get_quote(),
886                            }),
887                        ))],
888                        WindowType::WindowSpec(WindowSpec {
889                            partition_by: vec![],
890                            order_by: vec![],
891                            window_frame: None,
892                        }),
893                    ),
894                    alias: Ident {
895                        value: format!("{}{}", "min_", transform.expression.as_field_key),
896                        quote_style: self.get_as_quote(),
897                    },
898                };
899                let max_expr = SelectItem::ExprWithAlias {
900                    expr: get_window_function_expr(
901                        "max".to_string(),
902                        vec![FunctionArg::Unnamed(FunctionArgExpr::Expr(
903                            Expr::Identifier(Ident {
904                                value: value.clone(),
905                                quote_style: self.get_quote(),
906                            }),
907                        ))],
908                        WindowType::WindowSpec(WindowSpec {
909                            partition_by: vec![],
910                            order_by: vec![],
911                            window_frame: None,
912                        }),
913                    ),
914                    alias: Ident {
915                        value: format!("{}{}", "max_", transform.expression.as_field_key),
916                        quote_style: self.get_as_quote(),
917                    },
918                };
919                sub_projection.push(min_expr);
920                sub_projection.push(max_expr);
921                // process the main expression
922                let min_epxr_indentifier = Expr::Identifier(Ident {
923                    value: format!("{}{}", "min_", transform.expression.as_field_key),
924                    quote_style: self.get_quote(),
925                });
926                let max_epxr_indentifier = Expr::Identifier(Ident {
927                    value: format!("{}{}", "max_", transform.expression.as_field_key),
928                    quote_style: self.get_quote(),
929                });
930                //("max_expr" - "min_expr")
931                let max_minus_min = Expr::Nested(Box::new(get_binary_op_epxr(
932                    max_epxr_indentifier,
933                    min_epxr_indentifier.clone(),
934                    BinaryOperator::Minus,
935                )));
936                // (("max_expr" - "min_epxr") / 10)
937                let div_10_epxr = Expr::Nested(Box::new(get_binary_op_epxr(
938                    max_minus_min,
939                    Expr::Value(Value::Number(num.to_string(), false)),
940                    BinaryOperator::Divide,
941                )));
942                // (col - "min_gw_Kr7j")
943                let col_minus_min = Expr::Nested(Box::new(get_binary_op_epxr(
944                    Expr::Identifier(Ident {
945                        value,
946                        quote_style: self.get_quote(),
947                    }),
948                    min_epxr_indentifier,
949                    BinaryOperator::Minus,
950                )));
951                // (col_1 - "min_gw_Kr7j") / (("max_gw_Kr7j" - "min_gw_Kr7j") / 10.0)
952                let col_div = Expr::Nested(Box::new(get_binary_op_epxr(
953                    col_minus_min,
954                    div_10_epxr,
955                    BinaryOperator::Divide,
956                )));
957                //least((col_3 - "min_gw_ZM8H") / (("max_gw_ZM8H" - "min_gw_ZM8H") / 10), 9)
958                let lest_expr = get_function_expr(
959                    "least".to_string(),
960                    vec![
961                        FunctionArg::Unnamed(FunctionArgExpr::Expr(col_div)),
962                        FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(Value::Number(
963                            (num - 1).to_string(),
964                            false,
965                        )))),
966                    ],
967                );
968                //least((col_3 - "min_gw_ZM8H") / (("max_gw_ZM8H" - "min_gw_ZM8H") / 10), 9) + 1
969                let bin_count_expr_with_alias = SelectItem::ExprWithAlias {
970                    expr: get_binary_op_epxr(
971                        lest_expr,
972                        Expr::Value(Value::Number("1".to_string(), false)),
973                        BinaryOperator::Plus,
974                    ),
975                    alias: Ident {
976                        value: transform.expression.as_field_key.clone(),
977                        quote_style: self.get_as_quote(),
978                    },
979                };
980                append_alias.insert(transform.expression.as_field_key, bin_count_expr_with_alias);
981                Ok((sub_projection, vec![], append_alias))
982            }
983            model::TransformOp::DateTimeDrill => {
984                let mut field: String = "".to_string();
985                let mut value: String = "".to_string();
986                let mut format = "%Y-%m-%d %H:%M:%S".to_string();
987                for ele in transform.expression.params.unwrap() {
988                    let v = match ele.value {
989                        model::ValueParam::String(ref v) => v.clone(),
990                        _ => return Err(ParserError::DSLSerializeError("value error".to_string())),
991                    };
992                    if ele.r#type == "field" {
993                        field = v.clone();
994                    }
995                    if ele.r#type == "value" {
996                        value = v.clone();
997                    }
998                    if ele.r#type == "format" {
999                        format = v.clone();
1000                    }
1001                }
1002                if field.is_empty() || value.is_empty() {
1003                    return Err(ParserError::DSLSerializeError(
1004                        "field & value error".to_string(),
1005                    ));
1006                }
1007                let time_drill_expr = SelectItem::ExprWithAlias {
1008                    expr: get_strftime_expr(
1009                        get_data_trunc_expr(
1010                            get_format_time_expr(
1011                                Expr::Identifier(Ident {
1012                                    value: field,
1013                                    quote_style: self.get_quote(),
1014                                }),
1015                                format,
1016                            ),
1017                            value,
1018                        ),
1019                        "%Y-%m-%d %H:%M:%S".to_string(),
1020                    ),
1021                    alias: Ident {
1022                        value: transform.expression.as_field_key.clone(),
1023                        quote_style: self.get_as_quote(),
1024                    },
1025                };
1026                append_alias.insert(transform.expression.as_field_key, time_drill_expr);
1027                Ok((sub_projection, vec![], append_alias))
1028            }
1029            model::TransformOp::DateTimeFeature => {
1030                let mut field: String = "".to_string();
1031                let mut value: String = "".to_string();
1032                let mut format = "%Y-%m-%d %H:%M:%S".to_string();
1033                for ele in transform.expression.params.unwrap() {
1034                    let v = match ele.value {
1035                        model::ValueParam::String(ref v) => v.clone(),
1036                        _ => return Err(ParserError::DSLSerializeError("value error".to_string())),
1037                    };
1038
1039                    if ele.r#type == "field" {
1040                        field = v.clone();
1041                    }
1042                    if ele.r#type == "value" {
1043                        value = v.clone();
1044                    }
1045                    if ele.r#type == "format" {
1046                        format = v.clone();
1047                    }
1048                }
1049                if field.is_empty() || value.is_empty() {
1050                    return Err(ParserError::DSLSerializeError(
1051                        "field & value error".to_string(),
1052                    ));
1053                }
1054                let strftime_param = get_data_feature_format(&value);
1055                let time_drill_expr = SelectItem::ExprWithAlias {
1056                    expr: get_data_part_expr(
1057                        get_format_time_expr(
1058                            Expr::Identifier(Ident {
1059                                value: field,
1060                                quote_style: self.get_quote(),
1061                            }),
1062                            format,
1063                        ),
1064                        strftime_param,
1065                    ),
1066                    alias: Ident {
1067                        value: transform.expression.as_field_key.clone(),
1068                        quote_style: self.get_as_quote(),
1069                    },
1070                };
1071                append_alias.insert(transform.expression.as_field_key, time_drill_expr);
1072                Ok((sub_projection, vec![], append_alias))
1073            }
1074            model::TransformOp::Expr => {
1075                let mut sql: String = "".to_string();
1076                for ele in transform.expression.params.unwrap() {
1077                    if ele.r#type == "sql" {
1078                        let value = match ele.value {
1079                            model::ValueParam::String(ref v) => v.clone(),
1080                            _ => {
1081                                return Err(ParserError::DSLSerializeError(
1082                                    "value error".to_string(),
1083                                ))
1084                            }
1085                        };
1086                        sql = value.clone();
1087                    }
1088                }
1089                if sql.is_empty() {
1090                    return Err(ParserError::DSLSerializeError("sql empty".to_string()));
1091                }
1092                let expr = SelectItem::ExprWithAlias {
1093                    expr: Expr::Identifier(Ident {
1094                        value: sql,
1095                        quote_style: None,
1096                    }),
1097                    alias: Ident {
1098                        value: transform.expression.as_field_key.clone(),
1099                        quote_style: self.get_as_quote(),
1100                    },
1101                };
1102                append_alias.insert(transform.expression.as_field_key, expr);
1103                Ok((sub_projection, vec![], append_alias))
1104            }
1105            model::TransformOp::Paint => {
1106                let param = transform.expression.params.unwrap()[0].clone();
1107                //case when
1108                let value = match param.value {
1109                    model::ValueParam::Map(ref v) => v.clone(),
1110                    _ => return Err(ParserError::DSLSerializeError("value error".to_string())),
1111                };
1112
1113                let compressed = match general_purpose::STANDARD.decode(value.map.clone()) {
1114                    Ok(v) => v,
1115                    Err(_) => {
1116                        return Err(ParserError::DSLSerializeError("value error".to_string()))
1117                    }
1118                };
1119                let slice: &[u8] = compressed.as_slice();
1120                let mut decoder = flate2::bufread::DeflateDecoder::new(slice);
1121                let mut decompressed: Vec<u8> = vec![];
1122                _ = decoder.read_to_end(&mut decompressed);
1123
1124                let rects = find_matching_rects(&decompressed, value.mapwidth as usize);
1125
1126                let mut results: Vec<Expr> = vec![];
1127                let mut conditions = vec![];
1128                for ele in rects {
1129                    //condition x
1130                    let x1 = (value.domain_x[1] - value.domain_x[0]) / value.mapwidth as f64
1131                        * ele.x1 as f64
1132                        + value.domain_x[0];
1133                    let x2 = (value.domain_x[1] - value.domain_x[0]) / value.mapwidth as f64
1134                        * (ele.x2 + 1) as f64
1135                        + value.domain_x[0];
1136                    let left = Expr::Between {
1137                        expr: Box::new(Expr::Identifier(Ident {
1138                            value: value.x.clone(),
1139                            quote_style: self.get_quote(),
1140                        })),
1141                        negated: false,
1142                        low: Box::new(Expr::Value(Value::Number(x1.to_string(), false))),
1143                        high: Box::new(Expr::Value(Value::Number(x2.to_string(), false))),
1144                    };
1145                    //condition y
1146                    let y1 = (value.domain_y[1] - value.domain_y[0]) / value.mapwidth as f64
1147                        * ele.y1 as f64
1148                        + value.domain_y[0];
1149                    let y2 = (value.domain_y[1] - value.domain_y[0]) / value.mapwidth as f64
1150                        * (ele.y2 + 1) as f64
1151                        + value.domain_y[0];
1152                    let right = Expr::Between {
1153                        expr: Box::new(Expr::Identifier(Ident {
1154                            value: value.y.clone(),
1155                            quote_style: self.get_quote(),
1156                        })),
1157                        negated: false,
1158                        low: Box::new(Expr::Value(Value::Number(y1.to_string(), false))),
1159                        high: Box::new(Expr::Value(Value::Number(y2.to_string(), false))),
1160                    };
1161                    let condition = get_binary_op_epxr(left, right, BinaryOperator::And);
1162                    conditions.push(condition);
1163                    //result
1164                    let value_str = &ele.value.to_string();
1165
1166                    let name = match value.dict.get(value_str) {
1167                        Some(v) => v.clone(),
1168                        None => {
1169                            let error_msg = format!("value {} not found", value_str);
1170                            return Err(ParserError::DSLSerializeError(error_msg));
1171                        }
1172                    };
1173                    results.push(Expr::Value(Value::SingleQuotedString(name.name)))
1174                }
1175
1176                let case_expr = Expr::Case {
1177                    operand: None,
1178                    conditions,
1179                    results,
1180                    else_result: None,
1181                };
1182                let expr: SelectItem = SelectItem::ExprWithAlias {
1183                    expr: case_expr,
1184                    alias: Ident {
1185                        value: transform.expression.as_field_key.clone(),
1186                        quote_style: self.get_as_quote(),
1187                    },
1188                };
1189                append_alias.insert(transform.expression.as_field_key, expr);
1190                Ok((sub_projection, vec![], append_alias))
1191            }
1192        }
1193    }
1194}