hamelin_datafusion 0.7.8

Translate Hamelin TypedAST to DataFusion LogicalPlans
Documentation
//! Custom DataFusion UDFs for Hamelin language functions.
//!
//! These UDFs implement Hamelin functions that don't have direct equivalents in DataFusion.
//! They are used by translation functions in the function registry.

use datafusion::arrow::array::{Array, StructArray};
use datafusion::arrow::datatypes::{DataType, Field, Fields};

mod any_value;
mod array_cast;
mod array_helpers;
mod array_length;
mod array_variant_get;
pub(crate) mod from_variant;
mod interval_helper;
mod ip_address;
mod json_to_variant;
mod map_agg;
mod map_from_entries;
mod multimap_agg;
mod parse_timestamp;
mod regexp_extract_all;
mod regexp_split;
mod sliding_array_agg;
mod string_utils;
pub(crate) mod to_variant;
mod uuid5;
mod variant_get;
mod variant_to_json;
mod width_bucket;

/// Build the canonical Variant struct schema.
///
/// The schema is a non-nullable `metadata` column followed by a nullable
/// `value` column, both typed `BinaryView`.
///
/// `value` is deliberately nullable. `parquet_variant_compute`'s
/// `VariantArray::from_parts` (used by the `variant_get` kernel) marks it
/// nullable, whereas `VariantArrayBuilder::build` marks it non-nullable.
/// Settling on the nullable form keeps DataFusion's return-type assertion
/// happy no matter which code path produced the array.
pub fn variant_fields() -> Fields {
    let metadata = Field::new("metadata", DataType::BinaryView, false);
    let value = Field::new("value", DataType::BinaryView, true);
    [metadata, value].into_iter().collect()
}

/// Return the canonical Variant `DataType`: a `Struct` whose fields are
/// exactly those produced by [`variant_fields`].
pub fn variant_data_type() -> DataType {
    let fields = variant_fields();
    DataType::Struct(fields)
}

/// Report whether an Arrow `DataType` has the shape of a Variant struct.
///
/// The type qualifies when it is a `Struct` that contains a field named
/// `metadata` and a field named `value`, both typed `BinaryView`. Any other
/// fields are ignored. This accepts both of the following layouts:
///
/// - the unshredded layout (`{metadata: BinaryView, value: BinaryView}`);
/// - the shredded layout (`{metadata: BinaryView, value: BinaryView, typed_value: ...}`).
///
/// Field order and nullability are not inspected.
pub fn is_variant_data_type(dt: &DataType) -> bool {
    let DataType::Struct(fields) = dt else {
        return false;
    };
    let has_binary_view_field = |wanted: &str| {
        fields
            .iter()
            .any(|f| f.name() == wanted && matches!(f.data_type(), DataType::BinaryView))
    };
    has_binary_view_field("metadata") && has_binary_view_field("value")
}

/// Re-wrap a StructArray so its field nullability matches [`variant_fields`].
///
/// Only the schema changes. The child column `Arc`s and the struct-level null
/// buffer are moved into the new array, so no buffer data is copied.
///
/// The input is returned unchanged in two cases:
///
/// - its fields already equal [`variant_fields`];
/// - its layout differs from the canonical one in anything other than
///   nullability: field count, names, order, or data types. A shredded variant
///   carrying an extra `typed_value` column is one example.
///
/// Re-wrapping such an array would either panic inside [`StructArray::new`]
/// (field-count or data-type mismatch) or silently relabel columns (different
/// order). Passing it through lets the caller's schema checks report the
/// mismatch instead.
pub fn normalize_variant_struct(struct_arr: StructArray) -> StructArray {
    let expected = variant_fields();
    if struct_arr.fields() == &expected {
        return struct_arr;
    }

    // Only nullability (or field metadata) may differ; anything else is not a
    // layout we can safely relabel.
    let same_layout = struct_arr.fields().len() == expected.len()
        && struct_arr
            .fields()
            .iter()
            .zip(expected.iter())
            .all(|(actual, want)| {
                actual.name() == want.name() && actual.data_type() == want.data_type()
            });
    if !same_layout {
        return struct_arr;
    }

    // Take ownership of the children and null buffer instead of cloning them.
    let (_, columns, nulls) = struct_arr.into_parts();
    StructArray::new(expected, columns, nulls)
}

/// Pull the text out of a string-typed scalar (`Utf8`, `Utf8View`, or `LargeUtf8`).
///
/// # Returns
///
/// - `Ok(Some(text))` when the scalar is a non-null string;
/// - `Ok(None)` when it is a null string of any of the three string types.
///
/// # Errors
///
/// Returns an execution error naming the scalar's data type when the scalar
/// is not one of the accepted string types.
pub(crate) fn scalar_to_string(
    scalar: &datafusion::common::ScalarValue,
) -> datafusion::common::Result<Option<String>> {
    use datafusion::common::ScalarValue;
    let (ScalarValue::Utf8(text) | ScalarValue::Utf8View(text) | ScalarValue::LargeUtf8(text)) =
        scalar
    else {
        return datafusion::common::exec_err!(
            "expected string scalar, got {}",
            scalar.data_type()
        );
    };
    Ok(text.clone())
}

// Any value aggregate.
pub use any_value::any_value_udaf;

// Array cast for complex element transformations.
pub use array_cast::{array_cast_udf, ArrayCastUdf, CastDescriptor};

// Array helpers.
pub use array_helpers::array_avg_udf;
pub use array_helpers::array_sum_udf;

// Fast array length (offsets-based, replaces DataFusion's built-in).
pub use array_length::hamelin_array_length_udf;

// Vectorized variant field extraction over list arrays.
pub use array_variant_get::array_variant_get_udf;

// Map aggregate functions.
pub use map_agg::map_agg_udaf;
pub use multimap_agg::multimap_agg_udaf;

// Map scalar functions.
pub use map_from_entries::map_from_entries_udf;

// Sliding window aggregate functions.
pub use sliding_array_agg::sliding_array_agg_udaf;

// Interval helpers.
pub use interval_helper::from_millis_udf;
pub use interval_helper::from_months_udf;
pub use interval_helper::from_nanos_udf;
pub use interval_helper::to_millis_udf;
pub use interval_helper::to_months_udf;
pub use interval_helper::to_nanos_udf;

// IP Address functions.
pub use ip_address::cidr_contains_udf;
pub use ip_address::is_ipv4_udf;
pub use ip_address::is_ipv6_udf;

// Variant to JSON.
pub use variant_to_json::variant_to_json_udf;

// Variant/JSON functions.
pub use from_variant::{from_variant_udf, FromVariantUdf};
pub use json_to_variant::json_to_variant_udf;
pub use to_variant::cast_to_variant_udf;
pub use variant_get::variant_get_udf;

// Width bucket.
pub use width_bucket::width_bucket_array_udf;

// Timestamp parsing.
pub use parse_timestamp::parse_timestamp_udf;

// Regex functions.
pub use regexp_extract_all::regexp_extract_all_udf;
pub use regexp_split::regexp_split_udf;

// UUID5 (RFC 4122).
pub use uuid5::uuid5_udf;

#[cfg(test)]
mod large_list_tests;
#[cfg(test)]
mod string_type_tests;