// datafusion_expr/registry.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
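//! [`FunctionRegistry`] trait for looking up and registering user-defined
//! functions, along with [`SerializerRegistry`] and the in-memory
//! [`MemoryFunctionRegistry`] implementation.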
19
20use crate::expr_rewriter::FunctionRewrite;
21use crate::planner::ExprPlanner;
22use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF};
23use datafusion_common::{not_impl_err, plan_datafusion_err, HashMap, Result};
24use std::collections::HashSet;
25use std::fmt::Debug;
26use std::sync::Arc;
27
28/// A registry knows how to build logical expressions out of user-defined function' names
29pub trait FunctionRegistry {
30    /// Returns names of all available scalar user defined functions.
31    fn udfs(&self) -> HashSet<String>;
32
33    /// Returns names of all available aggregate user defined functions.
34    fn udafs(&self) -> HashSet<String>;
35
36    /// Returns names of all available window user defined functions.
37    fn udwfs(&self) -> HashSet<String>;
38
39    /// Returns a reference to the user defined scalar function (udf) named
40    /// `name`.
41    fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>>;
42
43    /// Returns a reference to the user defined aggregate function (udaf) named
44    /// `name`.
45    fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>>;
46
47    /// Returns a reference to the user defined window function (udwf) named
48    /// `name`.
49    fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>>;
50
51    /// Registers a new [`ScalarUDF`], returning any previously registered
52    /// implementation.
53    ///
54    /// Returns an error (the default) if the function can not be registered,
55    /// for example if the registry is read only.
56    fn register_udf(&mut self, _udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
57        not_impl_err!("Registering ScalarUDF")
58    }
59    /// Registers a new [`AggregateUDF`], returning any previously registered
60    /// implementation.
61    ///
62    /// Returns an error (the default) if the function can not be registered,
63    /// for example if the registry is read only.
64    fn register_udaf(
65        &mut self,
66        _udaf: Arc<AggregateUDF>,
67    ) -> Result<Option<Arc<AggregateUDF>>> {
68        not_impl_err!("Registering AggregateUDF")
69    }
70    /// Registers a new [`WindowUDF`], returning any previously registered
71    /// implementation.
72    ///
73    /// Returns an error (the default) if the function can not be registered,
74    /// for example if the registry is read only.
75    fn register_udwf(&mut self, _udaf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
76        not_impl_err!("Registering WindowUDF")
77    }
78
79    /// Deregisters a [`ScalarUDF`], returning the implementation that was
80    /// deregistered.
81    ///
82    /// Returns an error (the default) if the function can not be deregistered,
83    /// for example if the registry is read only.
84    fn deregister_udf(&mut self, _name: &str) -> Result<Option<Arc<ScalarUDF>>> {
85        not_impl_err!("Deregistering ScalarUDF")
86    }
87
88    /// Deregisters a [`AggregateUDF`], returning the implementation that was
89    /// deregistered.
90    ///
91    /// Returns an error (the default) if the function can not be deregistered,
92    /// for example if the registry is read only.
93    fn deregister_udaf(&mut self, _name: &str) -> Result<Option<Arc<AggregateUDF>>> {
94        not_impl_err!("Deregistering AggregateUDF")
95    }
96
97    /// Deregisters a [`WindowUDF`], returning the implementation that was
98    /// deregistered.
99    ///
100    /// Returns an error (the default) if the function can not be deregistered,
101    /// for example if the registry is read only.
102    fn deregister_udwf(&mut self, _name: &str) -> Result<Option<Arc<WindowUDF>>> {
103        not_impl_err!("Deregistering WindowUDF")
104    }
105
106    /// Registers a new [`FunctionRewrite`] with the registry.
107    ///
108    /// `FunctionRewrite` rules are used to rewrite certain / operators in the
109    /// logical plan to function calls.  For example `a || b` might be written to
110    /// `array_concat(a, b)`.
111    ///
112    /// This allows the behavior of operators to be customized by the user.
113    fn register_function_rewrite(
114        &mut self,
115        _rewrite: Arc<dyn FunctionRewrite + Send + Sync>,
116    ) -> Result<()> {
117        not_impl_err!("Registering FunctionRewrite")
118    }
119
120    /// Set of all registered [`ExprPlanner`]s
121    fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>>;
122
123    /// Registers a new [`ExprPlanner`] with the registry.
124    fn register_expr_planner(
125        &mut self,
126        _expr_planner: Arc<dyn ExprPlanner>,
127    ) -> Result<()> {
128        not_impl_err!("Registering ExprPlanner")
129    }
130}
131
132/// Serializer and deserializer registry for extensions like [UserDefinedLogicalNode].
133pub trait SerializerRegistry: Debug + Send + Sync {
134    /// Serialize this node to a byte array. This serialization should not include
135    /// input plans.
136    fn serialize_logical_plan(
137        &self,
138        node: &dyn UserDefinedLogicalNode,
139    ) -> Result<Vec<u8>>;
140
141    /// Deserialize user defined logical plan node ([UserDefinedLogicalNode]) from
142    /// bytes.
143    fn deserialize_logical_plan(
144        &self,
145        name: &str,
146        bytes: &[u8],
147    ) -> Result<Arc<dyn UserDefinedLogicalNode>>;
148}
149
150/// A  [`FunctionRegistry`] that uses in memory [`HashMap`]s
151#[derive(Default, Debug)]
152pub struct MemoryFunctionRegistry {
153    /// Scalar Functions
154    udfs: HashMap<String, Arc<ScalarUDF>>,
155    /// Aggregate Functions
156    udafs: HashMap<String, Arc<AggregateUDF>>,
157    /// Window Functions
158    udwfs: HashMap<String, Arc<WindowUDF>>,
159}
160
161impl MemoryFunctionRegistry {
162    pub fn new() -> Self {
163        Self::default()
164    }
165}
166
167impl FunctionRegistry for MemoryFunctionRegistry {
168    fn udfs(&self) -> HashSet<String> {
169        self.udfs.keys().cloned().collect()
170    }
171
172    fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>> {
173        self.udfs
174            .get(name)
175            .cloned()
176            .ok_or_else(|| plan_datafusion_err!("Function {name} not found"))
177    }
178
179    fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
180        self.udafs
181            .get(name)
182            .cloned()
183            .ok_or_else(|| plan_datafusion_err!("Aggregate Function {name} not found"))
184    }
185
186    fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
187        self.udwfs
188            .get(name)
189            .cloned()
190            .ok_or_else(|| plan_datafusion_err!("Window Function {name} not found"))
191    }
192
193    fn register_udf(&mut self, udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
194        Ok(self.udfs.insert(udf.name().to_string(), udf))
195    }
196    fn register_udaf(
197        &mut self,
198        udaf: Arc<AggregateUDF>,
199    ) -> Result<Option<Arc<AggregateUDF>>> {
200        Ok(self.udafs.insert(udaf.name().into(), udaf))
201    }
202    fn register_udwf(&mut self, udaf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
203        Ok(self.udwfs.insert(udaf.name().into(), udaf))
204    }
205
206    fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
207        vec![]
208    }
209
210    fn udafs(&self) -> HashSet<String> {
211        self.udafs.keys().cloned().collect()
212    }
213
214    fn udwfs(&self) -> HashSet<String> {
215        self.udwfs.keys().cloned().collect()
216    }
217}