mod expr_label;
pub use expr_label::{AggrExprLabel, ExprLabel, ExprLabelGenerator, ValueExprLabel};
use std::collections::HashMap;
use anyhow::anyhow;
use crate::{
api::error::{Result, SpringError},
expression::{AggrExpr, ValueExpr},
pipeline::{AggrAlias, ValueAlias},
sql_processor::SelectFieldSyntax,
stream_engine::{SqlValue, Tuple},
};
#[derive(Clone, PartialEq, Debug)]
pub struct ExprResolver {
label_gen: ExprLabelGenerator,
value_expressions: HashMap<ValueExprLabel, ValueExpr>,
value_aliased_labels: HashMap<ValueAlias, ValueExprLabel>,
aggr_expressions: HashMap<AggrExprLabel, AggrExpr>,
aggr_aliased_labels: HashMap<AggrAlias, AggrExprLabel>,
aggr_expression_results: HashMap<AggrExprLabel, SqlValue>,
}
impl ExprResolver {
pub fn new(select_list: Vec<SelectFieldSyntax>) -> (Self, Vec<ExprLabel>) {
let mut label_gen = ExprLabelGenerator::default();
let mut value_expressions = HashMap::new();
let mut value_aliased_labels = HashMap::new();
let mut aggr_expressions = HashMap::new();
let mut aggr_aliased_labels = HashMap::new();
let expr_labels = select_list
.into_iter()
.map(|select_field| match select_field {
SelectFieldSyntax::ValueExpr { value_expr, alias } => {
let label = label_gen.next_value();
value_expressions.insert(label, value_expr);
if let Some(alias) = alias {
value_aliased_labels.insert(alias, label);
}
ExprLabel::Value(label)
}
SelectFieldSyntax::AggrExpr { aggr_expr, alias } => {
let label = label_gen.next_aggr();
aggr_expressions.insert(label, aggr_expr);
if let Some(alias) = alias {
aggr_aliased_labels.insert(alias, label);
}
ExprLabel::Aggr(label)
}
})
.collect();
(
Self {
label_gen,
value_expressions,
value_aliased_labels,
aggr_expressions,
aggr_aliased_labels,
aggr_expression_results: HashMap::new(),
},
expr_labels,
)
}
pub fn resolve_value_alias(&self, value_alias: ValueAlias) -> Result<ValueExprLabel> {
self.value_aliased_labels
.get(&value_alias)
.cloned()
.ok_or_else(|| {
SpringError::Sql(anyhow!(
"Value alias `{}` is not in select list.",
value_alias
))
})
}
#[allow(dead_code)]
pub fn resolve_aggr_alias(&self, aggr_alias: AggrAlias) -> Result<AggrExprLabel> {
self.aggr_aliased_labels
.get(&aggr_alias)
.cloned()
.ok_or_else(|| {
SpringError::Sql(anyhow!(
"Aggr alias `{}` is not in select list.",
aggr_alias
))
})
}
pub fn resolve_aggr_expr(&self, label: AggrExprLabel) -> AggrExpr {
self.aggr_expressions
.get(&label)
.cloned()
.unwrap_or_else(|| panic!("label {:?} not found", label))
}
pub fn register_value_expr(&mut self, value_expr: ValueExpr) -> ValueExprLabel {
let label = self.label_gen.next_value();
self.value_expressions.insert(label, value_expr);
label
}
#[allow(dead_code)]
pub fn register_aggr_expr(&mut self, aggr_expr: AggrExpr) -> AggrExprLabel {
let label = self.label_gen.next_aggr();
self.aggr_expressions.insert(label, aggr_expr);
label
}
pub fn eval_value_expr(&self, label: ValueExprLabel, tuple: &Tuple) -> Result<SqlValue> {
let value_expr = self
.value_expressions
.get(&label)
.cloned()
.unwrap_or_else(|| panic!("label {:?} not found", label));
let value_expr_ph2 = value_expr.resolve_colref(tuple)?;
value_expr_ph2.eval()
}
pub fn eval_aggr_expr_inner(&self, label: AggrExprLabel, tuple: &Tuple) -> Result<SqlValue> {
let aggr_expr = self.resolve_aggr_expr(label);
let value_expr = aggr_expr.aggregated;
let value_expr_ph2 = value_expr.resolve_colref(tuple)?;
value_expr_ph2.eval()
}
}
#[cfg(test)]
mod tests {
use crate::{
expression::ValueExpr,
stream_engine::{time::SpringTimestamp, RowTime},
};
use super::*;
#[test]
fn test_expr_resolver() {
let select_list = vec![
SelectFieldSyntax::ValueExpr {
value_expr: ValueExpr::factory_add(
ValueExpr::factory_integer(1),
ValueExpr::factory_integer(1),
),
alias: None,
},
SelectFieldSyntax::ValueExpr {
value_expr: ValueExpr::factory_add(
ValueExpr::factory_integer(2),
ValueExpr::factory_integer(2),
),
alias: Some(ValueAlias::new("a1".to_string())),
},
];
let (mut resolver, labels_select_list) = ExprResolver::new(select_list);
if let &[ExprLabel::Value(value_label0), ExprLabel::Value(value_label1)] =
&labels_select_list[..]
{
assert_eq!(
resolver
.resolve_value_alias(ValueAlias::new("a1".to_string()))
.unwrap(),
value_label1
);
assert!(resolver
.resolve_value_alias(ValueAlias::new("a404".to_string()))
.is_err(),);
let label = resolver.register_value_expr(ValueExpr::factory_add(
ValueExpr::factory_integer(3),
ValueExpr::factory_integer(3),
));
let empty_tuple = Tuple::new(RowTime::EventTime(SpringTimestamp::fx_ts1()), vec![]);
assert_eq!(
resolver
.eval_value_expr(value_label0, &empty_tuple)
.unwrap(),
SqlValue::factory_integer(2)
);
assert_eq!(
resolver
.eval_value_expr(value_label0, &empty_tuple)
.unwrap(),
SqlValue::factory_integer(2),
"eval twice"
);
assert_eq!(
resolver
.eval_value_expr(value_label1, &empty_tuple)
.unwrap(),
SqlValue::factory_integer(4)
);
assert_eq!(
resolver.eval_value_expr(label, &empty_tuple).unwrap(),
SqlValue::factory_integer(6)
);
} else {
unreachable!()
}
}
}