use std::sync::Arc;
use datafusion::{
logical_expr::Operator,
physical_plan::{
PhysicalExpr,
expressions::{BinaryExpr, DynamicFilterPhysicalExpr, LikeExpr, Literal},
},
scalar::ScalarValue,
};
use crate::liquid_array::get_bytes_needle;
#[derive(Debug)]
pub enum Comparison {
Lt,
Gt,
LtEq,
GtEq,
}
#[derive(Debug)]
pub enum Equality {
Eq,
NotEq,
}
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum SubString {
Contains,
NotContains,
}
#[derive(Debug)]
pub enum ByteViewOperator {
Comparison(Comparison),
Equality(Equality),
SubString(SubString),
}
impl ByteViewOperator {
fn from_like_expr(like: &LikeExpr) -> Result<Self, UnsupportedOperator> {
match (like.negated(), like.case_insensitive()) {
(false, false) => Ok(ByteViewOperator::SubString(SubString::Contains)),
(true, false) => Ok(ByteViewOperator::SubString(SubString::NotContains)),
_ => Err(UnsupportedOperator),
}
}
}
pub struct UnsupportedOperator;
impl TryFrom<&Operator> for ByteViewOperator {
type Error = UnsupportedOperator;
fn try_from(operator: &Operator) -> Result<Self, UnsupportedOperator> {
match operator {
Operator::Eq => Ok(ByteViewOperator::Equality(Equality::Eq)),
Operator::NotEq => Ok(ByteViewOperator::Equality(Equality::NotEq)),
Operator::Lt => Ok(ByteViewOperator::Comparison(Comparison::Lt)),
Operator::Gt => Ok(ByteViewOperator::Comparison(Comparison::Gt)),
Operator::LtEq => Ok(ByteViewOperator::Comparison(Comparison::LtEq)),
Operator::GtEq => Ok(ByteViewOperator::Comparison(Comparison::GtEq)),
Operator::LikeMatch => Ok(ByteViewOperator::SubString(SubString::Contains)),
Operator::NotLikeMatch => Ok(ByteViewOperator::SubString(SubString::NotContains)),
_ => Err(UnsupportedOperator),
}
}
}
impl From<&ByteViewOperator> for Operator {
fn from(byte_view_operator: &ByteViewOperator) -> Self {
match byte_view_operator {
ByteViewOperator::Comparison(comparison) => match comparison {
Comparison::Lt => Operator::Lt,
Comparison::Gt => Operator::Gt,
Comparison::LtEq => Operator::LtEq,
Comparison::GtEq => Operator::GtEq,
},
ByteViewOperator::Equality(equality) => match equality {
Equality::Eq => Operator::Eq,
Equality::NotEq => Operator::NotEq,
},
ByteViewOperator::SubString(substring) => match substring {
SubString::Contains => Operator::LikeMatch,
SubString::NotContains => Operator::NotLikeMatch,
},
}
}
}
#[derive(Debug)]
pub(super) struct ByteViewExpression {
op: ByteViewOperator,
literal: Vec<u8>,
}
pub(super) enum UnsupportedExpression {
Op,
Expr,
Constant(bool),
}
impl From<UnsupportedOperator> for UnsupportedExpression {
fn from(_op: UnsupportedOperator) -> Self {
UnsupportedExpression::Op
}
}
impl ByteViewExpression {
pub(super) fn op(&self) -> &ByteViewOperator {
&self.op
}
pub(super) fn literal(&self) -> &[u8] {
&self.literal
}
}
impl TryFrom<&Arc<dyn PhysicalExpr>> for ByteViewExpression {
type Error = UnsupportedExpression;
fn try_from(expr: &Arc<dyn PhysicalExpr>) -> Result<Self, UnsupportedExpression> {
let expr = if let Some(dynamic_filter) =
expr.as_any().downcast_ref::<DynamicFilterPhysicalExpr>()
{
dynamic_filter.current().unwrap()
} else {
expr.clone()
};
if let Some(literal) = expr.as_any().downcast_ref::<Literal>()
&& let ScalarValue::Boolean(Some(v)) = literal.value()
{
return Err(UnsupportedExpression::Constant(*v));
}
if let Some(binary_expr) = expr.as_any().downcast_ref::<BinaryExpr>() {
if let Some(literal) = binary_expr.right().as_any().downcast_ref::<Literal>() {
let op = binary_expr.op();
let byte_view_operator = ByteViewOperator::try_from(op)?;
let literal =
get_bytes_needle(literal.value()).ok_or(UnsupportedExpression::Expr)?;
return Ok(ByteViewExpression {
op: byte_view_operator,
literal,
});
}
}
else if let Some(like_expr) = expr.as_any().downcast_ref::<LikeExpr>()
&& let Some(literal) = like_expr.pattern().as_any().downcast_ref::<Literal>()
{
let byte_view_operator = ByteViewOperator::from_like_expr(like_expr)?;
let literal = get_bytes_needle(literal.value()).ok_or(UnsupportedExpression::Expr)?;
return Ok(ByteViewExpression {
op: byte_view_operator,
literal,
});
}
Err(UnsupportedExpression::Expr)
}
}