use std::any::Any;
use std::sync::Arc;
use datafusion::arrow::array::{Array, ArrayRef, AsArray, StringArray};
use datafusion::arrow::datatypes::DataType;
use datafusion::common::{exec_err, Result, ScalarValue};
use datafusion::logical_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature,
Volatility,
};
use uuid::Uuid;
use super::string_utils::{scalar_to_str, STRING_TYPES};
/// Scalar UDF that maps a string `name` to its deterministic UUID v5
/// (namespace OID), rendered as a hyphenated string.
///
/// The only state is the accepted call signature: exactly one argument whose
/// type is one of the entries in `STRING_TYPES`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Uuid5Udf {
    signature: Signature,
}

impl Uuid5Udf {
    /// Builds the UDF, accepting a single argument of any type listed in
    /// `STRING_TYPES`. It is marked `Immutable` because the same input always
    /// yields the same UUID.
    pub fn new() -> Self {
        // One exact single-argument variant per supported string encoding.
        let one_arg_variants = STRING_TYPES
            .iter()
            .cloned()
            .map(|string_type| TypeSignature::Exact(vec![string_type]))
            .collect::<Vec<TypeSignature>>();
        let signature = Signature::new(
            TypeSignature::OneOf(one_arg_variants),
            Volatility::Immutable,
        );
        Self { signature }
    }
}

impl Default for Uuid5Udf {
    /// Same as [`Uuid5Udf::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Derives the UUID v5 of `name` (SHA-1 based, in the OID namespace) and
/// returns it in lowercase hyphenated form (`xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`).
fn uuid5(name: &str) -> String {
    let derived = Uuid::new_v5(&Uuid::NAMESPACE_OID, name.as_bytes());
    derived.hyphenated().to_string()
}
impl ScalarUDFImpl for Uuid5Udf {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"hamelin_uuid5"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8)
}
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let args = args.args;
if args.len() != 1 {
return exec_err!("uuid5(name) expects 1 argument, got {}", args.len());
}
match &args[0] {
ColumnarValue::Scalar(scalar) => {
let result = scalar_to_str(scalar)?.map(uuid5);
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(result)))
}
ColumnarValue::Array(array) => {
let result = match array.data_type() {
DataType::Utf8 => map_string_to_uuid5_array(array.as_string::<i32>(), uuid5),
DataType::LargeUtf8 => {
map_string_to_uuid5_array(array.as_string::<i64>(), uuid5)
}
DataType::Utf8View => map_string_to_uuid5_array(array.as_string_view(), uuid5),
other => return exec_err!("uuid5(name) expects string array, got {}", other),
};
Ok(ColumnarValue::Array(result))
}
}
}
}
/// Applies `f` to every non-null string in `array` and collects the results
/// into a new `StringArray` (Utf8). Null slots stay null.
///
/// Generic over any string-like Arrow array whose reference iterates as
/// `Option<&str>`, so Utf8, LargeUtf8 and Utf8View inputs share one path.
fn map_string_to_uuid5_array<T, F>(array: &T, f: F) -> ArrayRef
where
    T: Array + 'static,
    for<'a> &'a T: IntoIterator<Item = Option<&'a str>>,
    F: Fn(&str) -> String,
{
    let slots = array.into_iter();
    let converted = slots
        .map(|slot| slot.map(&f))
        .collect::<StringArray>();
    Arc::new(converted)
}
/// Returns the `hamelin_uuid5` scalar UDF, ready to register with a
/// DataFusion session.
pub fn uuid5_udf() -> ScalarUDF {
    let udf_impl = Uuid5Udf::default();
    ScalarUDF::new_from_impl(udf_impl)
}