use datafusion::logical_expr::expr::InList;
use datafusion::logical_expr::{BinaryExpr, Expr as DFExpr, Operator as DFOperator};
use datafusion_functions::core::expr_ext::FieldAccessor;
use datafusion_functions::core::expr_fn as core_fn;
use datafusion_functions_nested::expr_fn as array_fn;
use hamelin_lib::func::defs::{
InArray, InMap, InRange, InTimestampInterval, InTimestampTimestamp, InTuple, NotInArray,
NotInMap, NotInRange, NotInTimestampInterval, NotInTimestampTimestamp, NotInTuple,
NumericRange, TimestampRange,
};
use super::DataFusionTranslationRegistry;
/// Returns the arguments of `expr` when it is a scalar-function call whose
/// function is named `"struct"`; every other expression yields `None`.
///
/// The translations for `InTuple` / `NotInTuple` use this to recover the
/// individual tuple elements so they can be placed into an `IN (...)` list.
fn try_extract_tuple_elements(expr: DFExpr) -> Option<Vec<DFExpr>> {
    match expr {
        // Only a `struct(...)` call is recognised as a tuple literal.
        DFExpr::ScalarFunction(call) if call.func.name() == "struct" => Some(call.args),
        _ => None,
    }
}
pub fn register(registry: &mut DataFusionTranslationRegistry) {
registry.register::<InArray>(|mut params| {
let left = params.take()?.expr;
let right = params.take()?.expr;
Ok(array_fn::array_has(right, left))
});
registry.register::<NotInArray>(|mut params| {
let left = params.take()?.expr;
let right = params.take()?.expr;
Ok(DFExpr::Not(Box::new(array_fn::array_has(right, left))))
});
registry.register::<InMap>(|mut params| {
let left = params.take()?.expr;
let right = params.take()?.expr;
Ok(array_fn::array_has(array_fn::map_keys(right), left))
});
registry.register::<NotInMap>(|mut params| {
let left = params.take()?.expr;
let right = params.take()?.expr;
Ok(DFExpr::Not(Box::new(array_fn::array_has(
array_fn::map_keys(right),
left,
))))
});
registry.register::<NumericRange>(|mut params| {
let left = params.take()?.expr;
let right = params.take()?.expr;
Ok(core_fn::named_struct(vec![
datafusion::logical_expr::lit("begin"),
left,
datafusion::logical_expr::lit("end"),
right,
]))
});
registry.register::<TimestampRange>(|mut params| {
let left = params.take()?.expr;
let right = params.take()?.expr;
Ok(core_fn::named_struct(vec![
datafusion::logical_expr::lit("begin"),
left,
datafusion::logical_expr::lit("end"),
right,
]))
});
registry.register::<InRange>(|mut params| {
let left = params.take()?.expr;
let right = params.take()?.expr;
let begin = right.clone().field("begin");
let end = right.field("end");
let begin_is_null = DFExpr::IsNull(Box::new(begin.clone()));
let gte_begin = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(left.clone()),
DFOperator::GtEq,
Box::new(begin),
));
let begin_check = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(begin_is_null),
DFOperator::Or,
Box::new(gte_begin),
));
let end_is_null = DFExpr::IsNull(Box::new(end.clone()));
let lt_end = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(left),
DFOperator::Lt,
Box::new(end),
));
let end_check = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(end_is_null),
DFOperator::Or,
Box::new(lt_end),
));
Ok(DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(begin_check),
DFOperator::And,
Box::new(end_check),
)))
});
registry.register::<NotInRange>(|mut params| {
let left = params.take()?.expr;
let right = params.take()?.expr;
let begin = right.clone().field("begin");
let end = right.field("end");
let begin_is_null = DFExpr::IsNull(Box::new(begin.clone()));
let gte_begin = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(left.clone()),
DFOperator::GtEq,
Box::new(begin),
));
let begin_check = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(begin_is_null),
DFOperator::Or,
Box::new(gte_begin),
));
let end_is_null = DFExpr::IsNull(Box::new(end.clone()));
let lt_end = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(left),
DFOperator::Lt,
Box::new(end),
));
let end_check = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(end_is_null),
DFOperator::Or,
Box::new(lt_end),
));
let in_range = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(begin_check),
DFOperator::And,
Box::new(end_check),
));
Ok(DFExpr::Not(Box::new(in_range)))
});
registry.register::<InTimestampInterval>(|mut params| {
let timestamp = params.take()?.expr;
let interval = params.take()?.expr;
let now = datafusion_functions::datetime::expr_fn::now();
let begin = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(now.clone()),
DFOperator::Plus,
Box::new(interval),
));
let gte_begin = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(timestamp.clone()),
DFOperator::GtEq,
Box::new(begin),
));
let lte_now = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(timestamp),
DFOperator::LtEq,
Box::new(now),
));
Ok(DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(gte_begin),
DFOperator::And,
Box::new(lte_now),
)))
});
registry.register::<NotInTimestampInterval>(|mut params| {
let timestamp = params.take()?.expr;
let interval = params.take()?.expr;
let now = datafusion_functions::datetime::expr_fn::now();
let begin = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(now.clone()),
DFOperator::Plus,
Box::new(interval),
));
let gte_begin = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(timestamp.clone()),
DFOperator::GtEq,
Box::new(begin),
));
let lte_now = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(timestamp),
DFOperator::LtEq,
Box::new(now),
));
let in_interval = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(gte_begin),
DFOperator::And,
Box::new(lte_now),
));
Ok(DFExpr::Not(Box::new(in_interval)))
});
registry.register::<InTimestampTimestamp>(|mut params| {
let left = params.take()?.expr;
let right = params.take()?.expr;
let now = datafusion_functions::datetime::expr_fn::now();
let gte_right = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(left.clone()),
DFOperator::GtEq,
Box::new(right),
));
let lte_now = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(left),
DFOperator::LtEq,
Box::new(now),
));
Ok(DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(gte_right),
DFOperator::And,
Box::new(lte_now),
)))
});
registry.register::<NotInTimestampTimestamp>(|mut params| {
let left = params.take()?.expr;
let right = params.take()?.expr;
let now = datafusion_functions::datetime::expr_fn::now();
let gte_right = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(left.clone()),
DFOperator::GtEq,
Box::new(right),
));
let lte_now = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(left),
DFOperator::LtEq,
Box::new(now),
));
let in_range = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(gte_right),
DFOperator::And,
Box::new(lte_now),
));
Ok(DFExpr::Not(Box::new(in_range)))
});
registry.register::<InTuple>(|mut params| {
let left = params.take()?.expr;
let right = params.take()?.expr;
let list = try_extract_tuple_elements(right).ok_or_else(|| {
anyhow::anyhow!(
"IN tuple: expected struct() expression for tuple, got a different expression type"
)
})?;
Ok(DFExpr::InList(InList::new(Box::new(left), list, false)))
});
registry.register::<NotInTuple>(|mut params| {
let left = params.take()?.expr;
let right = params.take()?.expr;
let list = try_extract_tuple_elements(right).ok_or_else(|| {
anyhow::anyhow!(
"NOT IN tuple: expected struct() expression for tuple, got a different expression type"
)
})?;
Ok(DFExpr::InList(InList::new(Box::new(left), list, true)))
});
}