use std::sync::Arc;
use arrow::array::{Array, ArrayRef};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{ColumnarValue, ScalarFunctionImplementation};
pub use crate::scalar_function::create_physical_expr;
pub use datafusion_expr::function::Hint;
#[deprecated(since = "36.0.0", note = "Use ColumarValue::values_to_arrays instead")]
pub fn columnar_values_to_array(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
ColumnarValue::values_to_arrays(args)
}
#[deprecated(
since = "36.0.0",
note = "Implement your function directly in terms of ColumnarValue or use `ScalarUDF` instead"
)]
pub fn make_scalar_function<F>(inner: F) -> ScalarFunctionImplementation
where
F: Fn(&[ArrayRef]) -> Result<ArrayRef> + Sync + Send + 'static,
{
make_scalar_function_inner(inner)
}
pub(crate) fn make_scalar_function_inner<F>(inner: F) -> ScalarFunctionImplementation
where
F: Fn(&[ArrayRef]) -> Result<ArrayRef> + Sync + Send + 'static,
{
make_scalar_function_with_hints(inner, vec![])
}
pub(crate) fn make_scalar_function_with_hints<F>(
inner: F,
hints: Vec<Hint>,
) -> ScalarFunctionImplementation
where
F: Fn(&[ArrayRef]) -> Result<ArrayRef> + Sync + Send + 'static,
{
Arc::new(move |args: &[ColumnarValue]| {
let len = args
.iter()
.fold(Option::<usize>::None, |acc, arg| match arg {
ColumnarValue::Scalar(_) => acc,
ColumnarValue::Array(a) => Some(a.len()),
});
let is_scalar = len.is_none();
let inferred_length = len.unwrap_or(1);
let args = args
.iter()
.zip(hints.iter().chain(std::iter::repeat(&Hint::Pad)))
.map(|(arg, hint)| {
let expansion_len = match hint {
Hint::AcceptsSingular => 1,
Hint::Pad => inferred_length,
};
arg.clone().into_array(expansion_len)
})
.collect::<Result<Vec<_>>>()?;
let result = (inner)(&args);
if is_scalar {
let result = result.and_then(|arr| ScalarValue::try_from_array(&arr, 0));
result.map(ColumnarValue::Scalar)
} else {
result.map(ColumnarValue::Array)
}
})
}