datafusion_expr/
registry.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! FunctionRegistry trait
19
20use crate::expr_rewriter::FunctionRewrite;
21use crate::planner::ExprPlanner;
22use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF};
23use datafusion_common::{not_impl_err, plan_datafusion_err, HashMap, Result};
24use std::collections::HashSet;
25use std::fmt::Debug;
26use std::sync::Arc;
27
28/// A registry knows how to build logical expressions out of user-defined function' names
29pub trait FunctionRegistry {
30    /// Returns names of all available scalar user defined functions.
31    fn udfs(&self) -> HashSet<String>;
32
33    /// Returns names of all available aggregate user defined functions.
34    fn udafs(&self) -> HashSet<String> {
35        // This default implementation is provided temporarily
36        // to maintain backward compatibility for the 50.1 release.
37        // It will be reverted to a required method in future versions.
38        HashSet::default()
39    }
40
41    /// Returns names of all available window user defined functions.
42    fn udwfs(&self) -> HashSet<String> {
43        // This default implementation is provided temporarily
44        // to maintain backward compatibility for the 50.1 release.
45        // It will be reverted to a required method in future versions.
46        HashSet::default()
47    }
48
49    /// Returns a reference to the user defined scalar function (udf) named
50    /// `name`.
51    fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>>;
52
53    /// Returns a reference to the user defined aggregate function (udaf) named
54    /// `name`.
55    fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>>;
56
57    /// Returns a reference to the user defined window function (udwf) named
58    /// `name`.
59    fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>>;
60
61    /// Registers a new [`ScalarUDF`], returning any previously registered
62    /// implementation.
63    ///
64    /// Returns an error (the default) if the function can not be registered,
65    /// for example if the registry is read only.
66    fn register_udf(&mut self, _udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
67        not_impl_err!("Registering ScalarUDF")
68    }
69    /// Registers a new [`AggregateUDF`], returning any previously registered
70    /// implementation.
71    ///
72    /// Returns an error (the default) if the function can not be registered,
73    /// for example if the registry is read only.
74    fn register_udaf(
75        &mut self,
76        _udaf: Arc<AggregateUDF>,
77    ) -> Result<Option<Arc<AggregateUDF>>> {
78        not_impl_err!("Registering AggregateUDF")
79    }
80    /// Registers a new [`WindowUDF`], returning any previously registered
81    /// implementation.
82    ///
83    /// Returns an error (the default) if the function can not be registered,
84    /// for example if the registry is read only.
85    fn register_udwf(&mut self, _udaf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
86        not_impl_err!("Registering WindowUDF")
87    }
88
89    /// Deregisters a [`ScalarUDF`], returning the implementation that was
90    /// deregistered.
91    ///
92    /// Returns an error (the default) if the function can not be deregistered,
93    /// for example if the registry is read only.
94    fn deregister_udf(&mut self, _name: &str) -> Result<Option<Arc<ScalarUDF>>> {
95        not_impl_err!("Deregistering ScalarUDF")
96    }
97
98    /// Deregisters a [`AggregateUDF`], returning the implementation that was
99    /// deregistered.
100    ///
101    /// Returns an error (the default) if the function can not be deregistered,
102    /// for example if the registry is read only.
103    fn deregister_udaf(&mut self, _name: &str) -> Result<Option<Arc<AggregateUDF>>> {
104        not_impl_err!("Deregistering AggregateUDF")
105    }
106
107    /// Deregisters a [`WindowUDF`], returning the implementation that was
108    /// deregistered.
109    ///
110    /// Returns an error (the default) if the function can not be deregistered,
111    /// for example if the registry is read only.
112    fn deregister_udwf(&mut self, _name: &str) -> Result<Option<Arc<WindowUDF>>> {
113        not_impl_err!("Deregistering WindowUDF")
114    }
115
116    /// Registers a new [`FunctionRewrite`] with the registry.
117    ///
118    /// `FunctionRewrite` rules are used to rewrite certain / operators in the
119    /// logical plan to function calls.  For example `a || b` might be written to
120    /// `array_concat(a, b)`.
121    ///
122    /// This allows the behavior of operators to be customized by the user.
123    fn register_function_rewrite(
124        &mut self,
125        _rewrite: Arc<dyn FunctionRewrite + Send + Sync>,
126    ) -> Result<()> {
127        not_impl_err!("Registering FunctionRewrite")
128    }
129
130    /// Set of all registered [`ExprPlanner`]s
131    fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>>;
132
133    /// Registers a new [`ExprPlanner`] with the registry.
134    fn register_expr_planner(
135        &mut self,
136        _expr_planner: Arc<dyn ExprPlanner>,
137    ) -> Result<()> {
138        not_impl_err!("Registering ExprPlanner")
139    }
140}
141
142/// Serializer and deserializer registry for extensions like [UserDefinedLogicalNode].
143pub trait SerializerRegistry: Debug + Send + Sync {
144    /// Serialize this node to a byte array. This serialization should not include
145    /// input plans.
146    fn serialize_logical_plan(
147        &self,
148        node: &dyn UserDefinedLogicalNode,
149    ) -> Result<Vec<u8>>;
150
151    /// Deserialize user defined logical plan node ([UserDefinedLogicalNode]) from
152    /// bytes.
153    fn deserialize_logical_plan(
154        &self,
155        name: &str,
156        bytes: &[u8],
157    ) -> Result<Arc<dyn UserDefinedLogicalNode>>;
158}
159
160/// A  [`FunctionRegistry`] that uses in memory [`HashMap`]s
161#[derive(Default, Debug)]
162pub struct MemoryFunctionRegistry {
163    /// Scalar Functions
164    udfs: HashMap<String, Arc<ScalarUDF>>,
165    /// Aggregate Functions
166    udafs: HashMap<String, Arc<AggregateUDF>>,
167    /// Window Functions
168    udwfs: HashMap<String, Arc<WindowUDF>>,
169}
170
171impl MemoryFunctionRegistry {
172    pub fn new() -> Self {
173        Self::default()
174    }
175}
176
177impl FunctionRegistry for MemoryFunctionRegistry {
178    fn udfs(&self) -> HashSet<String> {
179        self.udfs.keys().cloned().collect()
180    }
181
182    fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>> {
183        self.udfs
184            .get(name)
185            .cloned()
186            .ok_or_else(|| plan_datafusion_err!("Function {name} not found"))
187    }
188
189    fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
190        self.udafs
191            .get(name)
192            .cloned()
193            .ok_or_else(|| plan_datafusion_err!("Aggregate Function {name} not found"))
194    }
195
196    fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
197        self.udwfs
198            .get(name)
199            .cloned()
200            .ok_or_else(|| plan_datafusion_err!("Window Function {name} not found"))
201    }
202
203    fn register_udf(&mut self, udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
204        Ok(self.udfs.insert(udf.name().to_string(), udf))
205    }
206    fn register_udaf(
207        &mut self,
208        udaf: Arc<AggregateUDF>,
209    ) -> Result<Option<Arc<AggregateUDF>>> {
210        Ok(self.udafs.insert(udaf.name().into(), udaf))
211    }
212    fn register_udwf(&mut self, udaf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
213        Ok(self.udwfs.insert(udaf.name().into(), udaf))
214    }
215
216    fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
217        vec![]
218    }
219
220    fn udafs(&self) -> HashSet<String> {
221        self.udafs.keys().cloned().collect()
222    }
223
224    fn udwfs(&self) -> HashSet<String> {
225        self.udwfs.keys().cloned().collect()
226    }
227}