1use crate::logical_plan::to_proto::serialize_expr;
20use crate::logical_plan::{
21 self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
22};
23use crate::physical_plan::{
24 AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
25};
26use crate::protobuf;
27use datafusion_common::{plan_datafusion_err, Result};
28use datafusion_execution::TaskContext;
29use datafusion_expr::{
30 create_udaf, create_udf, create_udwf, AggregateUDF, Expr, LogicalPlan, Volatility,
31 WindowUDF,
32};
33use prost::{
34 bytes::{Bytes, BytesMut},
35 Message,
36};
37use std::sync::Arc;
38
39use datafusion_execution::registry::FunctionRegistry;
41use datafusion_expr::planner::ExprPlanner;
42use datafusion_physical_plan::ExecutionPlan;
43
44mod registry;
45
46pub trait Serializeable: Sized {
64 fn to_bytes(&self) -> Result<Bytes>;
66
67 fn from_bytes(bytes: &[u8]) -> Result<Self> {
75 Self::from_bytes_with_registry(bytes, ®istry::NoRegistry {})
76 }
77
78 fn from_bytes_with_registry(
84 bytes: &[u8],
85 registry: &dyn FunctionRegistry,
86 ) -> Result<Self>;
87}
88
89impl Serializeable for Expr {
90 fn to_bytes(&self) -> Result<Bytes> {
91 let mut buffer = BytesMut::new();
92 let extension_codec = DefaultLogicalExtensionCodec {};
93 let protobuf: protobuf::LogicalExprNode = serialize_expr(self, &extension_codec)
94 .map_err(|e| plan_datafusion_err!("Error encoding expr as protobuf: {e}"))?;
95
96 protobuf
97 .encode(&mut buffer)
98 .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
99
100 let bytes: Bytes = buffer.into();
101
102 struct PlaceHolderRegistry;
109
110 impl FunctionRegistry for PlaceHolderRegistry {
111 fn udfs(&self) -> std::collections::HashSet<String> {
112 std::collections::HashSet::default()
113 }
114
115 fn udf(&self, name: &str) -> Result<Arc<datafusion_expr::ScalarUDF>> {
116 Ok(Arc::new(create_udf(
117 name,
118 vec![],
119 arrow::datatypes::DataType::Null,
120 Volatility::Immutable,
121 Arc::new(|_| unimplemented!()),
122 )))
123 }
124
125 fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
126 Ok(Arc::new(create_udaf(
127 name,
128 vec![arrow::datatypes::DataType::Null],
129 Arc::new(arrow::datatypes::DataType::Null),
130 Volatility::Immutable,
131 Arc::new(|_| unimplemented!()),
132 Arc::new(vec![]),
133 )))
134 }
135
136 fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
137 Ok(Arc::new(create_udwf(
138 name,
139 arrow::datatypes::DataType::Null,
140 Arc::new(arrow::datatypes::DataType::Null),
141 Volatility::Immutable,
142 Arc::new(|| unimplemented!()),
143 )))
144 }
145 fn register_udaf(
146 &mut self,
147 _udaf: Arc<AggregateUDF>,
148 ) -> Result<Option<Arc<AggregateUDF>>> {
149 datafusion_common::internal_err!(
150 "register_udaf called in Placeholder Registry!"
151 )
152 }
153 fn register_udf(
154 &mut self,
155 _udf: Arc<datafusion_expr::ScalarUDF>,
156 ) -> Result<Option<Arc<datafusion_expr::ScalarUDF>>> {
157 datafusion_common::internal_err!(
158 "register_udf called in Placeholder Registry!"
159 )
160 }
161 fn register_udwf(
162 &mut self,
163 _udaf: Arc<WindowUDF>,
164 ) -> Result<Option<Arc<WindowUDF>>> {
165 datafusion_common::internal_err!(
166 "register_udwf called in Placeholder Registry!"
167 )
168 }
169
170 fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
171 vec![]
172 }
173
174 fn udafs(&self) -> std::collections::HashSet<String> {
175 std::collections::HashSet::default()
176 }
177
178 fn udwfs(&self) -> std::collections::HashSet<String> {
179 std::collections::HashSet::default()
180 }
181 }
182 Expr::from_bytes_with_registry(&bytes, &PlaceHolderRegistry)?;
183
184 Ok(bytes)
185 }
186
187 fn from_bytes_with_registry(
188 bytes: &[u8],
189 registry: &dyn FunctionRegistry,
190 ) -> Result<Self> {
191 let protobuf = protobuf::LogicalExprNode::decode(bytes)
192 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
193
194 let extension_codec = DefaultLogicalExtensionCodec {};
195 logical_plan::from_proto::parse_expr(&protobuf, registry, &extension_codec)
196 .map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))
197 }
198}
199
200pub fn logical_plan_to_bytes(plan: &LogicalPlan) -> Result<Bytes> {
202 let extension_codec = DefaultLogicalExtensionCodec {};
203 logical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
204}
205
/// Serializes a [`LogicalPlan`] to a JSON string using the
/// [`DefaultLogicalExtensionCodec`].
#[cfg(feature = "json")]
pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result<String> {
    logical_plan_to_json_with_extension_codec(plan, &DefaultLogicalExtensionCodec {})
}
212
213pub fn logical_plan_to_bytes_with_extension_codec(
215 plan: &LogicalPlan,
216 extension_codec: &dyn LogicalExtensionCodec,
217) -> Result<Bytes> {
218 let protobuf =
219 protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)?;
220 let mut buffer = BytesMut::new();
221 protobuf
222 .encode(&mut buffer)
223 .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
224 Ok(buffer.into())
225}
226
/// Serializes a [`LogicalPlan`] to a JSON string, delegating to
/// `extension_codec` during the conversion to a protobuf plan node.
///
/// # Errors
/// Both the plan-to-protobuf conversion and the JSON rendering report
/// failures as a plan error prefixed with "Error serializing plan".
#[cfg(feature = "json")]
pub fn logical_plan_to_json_with_extension_codec(
    plan: &LogicalPlan,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<String> {
    protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
        .and_then(|node| {
            serde_json::to_string(&node)
                .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
        })
}
239
/// Deserializes a [`LogicalPlan`] from a JSON string using the
/// [`DefaultLogicalExtensionCodec`].
#[cfg(feature = "json")]
pub fn logical_plan_from_json(json: &str, ctx: &TaskContext) -> Result<LogicalPlan> {
    logical_plan_from_json_with_extension_codec(
        json,
        ctx,
        &DefaultLogicalExtensionCodec {},
    )
}
246
247pub fn logical_plan_from_bytes(bytes: &[u8], ctx: &TaskContext) -> Result<LogicalPlan> {
249 let extension_codec = DefaultLogicalExtensionCodec {};
250 logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
251}
252
253pub fn logical_plan_from_bytes_with_extension_codec(
255 bytes: &[u8],
256 ctx: &TaskContext,
257 extension_codec: &dyn LogicalExtensionCodec,
258) -> Result<LogicalPlan> {
259 let protobuf = protobuf::LogicalPlanNode::decode(bytes)
260 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
261 protobuf.try_into_logical_plan(ctx, extension_codec)
262}
263
/// Deserializes a [`LogicalPlan`] from a JSON string, passing `ctx` and
/// `extension_codec` to the protobuf-to-plan conversion.
///
/// # Errors
/// Reports a plan error if `json` cannot be parsed into a protobuf plan
/// node, and propagates any error from converting that node into a plan.
#[cfg(feature = "json")]
pub fn logical_plan_from_json_with_extension_codec(
    json: &str,
    ctx: &TaskContext,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
    let node = serde_json::from_str::<protobuf::LogicalPlanNode>(json)
        .map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;

    node.try_into_logical_plan(ctx, extension_codec)
}
275
276pub fn physical_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Bytes> {
278 let extension_codec = DefaultPhysicalExtensionCodec {};
279 physical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
280}
281
/// Serializes an [`ExecutionPlan`] to a JSON string using the
/// [`DefaultPhysicalExtensionCodec`].
///
/// # Errors
/// Both the plan-to-protobuf conversion and the JSON rendering report
/// failures as a plan error prefixed with "Error serializing plan".
#[cfg(feature = "json")]
pub fn physical_plan_to_json(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
    let codec = DefaultPhysicalExtensionCodec {};

    protobuf::PhysicalPlanNode::try_from_physical_plan(plan, &codec)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
        .and_then(|node| {
            serde_json::to_string(&node)
                .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
        })
}
292
293pub fn physical_plan_to_bytes_with_extension_codec(
295 plan: Arc<dyn ExecutionPlan>,
296 extension_codec: &dyn PhysicalExtensionCodec,
297) -> Result<Bytes> {
298 let protobuf =
299 protobuf::PhysicalPlanNode::try_from_physical_plan(plan, extension_codec)?;
300 let mut buffer = BytesMut::new();
301 protobuf
302 .encode(&mut buffer)
303 .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
304 Ok(buffer.into())
305}
306
/// Deserializes an [`ExecutionPlan`] from a JSON string using the
/// [`DefaultPhysicalExtensionCodec`].
///
/// # Errors
/// Reports a plan error if `json` cannot be parsed into a protobuf plan
/// node, and propagates any error from converting that node into a plan.
#[cfg(feature = "json")]
pub fn physical_plan_from_json(
    json: &str,
    ctx: &TaskContext,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Parsing JSON is deserialization, so the failure is labelled as such
    // (the same wording the logical-plan JSON reader uses).
    let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
        .map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
    let extension_codec = DefaultPhysicalExtensionCodec {};
    back.try_into_physical_plan(ctx, &extension_codec)
}
318
319pub fn physical_plan_from_bytes(
321 bytes: &[u8],
322 ctx: &TaskContext,
323) -> Result<Arc<dyn ExecutionPlan>> {
324 let extension_codec = DefaultPhysicalExtensionCodec {};
325 physical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
326}
327
328pub fn physical_plan_from_bytes_with_extension_codec(
330 bytes: &[u8],
331 ctx: &TaskContext,
332 extension_codec: &dyn PhysicalExtensionCodec,
333) -> Result<Arc<dyn ExecutionPlan>> {
334 let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
335 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
336 protobuf.try_into_physical_plan(ctx, extension_codec)
337}