hamelin_datafusion 0.7.5

Translate Hamelin TypedAST to DataFusion LogicalPlans
Documentation
//! RFC 4122 UUID Version 5 UDF for DataFusion.
//!
//! Produces deterministic, name-based UUIDs (in the OID namespace) via the `uuid` crate, so
//! that results computed by Trino and by DataFusion match.

use std::any::Any;
use std::sync::Arc;

use datafusion::arrow::array::{Array, ArrayRef, AsArray, StringArray};
use datafusion::arrow::datatypes::DataType;
use datafusion::common::{exec_err, Result, ScalarValue};
use datafusion::logical_expr::{
    ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature,
    Volatility,
};
use uuid::Uuid;

use super::string_utils::{scalar_to_str, STRING_TYPES};

/// DataFusion scalar UDF registered as `hamelin_uuid5`: turns a string name into the
/// text form of its RFC 4122 version 5 UUID.
///
/// The only state carried is the accepted [`Signature`], built in [`Uuid5Udf::new`].
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct Uuid5Udf {
    /// One exact single-argument signature per entry of `STRING_TYPES`.
    signature: Signature,
}

impl Default for Uuid5Udf {
    fn default() -> Self {
        Self::new()
    }
}

impl Uuid5Udf {
    pub fn new() -> Self {
        let sigs: Vec<TypeSignature> = STRING_TYPES
            .iter()
            .map(|t| TypeSignature::Exact(vec![t.clone()]))
            .collect();
        Self {
            signature: Signature::new(TypeSignature::OneOf(sigs), Volatility::Immutable),
        }
    }
}

/// Returns the hyphenated text form of the version 5 UUID derived from the UTF-8 bytes
/// of `name` under the predefined OID namespace.
///
/// Deterministic: the same `name` always yields the same string.
fn uuid5(name: &str) -> String {
    let namespace = &Uuid::NAMESPACE_OID;
    let id = Uuid::new_v5(namespace, name.as_bytes());
    id.hyphenated().to_string()
}

impl ScalarUDFImpl for Uuid5Udf {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Name under which the function is exposed.
    fn name(&self) -> &str {
        "hamelin_uuid5"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// The result is always `Utf8`, regardless of which string type was passed in.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Utf8)
    }

    /// Maps each input name to its version 5 UUID string; null inputs stay null.
    ///
    /// A scalar argument yields a `Utf8` scalar. An array argument (`Utf8`, `LargeUtf8`
    /// or `Utf8View`) yields a `Utf8` array with one output slot per input slot.
    ///
    /// # Errors
    /// Returns an execution error when the call does not have exactly one argument,
    /// when an array argument is not one of the three string types, or when
    /// `scalar_to_str` reports an error for a scalar argument.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        let inputs = args.args;
        let [input] = inputs.as_slice() else {
            return exec_err!("uuid5(name) expects 1 argument, got {}", inputs.len());
        };

        match input {
            ColumnarValue::Array(array) => {
                let uuids = match array.data_type() {
                    DataType::Utf8 => map_string_to_uuid5_array(array.as_string::<i32>(), uuid5),
                    DataType::LargeUtf8 => {
                        map_string_to_uuid5_array(array.as_string::<i64>(), uuid5)
                    }
                    DataType::Utf8View => {
                        map_string_to_uuid5_array(array.as_string_view(), uuid5)
                    }
                    unsupported => {
                        return exec_err!(
                            "uuid5(name) expects string array, got {}",
                            unsupported
                        );
                    }
                };
                Ok(ColumnarValue::Array(uuids))
            }
            ColumnarValue::Scalar(scalar) => {
                // `scalar_to_str` yields `None` for a null scalar, which stays null here.
                let uuid = scalar_to_str(scalar)?.map(uuid5);
                Ok(ColumnarValue::Scalar(ScalarValue::Utf8(uuid)))
            }
        }
    }
}

/// Builds a `Utf8` [`StringArray`] by passing every non-null string of `array` through
/// `f`. Null slots stay null, and the output has one slot per input slot.
///
/// Generic over the source string array type, so `i32`-offset, `i64`-offset and view
/// arrays share a single code path.
fn map_string_to_uuid5_array<T, F>(array: &T, f: F) -> ArrayRef
where
    T: Array + 'static,
    for<'a> &'a T: IntoIterator<Item = Option<&'a str>>,
    F: Fn(&str) -> String,
{
    let mapped = IntoIterator::into_iter(array)
        .map(|slot| slot.map(&f))
        .collect::<StringArray>();
    Arc::new(mapped)
}

/// Wraps a freshly built [`Uuid5Udf`] in a DataFusion [`ScalarUDF`].
pub fn uuid5_udf() -> ScalarUDF {
    let udf_impl = Uuid5Udf::new();
    ScalarUDF::new_from_impl(udf_impl)
}