use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion::logical_expr::ColumnarValue;
use datafusion::physical_expr::PhysicalExpr;
use datafusion_common::plan_err;
use datafusion_expr::function::AccumulatorArgs;
use datafusion_expr::{
Accumulator, AggregateUDFImpl, LimitEffect, PartitionEvaluator, ScalarFunctionArgs,
ScalarUDFImpl, Signature, Volatility, WindowUDFImpl,
};
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use std::any::Any;
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;
// Submodules of this module. Their bodies live in separate files that are not
// visible from here.
mod roundtrip_logical_plan;
mod roundtrip_physical_plan;
mod serialize;
/// Scalar UDF fixture registered under the name `"regex_udf"` (see its
/// `ScalarUDFImpl` implementation).
///
/// Equality and hashing are derived, so two instances compare equal only when
/// `signature`, `pattern` and `aliases` all match.
#[derive(Debug, PartialEq, Eq, Hash)]
struct MyRegexUdf {
// Signature reported through `ScalarUDFImpl::signature`.
signature: Signature,
// Pattern supplied at construction; it is stored but not read by any method
// visible in this file.
pattern: String,
// Alternative names reported through `ScalarUDFImpl::aliases`.
aliases: Vec<String>,
}
impl MyRegexUdf {
    /// Creates the UDF for the given `pattern`.
    ///
    /// The signature accepts exactly one `Utf8` argument and is marked
    /// immutable. A single alias is always attached.
    fn new(pattern: String) -> Self {
        Self {
            signature: Signature::exact(vec![DataType::Utf8], Volatility::Immutable),
            pattern,
            // NOTE(review): the alias says "aggregate" although this is a scalar
            // UDF; other files may rely on this exact string, so it is kept as is.
            aliases: vec![String::from("aggregate_udf_alias")],
        }
    }
}
impl ScalarUDFImpl for MyRegexUdf {
    /// Exposes `self` as `Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Name of this function: `"regex_udf"`.
    fn name(&self) -> &str {
        "regex_udf"
    }

    /// Signature built in [`MyRegexUdf::new`].
    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Returns `Int64` when called with exactly one `Utf8` argument type.
    ///
    /// # Errors
    ///
    /// Any other argument list yields a plan error.
    fn return_type(&self, args: &[DataType]) -> datafusion_common::Result<DataType> {
        match args {
            [DataType::Utf8] => Ok(DataType::Int64),
            _ => plan_err!("regex_udf only accepts Utf8 arguments"),
        }
    }

    /// Placeholder: this fixture is not meant to be evaluated.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn invoke_with_args(
        &self,
        _args: ScalarFunctionArgs,
    ) -> datafusion_common::Result<ColumnarValue> {
        panic!("dummy - not implemented")
    }

    /// Aliases stored on the instance.
    fn aliases(&self) -> &[String] {
        self.aliases.as_slice()
    }
}
/// Protobuf message (via `prost::Message`) carrying a single string field,
/// `pattern`, at tag 1.
///
/// NOTE(review): presumably the serialized form of `MyRegexUdf`'s pattern;
/// confirm against the codec that uses it.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MyRegexUdfNode {
#[prost(string, tag = "1")]
pub pattern: String,
}
/// Aggregate UDF fixture registered under the name `"aggregate_udf"` (see its
/// `AggregateUDFImpl` implementation).
///
/// Equality and hashing are derived over both fields.
#[derive(Debug, PartialEq, Eq, Hash)]
struct MyAggregateUDF {
// Signature reported through `AggregateUDFImpl::signature`.
signature: Signature,
// Value supplied at construction; it is stored but not read by any method
// visible in this file.
result: String,
}
impl MyAggregateUDF {
    /// Creates the aggregate UDF holding `result`.
    ///
    /// The signature accepts exactly one `Int64` argument and is marked
    /// immutable.
    fn new(result: String) -> Self {
        Self {
            signature: Signature::exact(vec![DataType::Int64], Volatility::Immutable),
            result,
        }
    }
}
impl AggregateUDFImpl for MyAggregateUDF {
    /// Exposes `self` as `Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Name of this function: `"aggregate_udf"`.
    fn name(&self) -> &str {
        "aggregate_udf"
    }

    /// Signature built in [`MyAggregateUDF::new`].
    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Always reports `Utf8`, regardless of the argument types passed in.
    fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
        Ok(DataType::Utf8)
    }

    /// Placeholder: this fixture provides no accumulator.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!`.
    fn accumulator(
        &self,
        _acc_args: AccumulatorArgs,
    ) -> datafusion_common::Result<Box<dyn Accumulator>> {
        unimplemented!()
    }
}
/// Protobuf message (via `prost::Message`) carrying a single string field,
/// `result`, at tag 1.
///
/// NOTE(review): presumably the serialized form of `MyAggregateUDF`'s
/// `result`; confirm against the codec that uses it.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MyAggregateUdfNode {
#[prost(string, tag = "1")]
pub result: String,
}
/// Window UDF fixture registered under the name `"custom_udwf"` (see its
/// `WindowUDFImpl` implementation). Visible only within `crate::cases`.
///
/// Equality and hashing are derived over both fields.
#[derive(Debug, PartialEq, Eq, Hash)]
pub(in crate::cases) struct CustomUDWF {
// Signature reported through `WindowUDFImpl::signature`.
signature: Signature,
// Value supplied at construction; it is stored but not read by any method
// visible in this file.
payload: String,
}
impl CustomUDWF {
    /// Creates the window UDF holding `payload`.
    ///
    /// The signature accepts exactly one `Int64` argument and is marked
    /// immutable.
    pub fn new(payload: String) -> Self {
        let signature = Signature::exact(vec![DataType::Int64], Volatility::Immutable);
        Self { signature, payload }
    }
}
impl WindowUDFImpl for CustomUDWF {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"custom_udwf"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn partition_evaluator(
&self,
_partition_evaluator_args: PartitionEvaluatorArgs,
) -> datafusion_common::Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(CustomUDWFEvaluator {}))
}
fn field(
&self,
field_args: WindowUDFFieldArgs,
) -> datafusion_common::Result<FieldRef> {
Ok(Field::new(field_args.name(), DataType::UInt64, false).into())
}
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
LimitEffect::Unknown
}
}
/// Stateless evaluator returned by `CustomUDWF::partition_evaluator`.
#[derive(Debug)]
struct CustomUDWFEvaluator;
// Empty impl: no trait method is overridden, so every method comes from the
// trait's provided defaults.
impl PartitionEvaluator for CustomUDWFEvaluator {}
/// Protobuf message (via `prost::Message`) carrying a single string field,
/// `payload`, at tag 1. Visible only within `crate::cases`.
///
/// NOTE(review): presumably the serialized form of `CustomUDWF`'s `payload`;
/// confirm against the codec that uses it.
#[derive(Clone, PartialEq, ::prost::Message)]
pub(in crate::cases) struct CustomUDWFNode {
#[prost(string, tag = "1")]
pub payload: String,
}