datafusion_kql/
planner.rs

1use arrow_schema::{DataType, Field, TimeUnit};
2
3use datafusion_common::{TableReference, JoinType, Column, DFSchema, ScalarValue};
4use datafusion_common::{DataFusionError, Result};
5
6use datafusion_expr::{ExprSchemable, Values};
7use datafusion_expr::expr::{AggregateFunction, ScalarFunction, WindowFunction};
8use datafusion_expr::expr_fn::col;
9use datafusion_expr::planner::ContextProvider;
10use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
11use datafusion_expr::{Expr, Literal, SortExpr};
12
13use itertools::Itertools;
14
15use kqlparser::ast::{Expr as KqlExpr, Operator, TabularExpression, Literal as KqlLiteral, Source, Type};
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
/// Per-query state threaded through the KQL → DataFusion translation.
///
/// It currently has no fields and is passed by `&mut` through the planner
/// methods of [`KqlToRel`] — presumably a placeholder for future planning
/// state (e.g. scoped names); confirm before relying on that.
#[derive(Clone, Default)]
struct PlannerContext;
22
23pub struct KqlToRel<'a, S: ContextProvider> {
24    ctx: &'a S,
25}
26
27impl<'a, S: ContextProvider> KqlToRel<'a, S> {
28    pub fn new(ctx: &'a S) -> Self {
29        KqlToRel { ctx }
30    }
31
32    fn func_to_expr(&self, ctx: &mut PlannerContext, name: &str, args: &Vec<KqlExpr>) -> Result<Expr> {
33        let args = args.iter().map(|a| self.ast_to_expr(ctx, a)).collect::<Result<Vec<Expr>>>()?;
34        if let Some(f) = self.ctx.get_function_meta(&name) {
35            Ok(Expr::ScalarFunction(ScalarFunction::new_udf(f, args)))
36        } else if let Some(f) = self.ctx.get_aggregate_meta(&name) {
37            Ok(Expr::AggregateFunction(AggregateFunction::new_udf(f, args, false, None, None, None)))
38        } else if let Some(f) = self.ctx.get_window_meta(&name) {
39            Ok(Expr::WindowFunction(WindowFunction::new(f, args)))
40        } else {
41            return Err(DataFusionError::NotImplemented("Function not implemented".to_string()));
42        }
43    }
44
45    fn ast_to_expr(&self, ctx: &mut PlannerContext, ast: &KqlExpr) -> Result<Expr> {
46        Ok(match ast {
47            KqlExpr::Equals(x, y) => self.ast_to_expr(ctx, &x)?.eq(self.ast_to_expr(ctx, &y)?),
48            KqlExpr::NotEquals(x, y) => self.ast_to_expr(ctx, &x)?.not_eq(self.ast_to_expr(ctx, &y)?),
49            KqlExpr::And(x, y) => self.ast_to_expr(ctx, &x)?.and(self.ast_to_expr(ctx, &y)?),
50            KqlExpr::Or(x, y) => self.ast_to_expr(ctx, &x)?.or(self.ast_to_expr(ctx, &y)?),
51            KqlExpr::Add(x, y) => self.ast_to_expr(ctx, &x)? + self.ast_to_expr(ctx, &y)?,
52            KqlExpr::Substract(x, y) => self.ast_to_expr(ctx, &x)? - self.ast_to_expr(ctx, &y)?,
53            KqlExpr::Multiply(x, y) => self.ast_to_expr(ctx, &x)? * self.ast_to_expr(ctx, &y)?,
54            KqlExpr::Divide(x, y) => self.ast_to_expr(ctx, &x)? / self.ast_to_expr(ctx, &y)?,
55            KqlExpr::Modulo(x, y) => self.ast_to_expr(ctx, &x)? % self.ast_to_expr(ctx, &y)?,
56            KqlExpr::Less(x, y) => self.ast_to_expr(ctx, &x)?.lt(self.ast_to_expr(ctx, &y)?),
57            KqlExpr::Greater(x, y) => self.ast_to_expr(ctx, &x)?.gt(self.ast_to_expr(ctx, &y)?),
58            KqlExpr::LessOrEqual(x, y) => self.ast_to_expr(ctx, &x)?.lt_eq(self.ast_to_expr(ctx, &y)?),
59            KqlExpr::GreaterOrEqual(x, y) => self.ast_to_expr(ctx, &x)?.gt_eq(self.ast_to_expr(ctx, &y)?),
60            KqlExpr::Literal(v) => literal_to_expr(v),
61            KqlExpr::Ident(x) => col(x.as_str()),
62            KqlExpr::Func(x, y) => self.func_to_expr(ctx, x.as_str(), y)?,
63            _ => return Err(DataFusionError::NotImplemented("Expr not implemented".to_string()))
64        })
65    }
66
67    fn query_statement_to_plan(&self, ctx: &mut PlannerContext, query: &TabularExpression) -> Result<LogicalPlan> {
68        let mut builder = match &query.source {
69            Source::Print(v) => {
70                let values = v.iter()
71                    .map(|(_, v)| self.ast_to_expr(ctx, v))
72                    .collect::<Result<Vec<Expr>>>()?;
73
74                let mut print_idx = 0;
75                let schema = DFSchema::empty();
76                let fields = values.iter()
77                    .zip(v)
78                    .map(|(v, (n, _))| {
79                        let name = n.clone().unwrap_or_else(|| {
80                            let name = format!("print_{}", print_idx);
81                            print_idx += 1;
82                            name
83                        });
84                        Ok((None, Arc::new(Field::new(name, v.get_type(&schema)?, true))))
85                    })
86                    .collect::<Result<Vec<(Option<TableReference>, Arc<Field>)>>>()?;
87
88                LogicalPlanBuilder::from(LogicalPlan::Values(Values {
89                    schema: Arc::new(DFSchema::new_with_metadata(fields, HashMap::default())?),
90                    values: vec![values]
91                }))
92            }
93            Source::Datatable(s, d) => LogicalPlanBuilder::from(LogicalPlan::Values(Values {
94                schema: Arc::new(DFSchema::new_with_metadata(s.iter().map(|(n, t)| (None::<TableReference>, Arc::new(Field::new(n, type_to_datatype(t), true)))).collect(), HashMap::default()).unwrap()),
95                values: d.iter().chunks(s.len()).into_iter().map(|chunk| chunk.map(|r| self.ast_to_expr(ctx, r).unwrap()).collect()).collect()
96            })),
97            Source::Reference(n) => LogicalPlanBuilder::scan(n.clone(), self.ctx.get_table_source(TableReference::from(n.as_str()))?, None)?,
98            _ => return Err(DataFusionError::NotImplemented("Source not implemented".to_string())),
99        };
100
101        for op in query.operators.iter() {
102            builder = match op {
103                Operator::MvExpand(x) => builder.unnest_column(Column::from(x))?,
104                Operator::Extend(x) => {
105                    let current_schema = builder.schema().clone();
106                    let current_columns = current_schema.columns().into_iter().map(|f| Expr::Column(f));
107                    builder.project(current_columns.chain(x.iter().map(|(a, b)| {
108                        let mut expr = self.ast_to_expr(ctx, b).unwrap();
109                        if let Some(alias) = a {
110                            expr = expr.alias(alias);
111                        }
112                        expr
113                    })))?
114                },
115                Operator::Join(_, x, y) => {
116                    let keys: Vec<&str> = y.iter().map(|s| s.as_ref()).collect();
117                    builder.join(self.query_statement_to_plan(ctx, &x)?, JoinType::Inner, (keys.clone(), keys), Option::None)?
118                },
119                Operator::Project(x) => builder.project(x.iter().map(|(a, b)| {
120                    let mut expr = self.ast_to_expr(ctx, b).unwrap();
121                    if let Some(alias) = a {
122                        expr = expr.alias(alias);
123                    }
124                    expr
125                }))?,
126                Operator::Where(x) => builder.filter(self.ast_to_expr(ctx, &x)?)?,
127                Operator::Serialize(x) => builder.window(x.iter().map(|(a, b)| {
128                    let mut expr = self.ast_to_expr(ctx, b).unwrap();
129                    if let Some(alias) = a {
130                        expr = expr.alias(alias);
131                    }
132                    expr
133                }))?,
134                Operator::Summarize(x, y) => {
135                    let mut ctx1 = ctx.clone();
136                    builder.aggregate(y.iter().map(|z| self.ast_to_expr(&mut ctx1, z).unwrap()), x.iter().map(|(_, z)| self.ast_to_expr(ctx, z).unwrap()))?
137                },
138                Operator::Sort(o) => builder.sort(o.iter().map(|c| SortExpr::new(col(c), false, false)))?,
139                Operator::Take(x) => builder.limit(0, Some((*x).try_into().unwrap()))?,
140                _ => return Err(DataFusionError::NotImplemented("Operator not implemented".to_string())),
141            };
142        }
143
144        builder.build()
145    }
146
147    pub fn query_to_plan(&self, query: &TabularExpression) -> Result<LogicalPlan> {
148        self.query_statement_to_plan(&mut PlannerContext::default(), query)
149    }
150}
151
152fn type_to_datatype(t: &Type) -> DataType {
153    match t {
154        Type::Bool => DataType::Boolean,
155        Type::Decimal => DataType::Float64,
156        Type::Int => DataType::Int32,
157        Type::Long => DataType::Int64,
158        Type::Real => DataType::Float32,
159        Type::String => DataType::Utf8,
160        Type::Timespan => DataType::Duration(TimeUnit::Nanosecond),
161        _ => panic!("Not supported")
162    }
163}
164
165fn literal_to_expr(val: &KqlLiteral) -> Expr {
166    match val {
167        KqlLiteral::Bool(x) => ScalarValue::from(*x).lit(),
168        KqlLiteral::Decimal(x) => ScalarValue::from(*x).lit(),
169        KqlLiteral::Int(x) => ScalarValue::from(*x).lit(),
170        KqlLiteral::Long(x) => ScalarValue::from(*x).lit(),
171        KqlLiteral::Real(x) => ScalarValue::from(*x).lit(),
172        KqlLiteral::String(x) => ScalarValue::from(x.clone()).lit(),
173        KqlLiteral::Timespan(x) => ScalarValue::DurationNanosecond(*x).lit(),
174        _ => panic!("Not supported")
175    }
176}