datafusion_expr/registry.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! FunctionRegistry trait
19
20use crate::expr_rewriter::FunctionRewrite;
21use crate::planner::ExprPlanner;
22use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF};
23use datafusion_common::{not_impl_err, plan_datafusion_err, HashMap, Result};
24use std::collections::HashSet;
25use std::fmt::Debug;
26use std::sync::Arc;
27
28/// A registry knows how to build logical expressions out of user-defined function' names
29pub trait FunctionRegistry {
30 /// Returns names of all available scalar user defined functions.
31 fn udfs(&self) -> HashSet<String>;
32
33 /// Returns names of all available aggregate user defined functions.
34 fn udafs(&self) -> HashSet<String> {
35 // This default implementation is provided temporarily
36 // to maintain backward compatibility for the 50.1 release.
37 // It will be reverted to a required method in future versions.
38 HashSet::default()
39 }
40
41 /// Returns names of all available window user defined functions.
42 fn udwfs(&self) -> HashSet<String> {
43 // This default implementation is provided temporarily
44 // to maintain backward compatibility for the 50.1 release.
45 // It will be reverted to a required method in future versions.
46 HashSet::default()
47 }
48
49 /// Returns a reference to the user defined scalar function (udf) named
50 /// `name`.
51 fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>>;
52
53 /// Returns a reference to the user defined aggregate function (udaf) named
54 /// `name`.
55 fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>>;
56
57 /// Returns a reference to the user defined window function (udwf) named
58 /// `name`.
59 fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>>;
60
61 /// Registers a new [`ScalarUDF`], returning any previously registered
62 /// implementation.
63 ///
64 /// Returns an error (the default) if the function can not be registered,
65 /// for example if the registry is read only.
66 fn register_udf(&mut self, _udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
67 not_impl_err!("Registering ScalarUDF")
68 }
69 /// Registers a new [`AggregateUDF`], returning any previously registered
70 /// implementation.
71 ///
72 /// Returns an error (the default) if the function can not be registered,
73 /// for example if the registry is read only.
74 fn register_udaf(
75 &mut self,
76 _udaf: Arc<AggregateUDF>,
77 ) -> Result<Option<Arc<AggregateUDF>>> {
78 not_impl_err!("Registering AggregateUDF")
79 }
80 /// Registers a new [`WindowUDF`], returning any previously registered
81 /// implementation.
82 ///
83 /// Returns an error (the default) if the function can not be registered,
84 /// for example if the registry is read only.
85 fn register_udwf(&mut self, _udaf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
86 not_impl_err!("Registering WindowUDF")
87 }
88
89 /// Deregisters a [`ScalarUDF`], returning the implementation that was
90 /// deregistered.
91 ///
92 /// Returns an error (the default) if the function can not be deregistered,
93 /// for example if the registry is read only.
94 fn deregister_udf(&mut self, _name: &str) -> Result<Option<Arc<ScalarUDF>>> {
95 not_impl_err!("Deregistering ScalarUDF")
96 }
97
98 /// Deregisters a [`AggregateUDF`], returning the implementation that was
99 /// deregistered.
100 ///
101 /// Returns an error (the default) if the function can not be deregistered,
102 /// for example if the registry is read only.
103 fn deregister_udaf(&mut self, _name: &str) -> Result<Option<Arc<AggregateUDF>>> {
104 not_impl_err!("Deregistering AggregateUDF")
105 }
106
107 /// Deregisters a [`WindowUDF`], returning the implementation that was
108 /// deregistered.
109 ///
110 /// Returns an error (the default) if the function can not be deregistered,
111 /// for example if the registry is read only.
112 fn deregister_udwf(&mut self, _name: &str) -> Result<Option<Arc<WindowUDF>>> {
113 not_impl_err!("Deregistering WindowUDF")
114 }
115
116 /// Registers a new [`FunctionRewrite`] with the registry.
117 ///
118 /// `FunctionRewrite` rules are used to rewrite certain / operators in the
119 /// logical plan to function calls. For example `a || b` might be written to
120 /// `array_concat(a, b)`.
121 ///
122 /// This allows the behavior of operators to be customized by the user.
123 fn register_function_rewrite(
124 &mut self,
125 _rewrite: Arc<dyn FunctionRewrite + Send + Sync>,
126 ) -> Result<()> {
127 not_impl_err!("Registering FunctionRewrite")
128 }
129
130 /// Set of all registered [`ExprPlanner`]s
131 fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>>;
132
133 /// Registers a new [`ExprPlanner`] with the registry.
134 fn register_expr_planner(
135 &mut self,
136 _expr_planner: Arc<dyn ExprPlanner>,
137 ) -> Result<()> {
138 not_impl_err!("Registering ExprPlanner")
139 }
140}
141
142/// Serializer and deserializer registry for extensions like [UserDefinedLogicalNode].
143pub trait SerializerRegistry: Debug + Send + Sync {
144 /// Serialize this node to a byte array. This serialization should not include
145 /// input plans.
146 fn serialize_logical_plan(
147 &self,
148 node: &dyn UserDefinedLogicalNode,
149 ) -> Result<Vec<u8>>;
150
151 /// Deserialize user defined logical plan node ([UserDefinedLogicalNode]) from
152 /// bytes.
153 fn deserialize_logical_plan(
154 &self,
155 name: &str,
156 bytes: &[u8],
157 ) -> Result<Arc<dyn UserDefinedLogicalNode>>;
158}
159
160/// A [`FunctionRegistry`] that uses in memory [`HashMap`]s
161#[derive(Default, Debug)]
162pub struct MemoryFunctionRegistry {
163 /// Scalar Functions
164 udfs: HashMap<String, Arc<ScalarUDF>>,
165 /// Aggregate Functions
166 udafs: HashMap<String, Arc<AggregateUDF>>,
167 /// Window Functions
168 udwfs: HashMap<String, Arc<WindowUDF>>,
169}
170
171impl MemoryFunctionRegistry {
172 pub fn new() -> Self {
173 Self::default()
174 }
175}
176
177impl FunctionRegistry for MemoryFunctionRegistry {
178 fn udfs(&self) -> HashSet<String> {
179 self.udfs.keys().cloned().collect()
180 }
181
182 fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>> {
183 self.udfs
184 .get(name)
185 .cloned()
186 .ok_or_else(|| plan_datafusion_err!("Function {name} not found"))
187 }
188
189 fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
190 self.udafs
191 .get(name)
192 .cloned()
193 .ok_or_else(|| plan_datafusion_err!("Aggregate Function {name} not found"))
194 }
195
196 fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
197 self.udwfs
198 .get(name)
199 .cloned()
200 .ok_or_else(|| plan_datafusion_err!("Window Function {name} not found"))
201 }
202
203 fn register_udf(&mut self, udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
204 Ok(self.udfs.insert(udf.name().to_string(), udf))
205 }
206 fn register_udaf(
207 &mut self,
208 udaf: Arc<AggregateUDF>,
209 ) -> Result<Option<Arc<AggregateUDF>>> {
210 Ok(self.udafs.insert(udaf.name().into(), udaf))
211 }
212 fn register_udwf(&mut self, udaf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
213 Ok(self.udwfs.insert(udaf.name().into(), udaf))
214 }
215
216 fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
217 vec![]
218 }
219
220 fn udafs(&self) -> HashSet<String> {
221 self.udafs.keys().cloned().collect()
222 }
223
224 fn udwfs(&self) -> HashSet<String> {
225 self.udwfs.keys().cloned().collect()
226 }
227}