hamelin_datafusion 0.6.13

Translate Hamelin TypedAST to DataFusion LogicalPlans
Documentation
//! DataFusion translations for datetime functions.

use datafusion::common::ScalarValue;
use datafusion::logical_expr::{BinaryExpr, Expr as DFExpr, Operator as DFOperator};
use datafusion_functions::datetime::expr_fn as datetime_fn;
use hamelin_lib::func::defs::{
    AtTimezone, Day, DayOfWeek, FromMillis, FromNanos, FromUnixtimeMicros, FromUnixtimeMillis,
    FromUnixtimeNanos, FromUnixtimeSeconds, Hour, Minute, Month, Now, Second, ToMillis, ToNanos,
    ToUnixtime, Today, Tomorrow, Ts, Year, Yesterday,
};

use super::DataFusionTranslationRegistry;

/// Normalizes a timestamp expression to `Timestamp(Microsecond, "+00:00")`.
///
/// Hamelin represents timestamps at microsecond precision in UTC. DataFusion functions may
/// yield other units (e.g. `to_timestamp_nanos`), so their results are wrapped in an explicit
/// `CAST` built here.
fn cast_to_timestamp_micros_utc(expr: DFExpr) -> DFExpr {
    use datafusion::arrow::datatypes::{DataType, TimeUnit};
    use datafusion::logical_expr::expr::Cast;

    const UTC_OFFSET: &str = "+00:00";

    let micros_utc = DataType::Timestamp(TimeUnit::Microsecond, Some(UTC_OFFSET.into()));
    let cast = Cast::new(Box::new(expr), micros_utc);
    DFExpr::Cast(cast)
}

pub fn register(registry: &mut DataFusionTranslationRegistry) {
    // now() -> cast(now() as Timestamp(Microsecond, UTC))
    registry.register::<Now>(|_params| Ok(cast_to_timestamp_micros_utc(datetime_fn::now())));

    // today() -> cast(date_trunc('day', now()) as Timestamp(Microsecond, UTC))
    registry.register::<Today>(|_params| {
        let ts = datetime_fn::date_trunc(datafusion::logical_expr::lit("day"), datetime_fn::now());
        Ok(cast_to_timestamp_micros_utc(ts))
    });

    // yesterday() -> cast(date_trunc('day', now() - interval '1 day') as Timestamp(Microsecond, UTC))
    registry.register::<Yesterday>(|_params| {
        let now = datetime_fn::now();
        let one_day = datafusion::logical_expr::lit(ScalarValue::new_interval_dt(1, 0));
        let yesterday = DFExpr::BinaryExpr(BinaryExpr::new(
            Box::new(now),
            DFOperator::Minus,
            Box::new(one_day),
        ));
        let ts = datetime_fn::date_trunc(datafusion::logical_expr::lit("day"), yesterday);
        Ok(cast_to_timestamp_micros_utc(ts))
    });

    // tomorrow() -> cast(date_trunc('day', now() + interval '1 day') as Timestamp(Microsecond, UTC))
    registry.register::<Tomorrow>(|_params| {
        let now = datetime_fn::now();
        let one_day = datafusion::logical_expr::lit(ScalarValue::new_interval_dt(1, 0));
        let tomorrow = DFExpr::BinaryExpr(BinaryExpr::new(
            Box::new(now),
            DFOperator::Plus,
            Box::new(one_day),
        ));
        let ts = datetime_fn::date_trunc(datafusion::logical_expr::lit("day"), tomorrow);
        Ok(cast_to_timestamp_micros_utc(ts))
    });

    // ts(string) -> hamelin_parse_timestamp(string)
    // Uses custom UDF that parses with chrono to support dates outside the
    // nanosecond range (1677-2262) that DataFusion's to_timestamp() is limited to.
    registry.register::<Ts>(|mut params| {
        let timestamp_str = params.take()?.expr;
        Ok(crate::udf::parse_timestamp_udf().call(vec![timestamp_str]))
    });

    // year(timestamp) -> date_part('year', timestamp)
    registry.register::<Year>(|mut params| {
        let timestamp = params.take()?.expr;
        Ok(datetime_fn::date_part(
            datafusion::logical_expr::lit("year"),
            timestamp,
        ))
    });

    // month(timestamp) -> date_part('month', timestamp)
    registry.register::<Month>(|mut params| {
        let timestamp = params.take()?.expr;
        Ok(datetime_fn::date_part(
            datafusion::logical_expr::lit("month"),
            timestamp,
        ))
    });

    // day(timestamp) -> date_part('day', timestamp)
    registry.register::<Day>(|mut params| {
        let timestamp = params.take()?.expr;
        Ok(datetime_fn::date_part(
            datafusion::logical_expr::lit("day"),
            timestamp,
        ))
    });

    // day_of_week(timestamp) -> ((date_part('dow', timestamp) + 6) % 7) + 1
    // DataFusion's dow returns 0-6 (Sunday=0, Saturday=6)
    // Hamelin/Trino uses ISO weekday: 1-7 (Monday=1, Sunday=7)
    // Conversion: (dow + 6) % 7 + 1
    //   dow=0 (Sun) -> (0+6)%7+1 = 6%7+1 = 6+1 = 7
    //   dow=1 (Mon) -> (1+6)%7+1 = 7%7+1 = 0+1 = 1
    //   dow=6 (Sat) -> (6+6)%7+1 = 12%7+1 = 5+1 = 6
    registry.register::<DayOfWeek>(|mut params| {
        let timestamp = params.take()?.expr;
        let dow = datetime_fn::date_part(datafusion::logical_expr::lit("dow"), timestamp);
        // (dow + 6) % 7 + 1
        let plus_six = DFExpr::BinaryExpr(BinaryExpr::new(
            Box::new(dow),
            DFOperator::Plus,
            Box::new(datafusion::logical_expr::lit(6i64)),
        ));
        let mod_seven = DFExpr::BinaryExpr(BinaryExpr::new(
            Box::new(plus_six),
            DFOperator::Modulo,
            Box::new(datafusion::logical_expr::lit(7i64)),
        ));
        Ok(DFExpr::BinaryExpr(BinaryExpr::new(
            Box::new(mod_seven),
            DFOperator::Plus,
            Box::new(datafusion::logical_expr::lit(1i64)),
        )))
    });

    // hour(timestamp) -> date_part('hour', timestamp)
    registry.register::<Hour>(|mut params| {
        let timestamp = params.take()?.expr;
        Ok(datetime_fn::date_part(
            datafusion::logical_expr::lit("hour"),
            timestamp,
        ))
    });

    // minute(timestamp) -> date_part('minute', timestamp)
    registry.register::<Minute>(|mut params| {
        let timestamp = params.take()?.expr;
        Ok(datetime_fn::date_part(
            datafusion::logical_expr::lit("minute"),
            timestamp,
        ))
    });

    // second(timestamp) -> date_part('second', timestamp)
    registry.register::<Second>(|mut params| {
        let timestamp = params.take()?.expr;
        Ok(datetime_fn::date_part(
            datafusion::logical_expr::lit("second"),
            timestamp,
        ))
    });

    // at_timezone(timestamp, timezone) -> CAST(timestamp AS Timestamp(Microsecond, tz))
    // This converts the timestamp to the specified timezone while preserving the instant.
    registry.register::<AtTimezone>(|mut params| {
        use datafusion::arrow::datatypes::{DataType, TimeUnit};
        use datafusion::logical_expr::expr::Cast;

        let timestamp = params.take()?.expr;
        let timezone_expr = params.take()?.expr;

        // Extract the timezone string from the literal expression
        let timezone_str = match &timezone_expr {
            DFExpr::Literal(ScalarValue::Utf8(Some(s)), _) => s.clone(),
            DFExpr::Literal(ScalarValue::LargeUtf8(Some(s)), _) => s.clone(),
            _ => {
                return Err(anyhow::anyhow!(
                    "at_timezone requires a string literal for timezone, got {:?}",
                    timezone_expr
                ))
            }
        };

        // Build the target DataType directly
        let target_type = DataType::Timestamp(TimeUnit::Microsecond, Some(timezone_str.into()));
        Ok(DFExpr::Cast(Cast::new(Box::new(timestamp), target_type)))
    });

    // to_millis(interval) -> hamelin_to_millis(interval)
    registry.register::<ToMillis>(|mut params| {
        let interval = params.take()?.expr;
        Ok(crate::udf::to_millis_udf().call(vec![interval]))
    });

    // to_nanos(interval) -> hamelin_to_nanos(interval)
    registry.register::<ToNanos>(|mut params| {
        let interval = params.take()?.expr;
        Ok(crate::udf::to_nanos_udf().call(vec![interval]))
    });

    // from_millis(millis) -> hamelin_from_millis(millis)
    registry.register::<FromMillis>(|mut params| {
        let millis = params.take()?.expr;
        Ok(crate::udf::from_millis_udf().call(vec![millis]))
    });

    // from_nanos(nanos) -> hamelin_from_nanos(nanos)
    registry.register::<FromNanos>(|mut params| {
        let nanos = params.take()?.expr;
        Ok(crate::udf::from_nanos_udf().call(vec![nanos]))
    });

    // from_unixtime_seconds(seconds) -> cast(to_timestamp_seconds(seconds) as Timestamp(Microsecond, UTC))
    registry.register::<FromUnixtimeSeconds>(|mut params| {
        let seconds = params.take()?.expr;
        let ts = datetime_fn::to_timestamp_seconds(vec![seconds]);
        Ok(cast_to_timestamp_micros_utc(ts))
    });

    // from_unixtime_millis(millis) -> cast(to_timestamp_millis(millis) as Timestamp(Microsecond, UTC))
    registry.register::<FromUnixtimeMillis>(|mut params| {
        let millis = params.take()?.expr;
        let ts = datetime_fn::to_timestamp_millis(vec![millis]);
        Ok(cast_to_timestamp_micros_utc(ts))
    });

    // from_unixtime_micros(micros) -> cast(to_timestamp_micros(micros) as Timestamp(Microsecond, UTC))
    registry.register::<FromUnixtimeMicros>(|mut params| {
        let micros = params.take()?.expr;
        let ts = datetime_fn::to_timestamp_micros(vec![micros]);
        Ok(cast_to_timestamp_micros_utc(ts))
    });

    // from_unixtime_nanos(nanos) -> cast(to_timestamp_nanos(nanos) as Timestamp(Microsecond, UTC))
    // Note: This loses nanosecond precision, but Hamelin standardizes on microseconds
    registry.register::<FromUnixtimeNanos>(|mut params| {
        let nanos = params.take()?.expr;
        let ts = datetime_fn::to_timestamp_nanos(vec![nanos]);
        Ok(cast_to_timestamp_micros_utc(ts))
    });

    // to_unixtime(timestamp) -> extract(epoch from timestamp)
    registry.register::<ToUnixtime>(|mut params| {
        let timestamp = params.take()?.expr;
        Ok(datetime_fn::date_part(
            datafusion::logical_expr::lit("epoch"),
            timestamp,
        ))
    });
}