use crate::{PhysicalExpr, ScalarFunctionExpr};
use arrow::datatypes::Schema;
use datafusion_common::Result;
pub use datafusion_expr::ScalarUDF;
use std::sync::Arc;
pub fn create_physical_expr(
fun: &ScalarUDF,
input_phy_exprs: &[Arc<dyn PhysicalExpr>],
input_schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>> {
let input_exprs_types = input_phy_exprs
.iter()
.map(|e| e.data_type(input_schema))
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(ScalarFunctionExpr::new(
fun.name(),
fun.fun(),
input_phy_exprs.to_vec(),
fun.return_type(&input_exprs_types)?,
fun.monotonicity()?,
)))
}
#[cfg(test)]
mod tests {
use arrow::datatypes::Schema;
use arrow_schema::DataType;
use datafusion_common::Result;
use datafusion_expr::{
ColumnarValue, FuncMonotonicity, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
};
use crate::ScalarFunctionExpr;
use super::create_physical_expr;
#[test]
fn test_functions() -> Result<()> {
#[derive(Debug, Clone)]
struct TestScalarUDF {
signature: Signature,
}
impl TestScalarUDF {
fn new() -> Self {
let signature =
Signature::exact(vec![DataType::Float64], Volatility::Immutable);
Self { signature }
}
}
impl ScalarUDFImpl for TestScalarUDF {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn name(&self) -> &str {
"my_fn"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Float64)
}
fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
unimplemented!("my_fn is not implemented")
}
fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
Ok(Some(vec![Some(true)]))
}
}
let udf = ScalarUDF::from(TestScalarUDF::new());
let p_expr = create_physical_expr(&udf, &[], &Schema::empty())?;
assert_eq!(
p_expr
.as_any()
.downcast_ref::<ScalarFunctionExpr>()
.unwrap()
.monotonicity(),
&Some(vec![Some(true)])
);
Ok(())
}
}