use datafusion::common::ScalarValue;
use datafusion::logical_expr::{BinaryExpr, Expr as DFExpr, Operator as DFOperator};
use datafusion_functions::datetime::expr_fn as datetime_fn;
use hamelin_lib::func::defs::{
AtTimezone, Day, DayOfWeek, FromMillis, FromNanos, FromUnixtimeMicros, FromUnixtimeMillis,
FromUnixtimeNanos, FromUnixtimeSeconds, Hour, Minute, Month, Now, Second, ToMillis, ToNanos,
ToUnixtime, Today, Tomorrow, Ts, Year, Yesterday,
};
use super::DataFusionTranslationRegistry;
/// Wraps `expr` in a cast to a microsecond-precision timestamp whose timezone
/// annotation is the fixed offset `+00:00`.
///
/// The timestamp-producing translations in this module route their result through
/// this helper so they all share one output type.
fn cast_to_timestamp_micros_utc(expr: DFExpr) -> DFExpr {
    use datafusion::arrow::datatypes::{DataType, TimeUnit};
    use datafusion::logical_expr::expr::Cast;

    let boxed_input = Box::new(expr);
    let utc_micros = DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into()));
    DFExpr::Cast(Cast::new(boxed_input, utc_micros))
}
pub fn register(registry: &mut DataFusionTranslationRegistry) {
registry.register::<Now>(|_params| Ok(cast_to_timestamp_micros_utc(datetime_fn::now())));
registry.register::<Today>(|_params| {
let ts = datetime_fn::date_trunc(datafusion::logical_expr::lit("day"), datetime_fn::now());
Ok(cast_to_timestamp_micros_utc(ts))
});
registry.register::<Yesterday>(|_params| {
let now = datetime_fn::now();
let one_day = datafusion::logical_expr::lit(ScalarValue::new_interval_dt(1, 0));
let yesterday = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(now),
DFOperator::Minus,
Box::new(one_day),
));
let ts = datetime_fn::date_trunc(datafusion::logical_expr::lit("day"), yesterday);
Ok(cast_to_timestamp_micros_utc(ts))
});
registry.register::<Tomorrow>(|_params| {
let now = datetime_fn::now();
let one_day = datafusion::logical_expr::lit(ScalarValue::new_interval_dt(1, 0));
let tomorrow = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(now),
DFOperator::Plus,
Box::new(one_day),
));
let ts = datetime_fn::date_trunc(datafusion::logical_expr::lit("day"), tomorrow);
Ok(cast_to_timestamp_micros_utc(ts))
});
registry.register::<Ts>(|mut params| {
let timestamp_str = params.take()?.expr;
Ok(crate::udf::parse_timestamp_udf().call(vec![timestamp_str]))
});
registry.register::<Year>(|mut params| {
let timestamp = params.take()?.expr;
Ok(datetime_fn::date_part(
datafusion::logical_expr::lit("year"),
timestamp,
))
});
registry.register::<Month>(|mut params| {
let timestamp = params.take()?.expr;
Ok(datetime_fn::date_part(
datafusion::logical_expr::lit("month"),
timestamp,
))
});
registry.register::<Day>(|mut params| {
let timestamp = params.take()?.expr;
Ok(datetime_fn::date_part(
datafusion::logical_expr::lit("day"),
timestamp,
))
});
registry.register::<DayOfWeek>(|mut params| {
let timestamp = params.take()?.expr;
let dow = datetime_fn::date_part(datafusion::logical_expr::lit("dow"), timestamp);
let plus_six = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(dow),
DFOperator::Plus,
Box::new(datafusion::logical_expr::lit(6i64)),
));
let mod_seven = DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(plus_six),
DFOperator::Modulo,
Box::new(datafusion::logical_expr::lit(7i64)),
));
Ok(DFExpr::BinaryExpr(BinaryExpr::new(
Box::new(mod_seven),
DFOperator::Plus,
Box::new(datafusion::logical_expr::lit(1i64)),
)))
});
registry.register::<Hour>(|mut params| {
let timestamp = params.take()?.expr;
Ok(datetime_fn::date_part(
datafusion::logical_expr::lit("hour"),
timestamp,
))
});
registry.register::<Minute>(|mut params| {
let timestamp = params.take()?.expr;
Ok(datetime_fn::date_part(
datafusion::logical_expr::lit("minute"),
timestamp,
))
});
registry.register::<Second>(|mut params| {
let timestamp = params.take()?.expr;
Ok(datetime_fn::date_part(
datafusion::logical_expr::lit("second"),
timestamp,
))
});
registry.register::<AtTimezone>(|mut params| {
use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::logical_expr::expr::Cast;
let timestamp = params.take()?.expr;
let timezone_expr = params.take()?.expr;
let timezone_str = match &timezone_expr {
DFExpr::Literal(ScalarValue::Utf8(Some(s)), _) => s.clone(),
DFExpr::Literal(ScalarValue::LargeUtf8(Some(s)), _) => s.clone(),
_ => {
return Err(anyhow::anyhow!(
"at_timezone requires a string literal for timezone, got {:?}",
timezone_expr
))
}
};
let target_type = DataType::Timestamp(TimeUnit::Microsecond, Some(timezone_str.into()));
Ok(DFExpr::Cast(Cast::new(Box::new(timestamp), target_type)))
});
registry.register::<ToMillis>(|mut params| {
let interval = params.take()?.expr;
Ok(crate::udf::to_millis_udf().call(vec![interval]))
});
registry.register::<ToNanos>(|mut params| {
let interval = params.take()?.expr;
Ok(crate::udf::to_nanos_udf().call(vec![interval]))
});
registry.register::<FromMillis>(|mut params| {
let millis = params.take()?.expr;
Ok(crate::udf::from_millis_udf().call(vec![millis]))
});
registry.register::<FromNanos>(|mut params| {
let nanos = params.take()?.expr;
Ok(crate::udf::from_nanos_udf().call(vec![nanos]))
});
registry.register::<FromUnixtimeSeconds>(|mut params| {
let seconds = params.take()?.expr;
let ts = datetime_fn::to_timestamp_seconds(vec![seconds]);
Ok(cast_to_timestamp_micros_utc(ts))
});
registry.register::<FromUnixtimeMillis>(|mut params| {
let millis = params.take()?.expr;
let ts = datetime_fn::to_timestamp_millis(vec![millis]);
Ok(cast_to_timestamp_micros_utc(ts))
});
registry.register::<FromUnixtimeMicros>(|mut params| {
let micros = params.take()?.expr;
let ts = datetime_fn::to_timestamp_micros(vec![micros]);
Ok(cast_to_timestamp_micros_utc(ts))
});
registry.register::<FromUnixtimeNanos>(|mut params| {
let nanos = params.take()?.expr;
let ts = datetime_fn::to_timestamp_nanos(vec![nanos]);
Ok(cast_to_timestamp_micros_utc(ts))
});
registry.register::<ToUnixtime>(|mut params| {
let timestamp = params.take()?.expr;
Ok(datetime_fn::date_part(
datafusion::logical_expr::lit("epoch"),
timestamp,
))
});
}