datafusion_expr/registry.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
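//! The [`FunctionRegistry`] and [`SerializerRegistry`] traits, plus
//! [`MemoryFunctionRegistry`], an in-memory [`FunctionRegistry`] implementation.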
19
20use crate::expr_rewriter::FunctionRewrite;
21use crate::planner::ExprPlanner;
22use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF};
23use datafusion_common::{not_impl_err, plan_datafusion_err, HashMap, Result};
24use std::collections::HashSet;
25use std::fmt::Debug;
26use std::sync::Arc;
27
28/// A registry knows how to build logical expressions out of user-defined function' names
29pub trait FunctionRegistry {
30 /// Returns names of all available scalar user defined functions.
31 fn udfs(&self) -> HashSet<String>;
32
33 /// Returns names of all available aggregate user defined functions.
34 fn udafs(&self) -> HashSet<String>;
35
36 /// Returns names of all available window user defined functions.
37 fn udwfs(&self) -> HashSet<String>;
38
39 /// Returns a reference to the user defined scalar function (udf) named
40 /// `name`.
41 fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>>;
42
43 /// Returns a reference to the user defined aggregate function (udaf) named
44 /// `name`.
45 fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>>;
46
47 /// Returns a reference to the user defined window function (udwf) named
48 /// `name`.
49 fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>>;
50
51 /// Registers a new [`ScalarUDF`], returning any previously registered
52 /// implementation.
53 ///
54 /// Returns an error (the default) if the function can not be registered,
55 /// for example if the registry is read only.
56 fn register_udf(&mut self, _udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
57 not_impl_err!("Registering ScalarUDF")
58 }
59 /// Registers a new [`AggregateUDF`], returning any previously registered
60 /// implementation.
61 ///
62 /// Returns an error (the default) if the function can not be registered,
63 /// for example if the registry is read only.
64 fn register_udaf(
65 &mut self,
66 _udaf: Arc<AggregateUDF>,
67 ) -> Result<Option<Arc<AggregateUDF>>> {
68 not_impl_err!("Registering AggregateUDF")
69 }
70 /// Registers a new [`WindowUDF`], returning any previously registered
71 /// implementation.
72 ///
73 /// Returns an error (the default) if the function can not be registered,
74 /// for example if the registry is read only.
75 fn register_udwf(&mut self, _udaf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
76 not_impl_err!("Registering WindowUDF")
77 }
78
79 /// Deregisters a [`ScalarUDF`], returning the implementation that was
80 /// deregistered.
81 ///
82 /// Returns an error (the default) if the function can not be deregistered,
83 /// for example if the registry is read only.
84 fn deregister_udf(&mut self, _name: &str) -> Result<Option<Arc<ScalarUDF>>> {
85 not_impl_err!("Deregistering ScalarUDF")
86 }
87
88 /// Deregisters a [`AggregateUDF`], returning the implementation that was
89 /// deregistered.
90 ///
91 /// Returns an error (the default) if the function can not be deregistered,
92 /// for example if the registry is read only.
93 fn deregister_udaf(&mut self, _name: &str) -> Result<Option<Arc<AggregateUDF>>> {
94 not_impl_err!("Deregistering AggregateUDF")
95 }
96
97 /// Deregisters a [`WindowUDF`], returning the implementation that was
98 /// deregistered.
99 ///
100 /// Returns an error (the default) if the function can not be deregistered,
101 /// for example if the registry is read only.
102 fn deregister_udwf(&mut self, _name: &str) -> Result<Option<Arc<WindowUDF>>> {
103 not_impl_err!("Deregistering WindowUDF")
104 }
105
106 /// Registers a new [`FunctionRewrite`] with the registry.
107 ///
108 /// `FunctionRewrite` rules are used to rewrite certain / operators in the
109 /// logical plan to function calls. For example `a || b` might be written to
110 /// `array_concat(a, b)`.
111 ///
112 /// This allows the behavior of operators to be customized by the user.
113 fn register_function_rewrite(
114 &mut self,
115 _rewrite: Arc<dyn FunctionRewrite + Send + Sync>,
116 ) -> Result<()> {
117 not_impl_err!("Registering FunctionRewrite")
118 }
119
120 /// Set of all registered [`ExprPlanner`]s
121 fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>>;
122
123 /// Registers a new [`ExprPlanner`] with the registry.
124 fn register_expr_planner(
125 &mut self,
126 _expr_planner: Arc<dyn ExprPlanner>,
127 ) -> Result<()> {
128 not_impl_err!("Registering ExprPlanner")
129 }
130}
131
132/// Serializer and deserializer registry for extensions like [UserDefinedLogicalNode].
133pub trait SerializerRegistry: Debug + Send + Sync {
134 /// Serialize this node to a byte array. This serialization should not include
135 /// input plans.
136 fn serialize_logical_plan(
137 &self,
138 node: &dyn UserDefinedLogicalNode,
139 ) -> Result<Vec<u8>>;
140
141 /// Deserialize user defined logical plan node ([UserDefinedLogicalNode]) from
142 /// bytes.
143 fn deserialize_logical_plan(
144 &self,
145 name: &str,
146 bytes: &[u8],
147 ) -> Result<Arc<dyn UserDefinedLogicalNode>>;
148}
149
150/// A [`FunctionRegistry`] that uses in memory [`HashMap`]s
151#[derive(Default, Debug)]
152pub struct MemoryFunctionRegistry {
153 /// Scalar Functions
154 udfs: HashMap<String, Arc<ScalarUDF>>,
155 /// Aggregate Functions
156 udafs: HashMap<String, Arc<AggregateUDF>>,
157 /// Window Functions
158 udwfs: HashMap<String, Arc<WindowUDF>>,
159}
160
161impl MemoryFunctionRegistry {
162 pub fn new() -> Self {
163 Self::default()
164 }
165}
166
167impl FunctionRegistry for MemoryFunctionRegistry {
168 fn udfs(&self) -> HashSet<String> {
169 self.udfs.keys().cloned().collect()
170 }
171
172 fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>> {
173 self.udfs
174 .get(name)
175 .cloned()
176 .ok_or_else(|| plan_datafusion_err!("Function {name} not found"))
177 }
178
179 fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
180 self.udafs
181 .get(name)
182 .cloned()
183 .ok_or_else(|| plan_datafusion_err!("Aggregate Function {name} not found"))
184 }
185
186 fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
187 self.udwfs
188 .get(name)
189 .cloned()
190 .ok_or_else(|| plan_datafusion_err!("Window Function {name} not found"))
191 }
192
193 fn register_udf(&mut self, udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
194 Ok(self.udfs.insert(udf.name().to_string(), udf))
195 }
196 fn register_udaf(
197 &mut self,
198 udaf: Arc<AggregateUDF>,
199 ) -> Result<Option<Arc<AggregateUDF>>> {
200 Ok(self.udafs.insert(udaf.name().into(), udaf))
201 }
202 fn register_udwf(&mut self, udaf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
203 Ok(self.udwfs.insert(udaf.name().into(), udaf))
204 }
205
206 fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
207 vec![]
208 }
209
210 fn udafs(&self) -> HashSet<String> {
211 self.udafs.keys().cloned().collect()
212 }
213
214 fn udwfs(&self) -> HashSet<String> {
215 self.udwfs.keys().cloned().collect()
216 }
217}