use crate::core::Value;
use crate::functions::global_registry;
use crate::parser::ast::{self as ast, InfixOperator, PrefixOperator};
use crate::storage::expression::{
AndExpr, BetweenExpr, ComparisonExpr, ConstBoolExpr, Expression as StorageExpr, FunctionArg,
FunctionExpr, InListExpr, LikeExpr, NotExpr, NullCheckExpr, OrExpr,
};
use std::sync::Arc;
use super::{registry, PushdownContext, PushdownResult, PushdownRule};
use crate::executor::utils::{
extract_column_name, extract_literal_value, flip_operator, infix_to_operator,
};
/// Resolves `expr` to a constant [`Value`] for use in a pushed-down predicate.
///
/// Resolution is attempted in this order:
/// 1. a plain literal, via `extract_literal_value`;
/// 2. a bound parameter taken from `ctx.exec_ctx` (named when the parameter
///    name starts with `:`, positional otherwise);
/// 3. constant evaluation of infix/prefix expressions and zero-argument
///    function calls, skipped when the expression contains a volatile function.
///
/// Returns `None` when none of these yields a value.
fn extract_literal_with_ctx(expr: &ast::Expression, ctx: &PushdownContext<'_>) -> Option<Value> {
    if let Some(value) = extract_literal_value(expr) {
        return Some(value);
    }

    if let ast::Expression::Parameter(param) = expr {
        if let Some(exec_ctx) = ctx.exec_ctx {
            if let Some(name) = param.name.strip_prefix(':') {
                // Named parameter: the lookup key is the name without its leading ':'.
                return exec_ctx.get_named_param(name).cloned();
            }
            // Positional parameter: `index` is 1-based; 0 or out-of-range stays unresolved.
            let params = exec_ctx.params();
            if param.index > 0 && param.index <= params.len() {
                return Some(params[param.index - 1].clone());
            }
        }
    }

    // Volatile calls are never folded into a constant here.
    if contains_non_deterministic_volatile(expr) {
        return None;
    }

    match expr {
        ast::Expression::Infix(_) | ast::Expression::Prefix(_) => {
            crate::executor::expression::try_eval_constant_expr(expr)
        }
        ast::Expression::FunctionCall(fc) if fc.arguments.is_empty() => {
            crate::executor::expression::try_eval_constant_expr(expr)
        }
        _ => None,
    }
}
/// Reports whether `expr` calls one of the functions this module treats as
/// volatile (`RANDOM`, `RAND`, `UUID`, `SLEEP`, `EMBED`, matched after
/// upper-casing the name).
///
/// The search descends into function arguments, both operands of infix
/// expressions, the operand of prefix expressions and the inner expression of
/// casts. Every other expression kind is reported as non-volatile.
fn contains_non_deterministic_volatile(expr: &ast::Expression) -> bool {
    match expr {
        ast::Expression::FunctionCall(call) => {
            let name = call.function.to_uppercase();
            if matches!(
                name.as_str(),
                "RANDOM" | "RAND" | "UUID" | "SLEEP" | "EMBED"
            ) {
                return true;
            }
            // The call itself is fine; it is still volatile if any argument is.
            call.arguments
                .iter()
                .any(|arg| contains_non_deterministic_volatile(arg))
        }
        ast::Expression::Infix(node) => {
            let lhs_volatile = contains_non_deterministic_volatile(&node.left);
            lhs_volatile || contains_non_deterministic_volatile(&node.right)
        }
        ast::Expression::Prefix(node) => contains_non_deterministic_volatile(&node.right),
        ast::Expression::Cast(node) => contains_non_deterministic_volatile(&node.expr),
        _ => false,
    }
}
/// Splits a binary comparison into `(column, value, flipped)`.
///
/// The `column <op> value` orientation is tried first; if that fails the
/// operands are tried the other way round and `flipped` is `true`. A side only
/// counts as the column when `ctx.has_column` accepts it, and the value is
/// coerced with `ctx.coerce_to_column_type` before being returned.
///
/// Returns `None` when neither orientation yields a known column paired with
/// a resolvable constant.
fn extract_comparison_parts(
    left: &ast::Expression,
    right: &ast::Expression,
    ctx: &PushdownContext<'_>,
) -> Option<(String, Value, bool)> {
    let try_orientation =
        |column_side: &ast::Expression, value_side: &ast::Expression, flipped: bool| {
            let column = extract_column_name(column_side)?;
            if !ctx.has_column(&column) {
                return None;
            }
            let raw = extract_literal_with_ctx(value_side, ctx)?;
            let value = ctx.coerce_to_column_type(&column, raw);
            Some((column, value, flipped))
        };

    try_orientation(left, right, false).or_else(|| try_orientation(right, left, true))
}
/// Collects the constant values on the right-hand side of an `IN` predicate.
///
/// Expression lists and list literals are resolved element by element; if any
/// element cannot be resolved the whole result is `None`. Scalar subqueries
/// are rejected. Any other expression is treated as a single-element list.
fn extract_in_list_values(expr: &ast::Expression, ctx: &PushdownContext<'_>) -> Option<Vec<Value>> {
    match expr {
        ast::Expression::ExpressionList(list) => {
            let mut values = Vec::new();
            for item in list.expressions.iter() {
                values.push(extract_literal_with_ctx(item, ctx)?);
            }
            Some(values)
        }
        ast::Expression::List(list) => {
            let mut values = Vec::new();
            for item in list.elements.iter() {
                values.push(extract_literal_with_ctx(item, ctx)?);
            }
            Some(values)
        }
        ast::Expression::ScalarSubquery(_) => None,
        _ => Some(vec![extract_literal_with_ctx(expr, ctx)?]),
    }
}
pub struct ComparisonRule;
impl PushdownRule for ComparisonRule {
fn name(&self) -> &'static str {
"comparison"
}
fn try_convert(&self, expr: &ast::Expression, ctx: &PushdownContext<'_>) -> PushdownResult {
let infix = match expr {
ast::Expression::Infix(i) => i,
_ => return PushdownResult::NotApplicable,
};
let base_op = match infix_to_operator(infix.op_type) {
Some(op) => op,
None => return PushdownResult::NotApplicable,
};
let (column, value, flipped) =
match extract_comparison_parts(&infix.left, &infix.right, ctx) {
Some(parts) => parts,
None => return PushdownResult::NotApplicable,
};
let operator = if flipped {
flip_operator(base_op)
} else {
base_op
};
let mut expr = ComparisonExpr::new(column, operator, value);
expr.prepare_for_schema(ctx.schema);
PushdownResult::Converted(Box::new(expr))
}
}
/// Pushdown rule for `AND`, which supports pushing only one operand.
pub struct LogicalAndRule;

impl PushdownRule for LogicalAndRule {
    fn name(&self) -> &'static str {
        "logical_and"
    }

    /// Pushes each operand of an `AND` through the registry independently.
    ///
    /// * both operands pushable: an `AndExpr` of the two, reported as
    ///   `Partial` if either operand flagged that it still needs an in-memory
    ///   filter and as `Converted` otherwise;
    /// * exactly one operand pushable: that operand alone, as `Partial`;
    /// * neither pushable: `CannotPush`.
    fn try_convert(&self, expr: &ast::Expression, ctx: &PushdownContext<'_>) -> PushdownResult {
        let and_node = match expr {
            ast::Expression::Infix(node) if node.op_type == InfixOperator::And => node,
            _ => return PushdownResult::NotApplicable,
        };

        let (lhs, lhs_residual) = registry().try_pushdown_with_ctx(&and_node.left, ctx);
        let (rhs, rhs_residual) = registry().try_pushdown_with_ctx(&and_node.right, ctx);

        match (lhs, rhs) {
            (None, None) => PushdownResult::CannotPush,
            (Some(only), None) | (None, Some(only)) => PushdownResult::Partial(only),
            (Some(mut lhs), Some(mut rhs)) => {
                lhs.prepare_for_schema(ctx.schema);
                rhs.prepare_for_schema(ctx.schema);
                let conjunction = AndExpr::new(vec![lhs, rhs]);
                if lhs_residual || rhs_residual {
                    PushdownResult::Partial(Box::new(conjunction))
                } else {
                    PushdownResult::Converted(Box::new(conjunction))
                }
            }
        }
    }
}
/// Pushdown rule for `OR`, which is all-or-nothing.
pub struct LogicalOrRule;

impl PushdownRule for LogicalOrRule {
    fn name(&self) -> &'static str {
        "logical_or"
    }

    /// Converts both operands of an `OR` through the registry and joins them
    /// in an `OrExpr`. If either operand fails to convert the result is
    /// `CannotPush`.
    fn try_convert(&self, expr: &ast::Expression, ctx: &PushdownContext<'_>) -> PushdownResult {
        let or_node = match expr {
            ast::Expression::Infix(node) if node.op_type == InfixOperator::Or => node,
            _ => return PushdownResult::NotApplicable,
        };

        let lhs = registry().convert_expr(&or_node.left, ctx);
        let rhs = registry().convert_expr(&or_node.right, ctx);

        match lhs.zip(rhs) {
            Some((mut lhs, mut rhs)) => {
                lhs.prepare_for_schema(ctx.schema);
                rhs.prepare_for_schema(ctx.schema);
                PushdownResult::Converted(Box::new(OrExpr::new(vec![lhs, rhs])))
            }
            None => PushdownResult::CannotPush,
        }
    }
}
/// Pushdown rule for the prefix `NOT` operator.
pub struct LogicalNotRule;

impl PushdownRule for LogicalNotRule {
    fn name(&self) -> &'static str {
        "logical_not"
    }

    /// Wraps the converted operand of a `NOT` in a `NotExpr`; yields
    /// `CannotPush` when the operand itself cannot be converted.
    fn try_convert(&self, expr: &ast::Expression, ctx: &PushdownContext<'_>) -> PushdownResult {
        let not_node = match expr {
            ast::Expression::Prefix(node) if node.op_type == PrefixOperator::Not => node,
            _ => return PushdownResult::NotApplicable,
        };

        let operand = registry().convert_expr(&not_node.right, ctx);
        operand.map_or(PushdownResult::CannotPush, |mut operand| {
            operand.prepare_for_schema(ctx.schema);
            PushdownResult::Converted(Box::new(NotExpr::new(operand)))
        })
    }
}
/// Pushdown rule for `XOR`, rewritten as `(a AND NOT b) OR (NOT a AND b)`.
pub struct LogicalXorRule;

impl PushdownRule for LogicalXorRule {
    fn name(&self) -> &'static str {
        "logical_xor"
    }

    /// Expands `a XOR b` into `(a AND NOT b) OR (NOT a AND b)`.
    ///
    /// Each operand appears twice in the expansion and the converted
    /// expressions are consumed by value, so each operand is converted twice.
    /// If any of the four conversions fails the result is `CannotPush`.
    fn try_convert(&self, expr: &ast::Expression, ctx: &PushdownContext<'_>) -> PushdownResult {
        let xor_node = match expr {
            ast::Expression::Infix(node) if node.op_type == InfixOperator::Xor => node,
            _ => return PushdownResult::NotApplicable,
        };

        // First copy feeds `a AND NOT b`, second copy feeds `NOT a AND b`.
        let first_left = registry().convert_expr(&xor_node.left, ctx);
        let first_right = registry().convert_expr(&xor_node.right, ctx);
        let second_left = registry().convert_expr(&xor_node.left, ctx);
        let second_right = registry().convert_expr(&xor_node.right, ctx);

        if let (Some(mut a), Some(mut b_negated), Some(mut a_negated), Some(mut b)) =
            (first_left, first_right, second_left, second_right)
        {
            a.prepare_for_schema(ctx.schema);
            b_negated.prepare_for_schema(ctx.schema);
            a_negated.prepare_for_schema(ctx.schema);
            b.prepare_for_schema(ctx.schema);

            let only_left = AndExpr::new(vec![a, Box::new(NotExpr::new(b_negated))]);
            let only_right = AndExpr::new(vec![Box::new(NotExpr::new(a_negated)), b]);
            return PushdownResult::Converted(Box::new(OrExpr::new(vec![
                Box::new(only_left),
                Box::new(only_right),
            ])));
        }

        PushdownResult::CannotPush
    }
}
pub struct BetweenRule;
impl PushdownRule for BetweenRule {
fn name(&self) -> &'static str {
"between"
}
fn try_convert(&self, expr: &ast::Expression, ctx: &PushdownContext<'_>) -> PushdownResult {
let between = match expr {
ast::Expression::Between(b) => b,
_ => return PushdownResult::NotApplicable,
};
let column = match extract_column_name(&between.expr) {
Some(c) => c,
None => return PushdownResult::CannotPush,
};
if !ctx.has_column(&column) {
return PushdownResult::CannotPush;
}
let lower = match extract_literal_with_ctx(&between.lower, ctx) {
Some(v) => ctx.coerce_to_column_type(&column, v),
None => return PushdownResult::CannotPush,
};
let upper = match extract_literal_with_ctx(&between.upper, ctx) {
Some(v) => ctx.coerce_to_column_type(&column, v),
None => return PushdownResult::CannotPush,
};
let mut expr = if between.not {
BetweenExpr::not_between(column, lower, upper)
} else {
BetweenExpr::new(column, lower, upper)
};
expr.prepare_for_schema(ctx.schema);
PushdownResult::Converted(Box::new(expr))
}
}
/// Pushdown rule for `[NOT] IN (...)` on a known column with constant values.
pub struct InListRule;

impl PushdownRule for InListRule {
    fn name(&self) -> &'static str {
        "in_list"
    }

    /// Builds an `InListExpr` (or its negated form when `in_expr.not` is set).
    ///
    /// Non-`IN` expressions are `NotApplicable`. The result is `CannotPush`
    /// when the left side is not a known column, or when the value list is
    /// empty or cannot be resolved. Every value is coerced to the column type.
    fn try_convert(&self, expr: &ast::Expression, ctx: &PushdownContext<'_>) -> PushdownResult {
        let in_node = match expr {
            ast::Expression::In(node) => node,
            _ => return PushdownResult::NotApplicable,
        };

        let column = match extract_column_name(&in_node.left) {
            Some(name) if ctx.has_column(&name) => name,
            _ => return PushdownResult::CannotPush,
        };

        // An unresolvable list and an empty list are rejected alike.
        let candidates = extract_in_list_values(&in_node.right, ctx).unwrap_or_default();
        if candidates.is_empty() {
            return PushdownResult::CannotPush;
        }

        let mut coerced: Vec<Value> = Vec::with_capacity(candidates.len());
        for raw in candidates {
            coerced.push(ctx.coerce_to_column_type(&column, raw));
        }

        let mut membership = if in_node.not {
            InListExpr::not_in(column, coerced)
        } else {
            InListExpr::new(column, coerced)
        };
        membership.prepare_for_schema(ctx.schema);
        PushdownResult::Converted(Box::new(membership))
    }
}
/// Pushdown rule for `LIKE`-family predicates, in either AST representation.
pub struct LikeRule;

impl PushdownRule for LikeRule {
    fn name(&self) -> &'static str {
        "like"
    }

    /// Dispatches on the AST shape: dedicated `Like` nodes go to
    /// `convert_like_expr`, infix nodes go to `convert_infix_like`, and
    /// anything else is `NotApplicable`.
    fn try_convert(&self, expr: &ast::Expression, ctx: &PushdownContext<'_>) -> PushdownResult {
        if let ast::Expression::Like(like) = expr {
            return self.convert_like_expr(like, ctx);
        }
        if let ast::Expression::Infix(infix) = expr {
            return self.convert_infix_like(infix, ctx);
        }
        PushdownResult::NotApplicable
    }
}
impl LikeRule {
    /// Converts a dedicated `LIKE` AST node into a `LikeExpr`.
    ///
    /// Returns `CannotPush` for `GLOB` / `REGEXP` / `RLIKE` operators, for
    /// patterns with an `ESCAPE` clause, when the left side is not a known
    /// column (optionally wrapped in `UPPER`/`LOWER`), and when the pattern is
    /// not a string literal. Negation and case-insensitivity are read from the
    /// upper-cased operator text; an `UPPER`/`LOWER` wrapper forces the
    /// case-insensitive form.
    fn convert_like_expr(
        &self,
        like: &ast::LikeExpression,
        ctx: &PushdownContext<'_>,
    ) -> PushdownResult {
        let operator = like.operator.to_uppercase();
        let unsupported_operator = ["GLOB", "REGEXP", "RLIKE"]
            .iter()
            .any(|keyword| operator.contains(*keyword));
        if unsupported_operator || like.escape.is_some() {
            return PushdownResult::CannotPush;
        }

        let (column, force_ilike) = match self.resolve_like_column(&like.left) {
            Some((name, wrapped)) if ctx.has_column(&name) => (name, wrapped),
            _ => return PushdownResult::CannotPush,
        };

        let pattern = if let ast::Expression::StringLiteral(literal) = &*like.pattern {
            literal.value.clone()
        } else {
            return PushdownResult::CannotPush;
        };

        let negated = operator.contains("NOT");
        let case_insensitive = force_ilike || operator.contains("ILIKE");
        let mut matcher = if negated {
            if case_insensitive {
                LikeExpr::not_ilike(column, pattern)
            } else {
                LikeExpr::not_like(column, pattern)
            }
        } else if case_insensitive {
            LikeExpr::new_ilike(column, pattern)
        } else {
            LikeExpr::new(column, pattern)
        };
        matcher.prepare_for_schema(ctx.schema);
        PushdownResult::Converted(Box::new(matcher))
    }

    /// Resolves the left side of a `LIKE` predicate to `(column, wrapped)`,
    /// where `wrapped` is `true` when the column was found inside a
    /// single-argument `UPPER(...)` / `LOWER(...)` call.
    fn resolve_like_column(&self, operand: &ast::Expression) -> Option<(String, bool)> {
        extract_column_name(operand)
            .map(|name| (name, false))
            .or_else(|| {
                self.extract_upper_lower_column(operand)
                    .map(|name| (name, true))
            })
    }

    /// Returns the column name inside `UPPER(col)` or `LOWER(col)` (function
    /// name compared after upper-casing, exactly one argument), else `None`.
    fn extract_upper_lower_column(&self, expr: &ast::Expression) -> Option<String> {
        let call = match expr {
            ast::Expression::FunctionCall(call) => call,
            _ => return None,
        };
        if call.arguments.len() != 1 {
            return None;
        }
        let name = call.function.to_uppercase();
        if matches!(name.as_str(), "UPPER" | "LOWER") {
            extract_column_name(&call.arguments[0])
        } else {
            None
        }
    }

    /// Converts an infix `LIKE` / `ILIKE` / `NOT LIKE` / `NOT ILIKE` node.
    ///
    /// Other infix operators are `NotApplicable`. The result is `CannotPush`
    /// when the left side is not a known column (optionally wrapped in
    /// `UPPER`/`LOWER`) or the right side is not a string literal. Negated
    /// forms are expressed as a `NotExpr` around the positive `LikeExpr`.
    fn convert_infix_like(
        &self,
        infix: &ast::InfixExpression,
        ctx: &PushdownContext<'_>,
    ) -> PushdownResult {
        // (operator is itself case-insensitive, operator is negated)
        let (explicit_ilike, negated) = match infix.op_type {
            InfixOperator::Like => (false, false),
            InfixOperator::ILike => (true, false),
            InfixOperator::NotLike => (false, true),
            InfixOperator::NotILike => (true, true),
            _ => return PushdownResult::NotApplicable,
        };

        let (column, force_ilike) = match self.resolve_like_column(&infix.left) {
            Some((name, wrapped)) if ctx.has_column(&name) => (name, wrapped),
            _ => return PushdownResult::CannotPush,
        };

        let pattern = match extract_pattern(&infix.right) {
            Some(text) => text,
            None => return PushdownResult::CannotPush,
        };

        let mut matcher = if explicit_ilike || force_ilike {
            LikeExpr::new_ilike(column, pattern)
        } else {
            LikeExpr::new(column, pattern)
        };
        matcher.prepare_for_schema(ctx.schema);

        if negated {
            PushdownResult::Converted(Box::new(NotExpr::new(Box::new(matcher))))
        } else {
            PushdownResult::Converted(Box::new(matcher))
        }
    }
}
/// Returns the text of `expr` when it is a string literal, otherwise `None`.
fn extract_pattern(expr: &ast::Expression) -> Option<String> {
    if let ast::Expression::StringLiteral(literal) = expr {
        Some(literal.value.to_string())
    } else {
        None
    }
}
/// Pushdown rule for `column IS NULL` and `column IS NOT NULL`.
pub struct NullCheckRule;

impl PushdownRule for NullCheckRule {
    fn name(&self) -> &'static str {
        "null_check"
    }

    /// Builds a `NullCheckExpr` for `IS [NOT] NULL` on a known column.
    ///
    /// `NotApplicable` unless the expression is an `IS` / `IS NOT` infix with
    /// a `NULL` literal on the right; `CannotPush` when the left side is not a
    /// column known to the context.
    fn try_convert(&self, expr: &ast::Expression, ctx: &PushdownContext<'_>) -> PushdownResult {
        let infix = match expr {
            ast::Expression::Infix(node) => node,
            _ => return PushdownResult::NotApplicable,
        };

        let wants_not_null = match infix.op_type {
            InfixOperator::Is => false,
            InfixOperator::IsNot => true,
            _ => return PushdownResult::NotApplicable,
        };

        if !matches!(&*infix.right, ast::Expression::NullLiteral(_)) {
            return PushdownResult::NotApplicable;
        }

        let column = match extract_column_name(&infix.left) {
            Some(name) if ctx.has_column(&name) => name,
            _ => return PushdownResult::CannotPush,
        };

        let mut check = if wants_not_null {
            NullCheckExpr::is_not_null(column)
        } else {
            NullCheckExpr::is_null(column)
        };
        check.prepare_for_schema(ctx.schema);
        PushdownResult::Converted(Box::new(check))
    }
}
/// Pushdown rule for `column IS [NOT] TRUE` / `column IS [NOT] FALSE`.
pub struct BooleanCheckRule;

impl PushdownRule for BooleanCheckRule {
    fn name(&self) -> &'static str {
        "boolean_check"
    }

    /// Rewrites boolean `IS` tests on a known column:
    ///
    /// * `col IS <bool>` becomes `col = <bool>`;
    /// * `col IS NOT <bool>` becomes `col != <bool> OR col IS NULL`.
    ///
    /// `NotApplicable` unless the expression is an `IS` / `IS NOT` infix with
    /// a boolean literal on the right; `CannotPush` when the left side is not
    /// a column known to the context.
    fn try_convert(&self, expr: &ast::Expression, ctx: &PushdownContext<'_>) -> PushdownResult {
        let infix = match expr {
            ast::Expression::Infix(node) => node,
            _ => return PushdownResult::NotApplicable,
        };

        let negated = match infix.op_type {
            InfixOperator::Is => false,
            InfixOperator::IsNot => true,
            _ => return PushdownResult::NotApplicable,
        };

        let truth = if let ast::Expression::BooleanLiteral(literal) = &*infix.right {
            literal.value
        } else {
            return PushdownResult::NotApplicable;
        };

        let column = match extract_column_name(&infix.left) {
            Some(name) if ctx.has_column(&name) => name,
            _ => return PushdownResult::CannotPush,
        };

        if !negated {
            let mut equals = ComparisonExpr::eq(column, Value::Boolean(truth));
            equals.prepare_for_schema(ctx.schema);
            return PushdownResult::Converted(Box::new(equals));
        }

        // The negated form adds an explicit `IS NULL` branch next to `!=`.
        let mut differs = ComparisonExpr::ne(&column, Value::Boolean(truth));
        differs.prepare_for_schema(ctx.schema);
        let mut missing = NullCheckExpr::is_null(&column);
        missing.prepare_for_schema(ctx.schema);
        PushdownResult::Converted(Box::new(OrExpr::new(vec![
            Box::new(differs),
            Box::new(missing),
        ])))
    }
}
/// Pushdown rule for a bare boolean literal used as a predicate.
pub struct BooleanLiteralRule;

impl PushdownRule for BooleanLiteralRule {
    fn name(&self) -> &'static str {
        "boolean_literal"
    }

    /// Turns a boolean literal into a `ConstBoolExpr` carrying the same
    /// value; every other expression is `NotApplicable`.
    fn try_convert(&self, expr: &ast::Expression, _ctx: &PushdownContext<'_>) -> PushdownResult {
        if let ast::Expression::BooleanLiteral(literal) = expr {
            PushdownResult::Converted(Box::new(ConstBoolExpr::new(literal.value)))
        } else {
            PushdownResult::NotApplicable
        }
    }
}
/// Pushdown rule for comparisons between a scalar function call and a constant.
pub struct FunctionRule;

impl PushdownRule for FunctionRule {
    fn name(&self) -> &'static str {
        "function"
    }

    /// Handles `func(...) <op> value` and, with the operator flipped,
    /// `value <op> func(...)`.
    ///
    /// `NotApplicable` when the expression is not an infix node, when its
    /// operator has no mapping in `infix_to_operator`, or when neither
    /// orientation yields a function comparison.
    fn try_convert(&self, expr: &ast::Expression, ctx: &PushdownContext<'_>) -> PushdownResult {
        let infix = match expr {
            ast::Expression::Infix(node) => node,
            _ => return PushdownResult::NotApplicable,
        };
        let base_op = match infix_to_operator(infix.op_type) {
            Some(op) => op,
            None => return PushdownResult::NotApplicable,
        };

        // Function on the left first; only if that fails try it on the right.
        // The bool returned by the helper is unused in both cases.
        self.try_extract_function_comparison(&infix.left, &infix.right, base_op, ctx)
            .or_else(|| {
                self.try_extract_function_comparison(
                    &infix.right,
                    &infix.left,
                    flip_operator(base_op),
                    ctx,
                )
            })
            .map_or(PushdownResult::NotApplicable, |(pushed, _)| {
                PushdownResult::Converted(pushed)
            })
    }
}
impl FunctionRule {
    /// Builds a `FunctionExpr` for `func_side <operator> value_side`.
    ///
    /// Returns `None` when `func_side` is not a function call, when the
    /// upper-cased function name is not a scalar function in the global
    /// registry, when any argument cannot be converted, or when `value_side`
    /// does not resolve to a constant. The `bool` in the returned tuple is
    /// always `false`.
    fn try_extract_function_comparison(
        &self,
        func_side: &ast::Expression,
        value_side: &ast::Expression,
        operator: crate::core::Operator,
        ctx: &PushdownContext<'_>,
    ) -> Option<(Box<dyn StorageExpr>, bool)> {
        let call = if let ast::Expression::FunctionCall(call) = func_side {
            call
        } else {
            return None;
        };

        let lookup_name = call.function.to_uppercase();
        let scalar = global_registry().get_scalar(&lookup_name)?;

        // Arguments are converted before the comparison value; the first
        // failure aborts the whole conversion.
        let arguments = call
            .arguments
            .iter()
            .map(|arg| self.convert_func_arg(arg, ctx))
            .collect::<Option<Vec<_>>>()?;
        let compare_value = extract_literal_with_ctx(value_side, ctx)?;

        let mut pushed = FunctionExpr::new(Arc::from(scalar), arguments, operator, compare_value);
        pushed.prepare_for_schema(ctx.schema);
        Some((Box::new(pushed), false))
    }

    /// Converts one function argument into a `FunctionArg`.
    ///
    /// Tried in order: a column reference, a constant, then a nested call to a
    /// scalar function found in the global registry (whose own arguments are
    /// converted recursively). Returns `None` if nothing matches or if any
    /// nested argument fails to convert.
    fn convert_func_arg(
        &self,
        expr: &ast::Expression,
        ctx: &PushdownContext<'_>,
    ) -> Option<FunctionArg> {
        if let Some(column) = extract_column_name(expr) {
            return Some(FunctionArg::Column(column));
        }
        if let Some(constant) = extract_literal_with_ctx(expr, ctx) {
            return Some(FunctionArg::Literal(constant));
        }

        let nested = match expr {
            ast::Expression::FunctionCall(call) => call,
            _ => return None,
        };
        let lookup_name = nested.function.to_uppercase();
        let scalar = global_registry().get_scalar(&lookup_name)?;
        let arguments = nested
            .arguments
            .iter()
            .map(|arg| self.convert_func_arg(arg, ctx))
            .collect::<Option<Vec<_>>>()?;

        Some(FunctionArg::Function {
            function: Arc::from(scalar),
            arguments,
        })
    }
}