1use arrow_schema::{DataType, Field, TimeUnit};
2
3use datafusion_common::{TableReference, JoinType, Column, DFSchema, ScalarValue};
4use datafusion_common::{DataFusionError, Result};
5
6use datafusion_expr::{ExprSchemable, Values};
7use datafusion_expr::expr::{AggregateFunction, ScalarFunction, WindowFunction};
8use datafusion_expr::expr_fn::col;
9use datafusion_expr::planner::ContextProvider;
10use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
11use datafusion_expr::{Expr, Literal, SortExpr};
12
13use itertools::Itertools;
14
15use kqlparser::ast::{Expr as KqlExpr, Operator, TabularExpression, Literal as KqlLiteral, Source, Type};
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
/// Per-query planning state threaded through the recursive conversion
/// functions. It currently carries no data.
#[derive(Clone, Default)]
struct PlannerContext {}
22
23pub struct KqlToRel<'a, S: ContextProvider> {
24 ctx: &'a S,
25}
26
27impl<'a, S: ContextProvider> KqlToRel<'a, S> {
28 pub fn new(ctx: &'a S) -> Self {
29 KqlToRel { ctx }
30 }
31
32 fn func_to_expr(&self, ctx: &mut PlannerContext, name: &str, args: &Vec<KqlExpr>) -> Result<Expr> {
33 let args = args.iter().map(|a| self.ast_to_expr(ctx, a)).collect::<Result<Vec<Expr>>>()?;
34 if let Some(f) = self.ctx.get_function_meta(&name) {
35 Ok(Expr::ScalarFunction(ScalarFunction::new_udf(f, args)))
36 } else if let Some(f) = self.ctx.get_aggregate_meta(&name) {
37 Ok(Expr::AggregateFunction(AggregateFunction::new_udf(f, args, false, None, None, None)))
38 } else if let Some(f) = self.ctx.get_window_meta(&name) {
39 Ok(Expr::WindowFunction(WindowFunction::new(f, args)))
40 } else {
41 return Err(DataFusionError::NotImplemented("Function not implemented".to_string()));
42 }
43 }
44
45 fn ast_to_expr(&self, ctx: &mut PlannerContext, ast: &KqlExpr) -> Result<Expr> {
46 Ok(match ast {
47 KqlExpr::Equals(x, y) => self.ast_to_expr(ctx, &x)?.eq(self.ast_to_expr(ctx, &y)?),
48 KqlExpr::NotEquals(x, y) => self.ast_to_expr(ctx, &x)?.not_eq(self.ast_to_expr(ctx, &y)?),
49 KqlExpr::And(x, y) => self.ast_to_expr(ctx, &x)?.and(self.ast_to_expr(ctx, &y)?),
50 KqlExpr::Or(x, y) => self.ast_to_expr(ctx, &x)?.or(self.ast_to_expr(ctx, &y)?),
51 KqlExpr::Add(x, y) => self.ast_to_expr(ctx, &x)? + self.ast_to_expr(ctx, &y)?,
52 KqlExpr::Substract(x, y) => self.ast_to_expr(ctx, &x)? - self.ast_to_expr(ctx, &y)?,
53 KqlExpr::Multiply(x, y) => self.ast_to_expr(ctx, &x)? * self.ast_to_expr(ctx, &y)?,
54 KqlExpr::Divide(x, y) => self.ast_to_expr(ctx, &x)? / self.ast_to_expr(ctx, &y)?,
55 KqlExpr::Modulo(x, y) => self.ast_to_expr(ctx, &x)? % self.ast_to_expr(ctx, &y)?,
56 KqlExpr::Less(x, y) => self.ast_to_expr(ctx, &x)?.lt(self.ast_to_expr(ctx, &y)?),
57 KqlExpr::Greater(x, y) => self.ast_to_expr(ctx, &x)?.gt(self.ast_to_expr(ctx, &y)?),
58 KqlExpr::LessOrEqual(x, y) => self.ast_to_expr(ctx, &x)?.lt_eq(self.ast_to_expr(ctx, &y)?),
59 KqlExpr::GreaterOrEqual(x, y) => self.ast_to_expr(ctx, &x)?.gt_eq(self.ast_to_expr(ctx, &y)?),
60 KqlExpr::Literal(v) => literal_to_expr(v),
61 KqlExpr::Ident(x) => col(x.as_str()),
62 KqlExpr::Func(x, y) => self.func_to_expr(ctx, x.as_str(), y)?,
63 _ => return Err(DataFusionError::NotImplemented("Expr not implemented".to_string()))
64 })
65 }
66
67 fn query_statement_to_plan(&self, ctx: &mut PlannerContext, query: &TabularExpression) -> Result<LogicalPlan> {
68 let mut builder = match &query.source {
69 Source::Print(v) => {
70 let values = v.iter()
71 .map(|(_, v)| self.ast_to_expr(ctx, v))
72 .collect::<Result<Vec<Expr>>>()?;
73
74 let mut print_idx = 0;
75 let schema = DFSchema::empty();
76 let fields = values.iter()
77 .zip(v)
78 .map(|(v, (n, _))| {
79 let name = n.clone().unwrap_or_else(|| {
80 let name = format!("print_{}", print_idx);
81 print_idx += 1;
82 name
83 });
84 Ok((None, Arc::new(Field::new(name, v.get_type(&schema)?, true))))
85 })
86 .collect::<Result<Vec<(Option<TableReference>, Arc<Field>)>>>()?;
87
88 LogicalPlanBuilder::from(LogicalPlan::Values(Values {
89 schema: Arc::new(DFSchema::new_with_metadata(fields, HashMap::default())?),
90 values: vec![values]
91 }))
92 }
93 Source::Datatable(s, d) => LogicalPlanBuilder::from(LogicalPlan::Values(Values {
94 schema: Arc::new(DFSchema::new_with_metadata(s.iter().map(|(n, t)| (None::<TableReference>, Arc::new(Field::new(n, type_to_datatype(t), true)))).collect(), HashMap::default()).unwrap()),
95 values: d.iter().chunks(s.len()).into_iter().map(|chunk| chunk.map(|r| self.ast_to_expr(ctx, r).unwrap()).collect()).collect()
96 })),
97 Source::Reference(n) => LogicalPlanBuilder::scan(n.clone(), self.ctx.get_table_source(TableReference::from(n.as_str()))?, None)?,
98 _ => return Err(DataFusionError::NotImplemented("Source not implemented".to_string())),
99 };
100
101 for op in query.operators.iter() {
102 builder = match op {
103 Operator::MvExpand(x) => builder.unnest_column(Column::from(x))?,
104 Operator::Extend(x) => {
105 let current_schema = builder.schema().clone();
106 let current_columns = current_schema.columns().into_iter().map(|f| Expr::Column(f));
107 builder.project(current_columns.chain(x.iter().map(|(a, b)| {
108 let mut expr = self.ast_to_expr(ctx, b).unwrap();
109 if let Some(alias) = a {
110 expr = expr.alias(alias);
111 }
112 expr
113 })))?
114 },
115 Operator::Join(_, x, y) => {
116 let keys: Vec<&str> = y.iter().map(|s| s.as_ref()).collect();
117 builder.join(self.query_statement_to_plan(ctx, &x)?, JoinType::Inner, (keys.clone(), keys), Option::None)?
118 },
119 Operator::Project(x) => builder.project(x.iter().map(|(a, b)| {
120 let mut expr = self.ast_to_expr(ctx, b).unwrap();
121 if let Some(alias) = a {
122 expr = expr.alias(alias);
123 }
124 expr
125 }))?,
126 Operator::Where(x) => builder.filter(self.ast_to_expr(ctx, &x)?)?,
127 Operator::Serialize(x) => builder.window(x.iter().map(|(a, b)| {
128 let mut expr = self.ast_to_expr(ctx, b).unwrap();
129 if let Some(alias) = a {
130 expr = expr.alias(alias);
131 }
132 expr
133 }))?,
134 Operator::Summarize(x, y) => {
135 let mut ctx1 = ctx.clone();
136 builder.aggregate(y.iter().map(|z| self.ast_to_expr(&mut ctx1, z).unwrap()), x.iter().map(|(_, z)| self.ast_to_expr(ctx, z).unwrap()))?
137 },
138 Operator::Sort(o) => builder.sort(o.iter().map(|c| SortExpr::new(col(c), false, false)))?,
139 Operator::Take(x) => builder.limit(0, Some((*x).try_into().unwrap()))?,
140 _ => return Err(DataFusionError::NotImplemented("Operator not implemented".to_string())),
141 };
142 }
143
144 builder.build()
145 }
146
147 pub fn query_to_plan(&self, query: &TabularExpression) -> Result<LogicalPlan> {
148 self.query_statement_to_plan(&mut PlannerContext::default(), query)
149 }
150}
151
152fn type_to_datatype(t: &Type) -> DataType {
153 match t {
154 Type::Bool => DataType::Boolean,
155 Type::Decimal => DataType::Float64,
156 Type::Int => DataType::Int32,
157 Type::Long => DataType::Int64,
158 Type::Real => DataType::Float32,
159 Type::String => DataType::Utf8,
160 Type::Timespan => DataType::Duration(TimeUnit::Nanosecond),
161 _ => panic!("Not supported")
162 }
163}
164
165fn literal_to_expr(val: &KqlLiteral) -> Expr {
166 match val {
167 KqlLiteral::Bool(x) => ScalarValue::from(*x).lit(),
168 KqlLiteral::Decimal(x) => ScalarValue::from(*x).lit(),
169 KqlLiteral::Int(x) => ScalarValue::from(*x).lit(),
170 KqlLiteral::Long(x) => ScalarValue::from(*x).lit(),
171 KqlLiteral::Real(x) => ScalarValue::from(*x).lit(),
172 KqlLiteral::String(x) => ScalarValue::from(x.clone()).lit(),
173 KqlLiteral::Timespan(x) => ScalarValue::DurationNanosecond(*x).lit(),
174 _ => panic!("Not supported")
175 }
176}