1use crate::logical_plan::to_proto::serialize_expr;
20use crate::logical_plan::{
21 self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
22};
23use crate::physical_plan::{
24 DefaultPhysicalExtensionCodec, DefaultPhysicalProtoConverter, PhysicalExtensionCodec,
25 PhysicalProtoConverterExtension,
26};
27use crate::protobuf;
28use datafusion_common::{Result, plan_datafusion_err};
29use datafusion_execution::TaskContext;
30use datafusion_expr::{
31 AggregateUDF, Expr, LogicalPlan, Volatility, WindowUDF, create_udaf, create_udf,
32 create_udwf,
33};
34use prost::{
35 Message,
36 bytes::{Bytes, BytesMut},
37};
38use std::sync::Arc;
39
40use datafusion_execution::registry::FunctionRegistry;
42use datafusion_expr::planner::ExprPlanner;
43use datafusion_physical_plan::ExecutionPlan;
44
45mod registry;
46
47pub trait Serializeable: Sized {
65 fn to_bytes(&self) -> Result<Bytes>;
67
68 fn from_bytes(bytes: &[u8]) -> Result<Self> {
76 Self::from_bytes_with_registry(bytes, ®istry::NoRegistry {})
77 }
78
79 fn from_bytes_with_registry(
85 bytes: &[u8],
86 registry: &dyn FunctionRegistry,
87 ) -> Result<Self>;
88}
89
90impl Serializeable for Expr {
91 fn to_bytes(&self) -> Result<Bytes> {
92 let mut buffer = BytesMut::new();
93 let extension_codec = DefaultLogicalExtensionCodec {};
94 let protobuf: protobuf::LogicalExprNode = serialize_expr(self, &extension_codec)
95 .map_err(|e| plan_datafusion_err!("Error encoding expr as protobuf: {e}"))?;
96
97 protobuf
98 .encode(&mut buffer)
99 .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
100
101 let bytes: Bytes = buffer.into();
102
103 struct PlaceHolderRegistry;
110
111 impl FunctionRegistry for PlaceHolderRegistry {
112 fn udfs(&self) -> std::collections::HashSet<String> {
113 std::collections::HashSet::default()
114 }
115
116 fn udf(&self, name: &str) -> Result<Arc<datafusion_expr::ScalarUDF>> {
117 Ok(Arc::new(create_udf(
118 name,
119 vec![],
120 arrow::datatypes::DataType::Null,
121 Volatility::Immutable,
122 Arc::new(|_| unimplemented!()),
123 )))
124 }
125
126 fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
127 Ok(Arc::new(create_udaf(
128 name,
129 vec![arrow::datatypes::DataType::Null],
130 Arc::new(arrow::datatypes::DataType::Null),
131 Volatility::Immutable,
132 Arc::new(|_| unimplemented!()),
133 Arc::new(vec![]),
134 )))
135 }
136
137 fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
138 Ok(Arc::new(create_udwf(
139 name,
140 arrow::datatypes::DataType::Null,
141 Arc::new(arrow::datatypes::DataType::Null),
142 Volatility::Immutable,
143 Arc::new(|| unimplemented!()),
144 )))
145 }
146 fn register_udaf(
147 &mut self,
148 _udaf: Arc<AggregateUDF>,
149 ) -> Result<Option<Arc<AggregateUDF>>> {
150 datafusion_common::internal_err!(
151 "register_udaf called in Placeholder Registry!"
152 )
153 }
154 fn register_udf(
155 &mut self,
156 _udf: Arc<datafusion_expr::ScalarUDF>,
157 ) -> Result<Option<Arc<datafusion_expr::ScalarUDF>>> {
158 datafusion_common::internal_err!(
159 "register_udf called in Placeholder Registry!"
160 )
161 }
162 fn register_udwf(
163 &mut self,
164 _udaf: Arc<WindowUDF>,
165 ) -> Result<Option<Arc<WindowUDF>>> {
166 datafusion_common::internal_err!(
167 "register_udwf called in Placeholder Registry!"
168 )
169 }
170
171 fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
172 vec![]
173 }
174
175 fn udafs(&self) -> std::collections::HashSet<String> {
176 std::collections::HashSet::default()
177 }
178
179 fn udwfs(&self) -> std::collections::HashSet<String> {
180 std::collections::HashSet::default()
181 }
182 }
183 Expr::from_bytes_with_registry(&bytes, &PlaceHolderRegistry)?;
184
185 Ok(bytes)
186 }
187
188 fn from_bytes_with_registry(
189 bytes: &[u8],
190 registry: &dyn FunctionRegistry,
191 ) -> Result<Self> {
192 let protobuf = protobuf::LogicalExprNode::decode(bytes)
193 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
194
195 let extension_codec = DefaultLogicalExtensionCodec {};
196 logical_plan::from_proto::parse_expr(&protobuf, registry, &extension_codec)
197 .map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))
198 }
199}
200
201pub fn logical_plan_to_bytes(plan: &LogicalPlan) -> Result<Bytes> {
203 let extension_codec = DefaultLogicalExtensionCodec {};
204 logical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
205}
206
207#[cfg(feature = "json")]
209pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result<String> {
210 let extension_codec = DefaultLogicalExtensionCodec {};
211 logical_plan_to_json_with_extension_codec(plan, &extension_codec)
212}
213
214pub fn logical_plan_to_bytes_with_extension_codec(
216 plan: &LogicalPlan,
217 extension_codec: &dyn LogicalExtensionCodec,
218) -> Result<Bytes> {
219 let protobuf =
220 protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)?;
221 let mut buffer = BytesMut::new();
222 protobuf
223 .encode(&mut buffer)
224 .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
225 Ok(buffer.into())
226}
227
228#[cfg(feature = "json")]
230pub fn logical_plan_to_json_with_extension_codec(
231 plan: &LogicalPlan,
232 extension_codec: &dyn LogicalExtensionCodec,
233) -> Result<String> {
234 let protobuf =
235 protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)
236 .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
237 serde_json::to_string(&protobuf)
238 .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
239}
240
241#[cfg(feature = "json")]
243pub fn logical_plan_from_json(json: &str, ctx: &TaskContext) -> Result<LogicalPlan> {
244 let extension_codec = DefaultLogicalExtensionCodec {};
245 logical_plan_from_json_with_extension_codec(json, ctx, &extension_codec)
246}
247
248pub fn logical_plan_from_bytes(bytes: &[u8], ctx: &TaskContext) -> Result<LogicalPlan> {
250 let extension_codec = DefaultLogicalExtensionCodec {};
251 logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
252}
253
254pub fn logical_plan_from_bytes_with_extension_codec(
256 bytes: &[u8],
257 ctx: &TaskContext,
258 extension_codec: &dyn LogicalExtensionCodec,
259) -> Result<LogicalPlan> {
260 let protobuf = protobuf::LogicalPlanNode::decode(bytes)
261 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
262 protobuf.try_into_logical_plan(ctx, extension_codec)
263}
264
265#[cfg(feature = "json")]
267pub fn logical_plan_from_json_with_extension_codec(
268 json: &str,
269 ctx: &TaskContext,
270 extension_codec: &dyn LogicalExtensionCodec,
271) -> Result<LogicalPlan> {
272 let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
273 .map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
274 back.try_into_logical_plan(ctx, extension_codec)
275}
276
277pub fn physical_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Bytes> {
279 let extension_codec = DefaultPhysicalExtensionCodec {};
280 let proto_converter = DefaultPhysicalProtoConverter {};
281 physical_plan_to_bytes_with_proto_converter(plan, &extension_codec, &proto_converter)
282}
283
284#[cfg(feature = "json")]
286pub fn physical_plan_to_json(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
287 let extension_codec = DefaultPhysicalExtensionCodec {};
288 let proto_converter = DefaultPhysicalProtoConverter {};
289 let protobuf = proto_converter
290 .execution_plan_to_proto(&plan, &extension_codec)
291 .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
292 serde_json::to_string(&protobuf)
293 .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
294}
295
296pub fn physical_plan_to_bytes_with_extension_codec(
298 plan: Arc<dyn ExecutionPlan>,
299 extension_codec: &dyn PhysicalExtensionCodec,
300) -> Result<Bytes> {
301 let proto_converter = DefaultPhysicalProtoConverter {};
302 physical_plan_to_bytes_with_proto_converter(plan, extension_codec, &proto_converter)
303}
304
305pub fn physical_plan_to_bytes_with_proto_converter(
308 plan: Arc<dyn ExecutionPlan>,
309 extension_codec: &dyn PhysicalExtensionCodec,
310 proto_converter: &dyn PhysicalProtoConverterExtension,
311) -> Result<Bytes> {
312 let protobuf = proto_converter.execution_plan_to_proto(&plan, extension_codec)?;
313 let mut buffer = BytesMut::new();
314 protobuf
315 .encode(&mut buffer)
316 .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
317 Ok(buffer.into())
318}
319
320#[cfg(feature = "json")]
322pub fn physical_plan_from_json(
323 json: &str,
324 ctx: &TaskContext,
325) -> Result<Arc<dyn ExecutionPlan>> {
326 let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
327 .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
328 let extension_codec = DefaultPhysicalExtensionCodec {};
329 let proto_converter = DefaultPhysicalProtoConverter {};
330 proto_converter.proto_to_execution_plan(ctx, &extension_codec, &back)
331}
332
333pub fn physical_plan_from_bytes(
335 bytes: &[u8],
336 ctx: &TaskContext,
337) -> Result<Arc<dyn ExecutionPlan>> {
338 let extension_codec = DefaultPhysicalExtensionCodec {};
339 let proto_converter = DefaultPhysicalProtoConverter {};
340 physical_plan_from_bytes_with_proto_converter(
341 bytes,
342 ctx,
343 &extension_codec,
344 &proto_converter,
345 )
346}
347
348pub fn physical_plan_from_bytes_with_extension_codec(
350 bytes: &[u8],
351 ctx: &TaskContext,
352 extension_codec: &dyn PhysicalExtensionCodec,
353) -> Result<Arc<dyn ExecutionPlan>> {
354 let proto_converter = DefaultPhysicalProtoConverter {};
355 physical_plan_from_bytes_with_proto_converter(
356 bytes,
357 ctx,
358 extension_codec,
359 &proto_converter,
360 )
361}
362
363pub fn physical_plan_from_bytes_with_proto_converter(
365 bytes: &[u8],
366 ctx: &TaskContext,
367 extension_codec: &dyn PhysicalExtensionCodec,
368 proto_converter: &dyn PhysicalProtoConverterExtension,
369) -> Result<Arc<dyn ExecutionPlan>> {
370 let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
371 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
372 proto_converter.proto_to_execution_plan(ctx, extension_codec, &protobuf)
373}