// hamelin_datafusion 0.7.4
//
// Translate Hamelin TypedAST to DataFusion LogicalPlans
// Documentation
//! DataFusion function translation registry.
//!
//! This module contains translation functions that convert Hamelin function calls
//! (after argument translation) into DataFusion expressions.

mod aggregate;
mod arithmetic;
mod array;
mod comparison;
mod conditional;
mod datetime;
mod interval;
mod json;
mod logical;
mod map;
mod math;
mod membership;
mod operators;
mod regex;
mod string;
mod window;

use std::any::TypeId;
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::logical_expr::Expr as DFExpr;

use hamelin_lib::func::def::{FunctionDef, HasType, ParameterBinding};
use hamelin_lib::types::Type;

/// Failure modes reported when a Hamelin function call is turned into a
/// DataFusion expression.
#[derive(Debug, thiserror::Error)]
pub enum FunctionTranslationFailure {
    /// No translation is available for the function; the payload is the
    /// function's name.
    #[error("No DataFusion translation registered for function '{0}'")]
    NoTranslation(String),

    /// A translation exists for the function but it returned an error.
    #[error("Translation error for function '{function}': {message}")]
    TranslationError {
        /// Name of the function whose translation failed.
        function: String,
        /// Text of the underlying error.
        message: String,
    },
}

/// A translated DataFusion expression paired with its Hamelin type.
///
/// Keeping the Hamelin type next to the expression enables type-aware
/// function translations: the translation logic can inspect the original
/// Hamelin types of its arguments, not just their DataFusion expressions.
///
/// Cloning copies the expression and bumps the reference count of the
/// shared type.
#[derive(Clone)]
pub struct DFTranslation {
    /// The DataFusion expression.
    pub expr: DFExpr,
    /// The Hamelin type of this expression (held in an `Arc` so clones share it).
    pub typ: Arc<Type>,
}

impl DFTranslation {
    /// Create a new DFTranslation from an expression and type.
    pub fn new(expr: DFExpr, typ: Arc<Type>) -> Self {
        Self { expr, typ }
    }
}

impl HasType for DFTranslation {
    /// Borrow the Hamelin type stored alongside the expression.
    fn typ(&self) -> &Type {
        // Go through the `Arc` explicitly to hand out a plain `&Type`.
        self.typ.as_ref()
    }
}

/// Type alias for DataFusion translation functions.
///
/// A translation function takes a parameter binding of already-translated DataFusion
/// expressions (each carrying its Hamelin type via [`DFTranslation`]) and produces a
/// single DataFusion expression representing the function call, or an `anyhow` error
/// if the call cannot be translated.
///
/// The closure is boxed as a trait object and must be `Send + Sync`.
pub type DFTranslationFn =
    Box<dyn Fn(ParameterBinding<DFTranslation>) -> anyhow::Result<DFExpr> + Send + Sync>;

/// Registry of DataFusion translation functions.
///
/// This registry maps function TypeIds to translation closures. When translating
/// a Hamelin function call to DataFusion, we:
/// 1. Translate all arguments to DFExpr
/// 2. Look up the translation function by the FunctionDef's TypeId
/// 3. Apply the translation function to produce the final DFExpr
pub struct DataFusionTranslationRegistry {
    // Translation closures keyed by the `TypeId` of the function definition type
    // they were registered for.
    impls: HashMap<TypeId, DFTranslationFn>,
}

impl std::fmt::Debug for DataFusionTranslationRegistry {
    /// Format the registry as a struct showing only how many translations are
    /// registered; the stored closures themselves are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let registered_count = self.impls.len();
        let mut out = f.debug_struct("DataFusionTranslationRegistry");
        out.field("registered_count", &registered_count);
        out.finish()
    }
}

impl DataFusionTranslationRegistry {
    /// Build a registry with no translations registered.
    pub fn new() -> Self {
        let impls = HashMap::new();
        Self { impls }
    }

    /// Associate the translation closure `f` with the function definition type `F`.
    ///
    /// The closure is stored under `TypeId::of::<F>()`. Registering a second
    /// closure for the same `F` replaces the one stored earlier.
    pub fn register<F: FunctionDef>(
        &mut self,
        f: impl Fn(ParameterBinding<DFTranslation>) -> anyhow::Result<DFExpr> + Send + Sync + 'static,
    ) {
        let key = TypeId::of::<F>();
        self.impls.insert(key, Box::new(f));
    }

    /// Translate a function call into a DataFusion expression.
    ///
    /// `binding` holds the already-translated arguments and is handed to the
    /// closure registered for `func`.
    ///
    /// # Errors
    ///
    /// * [`FunctionTranslationFailure::NoTranslation`] when nothing is registered
    ///   under the `TypeId` reported by `func`.
    /// * [`FunctionTranslationFailure::TranslationError`] when the registered
    ///   closure fails; the error is flattened to its `to_string()` text.
    pub fn translate(
        &self,
        func: &dyn FunctionDef,
        binding: ParameterBinding<DFTranslation>,
    ) -> Result<DFExpr, FunctionTranslationFailure> {
        // NOTE(review): the lookup assumes `type_id()` on the trait object yields
        // the same `TypeId` that `register::<F>()` used as the key — confirm
        // against the `FunctionDef` trait definition.
        let Some(translate_fn) = self.impls.get(&func.type_id()) else {
            return Err(FunctionTranslationFailure::NoTranslation(
                func.name().to_string(),
            ));
        };

        translate_fn(binding).map_err(|source| FunctionTranslationFailure::TranslationError {
            function: func.name().to_string(),
            message: source.to_string(),
        })
    }
}

impl Default for DataFusionTranslationRegistry {
    /// Build a registry and let every translation submodule register into it.
    ///
    /// Unlike [`DataFusionTranslationRegistry::new`], which starts empty, this
    /// calls each submodule's `register` function on the new registry.
    fn default() -> Self {
        let mut registry = Self::new();

        // Register all translations, one submodule at a time.
        // The calls run in the order listed; since `register` stores closures
        // with a map insert, a later registration for the same function type
        // would replace an earlier one.
        aggregate::register(&mut registry);
        arithmetic::register(&mut registry);
        array::register(&mut registry);
        comparison::register(&mut registry);
        conditional::register(&mut registry);
        datetime::register(&mut registry);
        interval::register(&mut registry);
        json::register(&mut registry);
        logical::register(&mut registry);
        map::register(&mut registry);
        math::register(&mut registry);
        membership::register(&mut registry);
        operators::register(&mut registry);
        regex::register(&mut registry);
        string::register(&mut registry);
        window::register(&mut registry);

        registry
    }
}