1use crate::logical_plan::to_proto::serialize_expr;
20use crate::logical_plan::{
21 self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
22};
23use crate::physical_plan::{
24 AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
25};
26use crate::protobuf;
27use datafusion_common::{plan_datafusion_err, Result};
28use datafusion_expr::{
29 create_udaf, create_udf, create_udwf, AggregateUDF, Expr, LogicalPlan, Volatility,
30 WindowUDF,
31};
32use prost::{
33 bytes::{Bytes, BytesMut},
34 Message,
35};
36use std::sync::Arc;
37
38use datafusion::execution::registry::FunctionRegistry;
40use datafusion::physical_plan::ExecutionPlan;
41use datafusion::prelude::SessionContext;
42use datafusion_expr::planner::ExprPlanner;
43
44mod registry;
45
46pub trait Serializeable: Sized {
64 fn to_bytes(&self) -> Result<Bytes>;
66
67 fn from_bytes(bytes: &[u8]) -> Result<Self> {
75 Self::from_bytes_with_registry(bytes, ®istry::NoRegistry {})
76 }
77
78 fn from_bytes_with_registry(
84 bytes: &[u8],
85 registry: &dyn FunctionRegistry,
86 ) -> Result<Self>;
87}
88
89impl Serializeable for Expr {
90 fn to_bytes(&self) -> Result<Bytes> {
91 let mut buffer = BytesMut::new();
92 let extension_codec = DefaultLogicalExtensionCodec {};
93 let protobuf: protobuf::LogicalExprNode = serialize_expr(self, &extension_codec)
94 .map_err(|e| plan_datafusion_err!("Error encoding expr as protobuf: {e}"))?;
95
96 protobuf
97 .encode(&mut buffer)
98 .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
99
100 let bytes: Bytes = buffer.into();
101
102 struct PlaceHolderRegistry;
109
110 impl FunctionRegistry for PlaceHolderRegistry {
111 fn udfs(&self) -> std::collections::HashSet<String> {
112 std::collections::HashSet::default()
113 }
114
115 fn udf(&self, name: &str) -> Result<Arc<datafusion_expr::ScalarUDF>> {
116 Ok(Arc::new(create_udf(
117 name,
118 vec![],
119 arrow::datatypes::DataType::Null,
120 Volatility::Immutable,
121 Arc::new(|_| unimplemented!()),
122 )))
123 }
124
125 fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
126 Ok(Arc::new(create_udaf(
127 name,
128 vec![arrow::datatypes::DataType::Null],
129 Arc::new(arrow::datatypes::DataType::Null),
130 Volatility::Immutable,
131 Arc::new(|_| unimplemented!()),
132 Arc::new(vec![]),
133 )))
134 }
135
136 fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
137 Ok(Arc::new(create_udwf(
138 name,
139 arrow::datatypes::DataType::Null,
140 Arc::new(arrow::datatypes::DataType::Null),
141 Volatility::Immutable,
142 Arc::new(|| unimplemented!()),
143 )))
144 }
145 fn register_udaf(
146 &mut self,
147 _udaf: Arc<AggregateUDF>,
148 ) -> Result<Option<Arc<AggregateUDF>>> {
149 datafusion_common::internal_err!(
150 "register_udaf called in Placeholder Registry!"
151 )
152 }
153 fn register_udf(
154 &mut self,
155 _udf: Arc<datafusion_expr::ScalarUDF>,
156 ) -> Result<Option<Arc<datafusion_expr::ScalarUDF>>> {
157 datafusion_common::internal_err!(
158 "register_udf called in Placeholder Registry!"
159 )
160 }
161 fn register_udwf(
162 &mut self,
163 _udaf: Arc<WindowUDF>,
164 ) -> Result<Option<Arc<WindowUDF>>> {
165 datafusion_common::internal_err!(
166 "register_udwf called in Placeholder Registry!"
167 )
168 }
169
170 fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
171 vec![]
172 }
173
174 fn udafs(&self) -> std::collections::HashSet<String> {
175 std::collections::HashSet::default()
176 }
177
178 fn udwfs(&self) -> std::collections::HashSet<String> {
179 std::collections::HashSet::default()
180 }
181 }
182 Expr::from_bytes_with_registry(&bytes, &PlaceHolderRegistry)?;
183
184 Ok(bytes)
185 }
186
187 fn from_bytes_with_registry(
188 bytes: &[u8],
189 registry: &dyn FunctionRegistry,
190 ) -> Result<Self> {
191 let protobuf = protobuf::LogicalExprNode::decode(bytes)
192 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
193
194 let extension_codec = DefaultLogicalExtensionCodec {};
195 logical_plan::from_proto::parse_expr(&protobuf, registry, &extension_codec)
196 .map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))
197 }
198}
199
200pub fn logical_plan_to_bytes(plan: &LogicalPlan) -> Result<Bytes> {
202 let extension_codec = DefaultLogicalExtensionCodec {};
203 logical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
204}
205
#[cfg(feature = "json")]
/// Serialize a [`LogicalPlan`] as a JSON string.
///
/// Shorthand for [`logical_plan_to_json_with_extension_codec`] with a
/// [`DefaultLogicalExtensionCodec`]. Only compiled with the `json` feature.
pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result<String> {
    logical_plan_to_json_with_extension_codec(plan, &DefaultLogicalExtensionCodec {})
}
212
213pub fn logical_plan_to_bytes_with_extension_codec(
215 plan: &LogicalPlan,
216 extension_codec: &dyn LogicalExtensionCodec,
217) -> Result<Bytes> {
218 let protobuf =
219 protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)?;
220 let mut buffer = BytesMut::new();
221 protobuf
222 .encode(&mut buffer)
223 .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
224 Ok(buffer.into())
225}
226
#[cfg(feature = "json")]
/// Serialize a [`LogicalPlan`] as a JSON string, using `extension_codec`
/// while converting the plan to its protobuf form.
///
/// Only compiled with the `json` feature.
///
/// # Errors
///
/// Returns a plan error if the plan cannot be converted to a
/// `protobuf::LogicalPlanNode` or if that node cannot be rendered as JSON.
pub fn logical_plan_to_json_with_extension_codec(
    plan: &LogicalPlan,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<String> {
    let node = match protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)
    {
        Ok(node) => node,
        Err(e) => return Err(plan_datafusion_err!("Error serializing plan: {e}")),
    };

    match serde_json::to_string(&node) {
        Ok(json) => Ok(json),
        Err(e) => Err(plan_datafusion_err!("Error serializing plan: {e}")),
    }
}
239
#[cfg(feature = "json")]
/// Deserialize a [`LogicalPlan`] from a JSON string.
///
/// Shorthand for [`logical_plan_from_json_with_extension_codec`] with a
/// [`DefaultLogicalExtensionCodec`]. Only compiled with the `json` feature.
pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
    logical_plan_from_json_with_extension_codec(
        json,
        ctx,
        &DefaultLogicalExtensionCodec {},
    )
}
246
247pub fn logical_plan_from_bytes(
249 bytes: &[u8],
250 ctx: &SessionContext,
251) -> Result<LogicalPlan> {
252 let extension_codec = DefaultLogicalExtensionCodec {};
253 logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
254}
255
256pub fn logical_plan_from_bytes_with_extension_codec(
258 bytes: &[u8],
259 ctx: &SessionContext,
260 extension_codec: &dyn LogicalExtensionCodec,
261) -> Result<LogicalPlan> {
262 let protobuf = protobuf::LogicalPlanNode::decode(bytes)
263 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
264 protobuf.try_into_logical_plan(ctx, extension_codec)
265}
266
#[cfg(feature = "json")]
/// Deserialize a [`LogicalPlan`] from a JSON string, using `extension_codec`
/// while converting the protobuf form back into a plan.
///
/// Only compiled with the `json` feature.
///
/// # Errors
///
/// Returns a plan error if `json` cannot be parsed as a
/// `protobuf::LogicalPlanNode`, and propagates any error from turning the
/// parsed node into a [`LogicalPlan`].
pub fn logical_plan_from_json_with_extension_codec(
    json: &str,
    ctx: &SessionContext,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
    let parsed: std::result::Result<protobuf::LogicalPlanNode, _> =
        serde_json::from_str(json);
    let node =
        parsed.map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
    node.try_into_logical_plan(ctx, extension_codec)
}
278
279pub fn physical_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Bytes> {
281 let extension_codec = DefaultPhysicalExtensionCodec {};
282 physical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
283}
284
#[cfg(feature = "json")]
/// Serialize a physical plan ([`ExecutionPlan`]) as a JSON string, using a
/// [`DefaultPhysicalExtensionCodec`].
///
/// Only compiled with the `json` feature.
///
/// # Errors
///
/// Returns a plan error if the plan cannot be converted to a
/// `protobuf::PhysicalPlanNode` or if that node cannot be rendered as JSON.
pub fn physical_plan_to_json(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
    let codec = DefaultPhysicalExtensionCodec {};

    let node = match protobuf::PhysicalPlanNode::try_from_physical_plan(plan, &codec) {
        Ok(node) => node,
        Err(e) => return Err(plan_datafusion_err!("Error serializing plan: {e}")),
    };

    match serde_json::to_string(&node) {
        Ok(json) => Ok(json),
        Err(e) => Err(plan_datafusion_err!("Error serializing plan: {e}")),
    }
}
295
296pub fn physical_plan_to_bytes_with_extension_codec(
298 plan: Arc<dyn ExecutionPlan>,
299 extension_codec: &dyn PhysicalExtensionCodec,
300) -> Result<Bytes> {
301 let protobuf =
302 protobuf::PhysicalPlanNode::try_from_physical_plan(plan, extension_codec)?;
303 let mut buffer = BytesMut::new();
304 protobuf
305 .encode(&mut buffer)
306 .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
307 Ok(buffer.into())
308}
309
#[cfg(feature = "json")]
/// Deserialize a physical plan ([`ExecutionPlan`]) from a JSON string, using
/// a [`DefaultPhysicalExtensionCodec`].
///
/// `ctx` and its runtime environment are handed to the protobuf-to-plan
/// conversion. Only compiled with the `json` feature.
///
/// # Errors
///
/// Returns a plan error if `json` cannot be parsed as a
/// `protobuf::PhysicalPlanNode`, and propagates any error from turning the
/// parsed node into an execution plan.
pub fn physical_plan_from_json(
    json: &str,
    ctx: &SessionContext,
) -> Result<Arc<dyn ExecutionPlan>> {
    // This step parses JSON, so a failure is a deserialization error.
    let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
        .map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
    let extension_codec = DefaultPhysicalExtensionCodec {};
    back.try_into_physical_plan(ctx, &ctx.runtime_env(), &extension_codec)
}
321
322pub fn physical_plan_from_bytes(
324 bytes: &[u8],
325 ctx: &SessionContext,
326) -> Result<Arc<dyn ExecutionPlan>> {
327 let extension_codec = DefaultPhysicalExtensionCodec {};
328 physical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
329}
330
331pub fn physical_plan_from_bytes_with_extension_codec(
333 bytes: &[u8],
334 ctx: &SessionContext,
335 extension_codec: &dyn PhysicalExtensionCodec,
336) -> Result<Arc<dyn ExecutionPlan>> {
337 let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
338 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
339 protobuf.try_into_physical_plan(ctx, &ctx.runtime_env(), extension_codec)
340}