datafusion_expr/
async_udf.rs1use crate::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl};
19use arrow::array::ArrayRef;
20use arrow::datatypes::{DataType, FieldRef};
21use async_trait::async_trait;
22use datafusion_common::config::ConfigOptions;
23use datafusion_common::error::Result;
24use datafusion_common::internal_err;
25use datafusion_expr_common::columnar_value::ColumnarValue;
26use datafusion_expr_common::signature::Signature;
27use std::any::Any;
28use std::fmt::{Debug, Display};
29use std::hash::{DefaultHasher, Hash, Hasher};
30use std::sync::Arc;
31
32#[async_trait]
39pub trait AsyncScalarUDFImpl: ScalarUDFImpl {
40 fn ideal_batch_size(&self) -> Option<usize> {
45 None
46 }
47
48 async fn invoke_async_with_args(
50 &self,
51 args: ScalarFunctionArgs,
52 option: &ConfigOptions,
53 ) -> Result<ArrayRef>;
54}
55
56#[derive(Debug)]
61pub struct AsyncScalarUDF {
62 inner: Arc<dyn AsyncScalarUDFImpl>,
63}
64
65impl AsyncScalarUDF {
66 pub fn new(inner: Arc<dyn AsyncScalarUDFImpl>) -> Self {
67 Self { inner }
68 }
69
70 pub fn ideal_batch_size(&self) -> Option<usize> {
72 self.inner.ideal_batch_size()
73 }
74
75 pub fn into_scalar_udf(self) -> ScalarUDF {
78 ScalarUDF::new_from_impl(self)
79 }
80
81 pub async fn invoke_async_with_args(
83 &self,
84 args: ScalarFunctionArgs,
85 option: &ConfigOptions,
86 ) -> Result<ArrayRef> {
87 self.inner.invoke_async_with_args(args, option).await
88 }
89}
90
91impl ScalarUDFImpl for AsyncScalarUDF {
92 fn as_any(&self) -> &dyn Any {
93 self
94 }
95
96 fn name(&self) -> &str {
97 self.inner.name()
98 }
99
100 fn signature(&self) -> &Signature {
101 self.inner.signature()
102 }
103
104 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
105 self.inner.return_type(arg_types)
106 }
107
108 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
109 self.inner.return_field_from_args(args)
110 }
111
112 fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
113 internal_err!("async functions should not be called directly")
114 }
115
116 fn equals(&self, other: &dyn ScalarUDFImpl) -> bool {
117 let Some(other) = other.as_any().downcast_ref::<Self>() else {
118 return false;
119 };
120 let Self { inner } = self;
121 Arc::ptr_eq(inner, &other.inner)
123 }
124
125 fn hash_value(&self) -> u64 {
126 let Self { inner } = self;
127 let mut hasher = DefaultHasher::new();
128 Arc::as_ptr(inner).hash(&mut hasher);
129 hasher.finish()
130 }
131}
132
133impl Display for AsyncScalarUDF {
134 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
135 write!(f, "AsyncScalarUDF: {}", self.inner.name())
136 }
137}