1use crate::logical_plan::to_proto::serialize_expr;
20use crate::logical_plan::{
21 self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
22};
23use crate::physical_plan::{
24 AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
25};
26use crate::protobuf;
27use datafusion_common::{plan_datafusion_err, Result};
28use datafusion_expr::{
29 create_udaf, create_udf, create_udwf, AggregateUDF, Expr, LogicalPlan, Volatility,
30 WindowUDF,
31};
32use prost::{
33 bytes::{Bytes, BytesMut},
34 Message,
35};
36use std::sync::Arc;
37
38use datafusion::execution::registry::FunctionRegistry;
40use datafusion::physical_plan::ExecutionPlan;
41use datafusion::prelude::SessionContext;
42use datafusion_expr::planner::ExprPlanner;
43
44mod registry;
45
46pub trait Serializeable: Sized {
64 fn to_bytes(&self) -> Result<Bytes>;
66
67 fn from_bytes(bytes: &[u8]) -> Result<Self> {
75 Self::from_bytes_with_registry(bytes, ®istry::NoRegistry {})
76 }
77
78 fn from_bytes_with_registry(
84 bytes: &[u8],
85 registry: &dyn FunctionRegistry,
86 ) -> Result<Self>;
87}
88
89impl Serializeable for Expr {
90 fn to_bytes(&self) -> Result<Bytes> {
91 let mut buffer = BytesMut::new();
92 let extension_codec = DefaultLogicalExtensionCodec {};
93 let protobuf: protobuf::LogicalExprNode = serialize_expr(self, &extension_codec)
94 .map_err(|e| plan_datafusion_err!("Error encoding expr as protobuf: {e}"))?;
95
96 protobuf
97 .encode(&mut buffer)
98 .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
99
100 let bytes: Bytes = buffer.into();
101
102 struct PlaceHolderRegistry;
109
110 impl FunctionRegistry for PlaceHolderRegistry {
111 fn udfs(&self) -> std::collections::HashSet<String> {
112 std::collections::HashSet::default()
113 }
114
115 fn udf(&self, name: &str) -> Result<Arc<datafusion_expr::ScalarUDF>> {
116 Ok(Arc::new(create_udf(
117 name,
118 vec![],
119 arrow::datatypes::DataType::Null,
120 Volatility::Immutable,
121 Arc::new(|_| unimplemented!()),
122 )))
123 }
124
125 fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
126 Ok(Arc::new(create_udaf(
127 name,
128 vec![arrow::datatypes::DataType::Null],
129 Arc::new(arrow::datatypes::DataType::Null),
130 Volatility::Immutable,
131 Arc::new(|_| unimplemented!()),
132 Arc::new(vec![]),
133 )))
134 }
135
136 fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
137 Ok(Arc::new(create_udwf(
138 name,
139 arrow::datatypes::DataType::Null,
140 Arc::new(arrow::datatypes::DataType::Null),
141 Volatility::Immutable,
142 Arc::new(|| unimplemented!()),
143 )))
144 }
145 fn register_udaf(
146 &mut self,
147 _udaf: Arc<AggregateUDF>,
148 ) -> Result<Option<Arc<AggregateUDF>>> {
149 datafusion_common::internal_err!(
150 "register_udaf called in Placeholder Registry!"
151 )
152 }
153 fn register_udf(
154 &mut self,
155 _udf: Arc<datafusion_expr::ScalarUDF>,
156 ) -> Result<Option<Arc<datafusion_expr::ScalarUDF>>> {
157 datafusion_common::internal_err!(
158 "register_udf called in Placeholder Registry!"
159 )
160 }
161 fn register_udwf(
162 &mut self,
163 _udaf: Arc<WindowUDF>,
164 ) -> Result<Option<Arc<WindowUDF>>> {
165 datafusion_common::internal_err!(
166 "register_udwf called in Placeholder Registry!"
167 )
168 }
169
170 fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
171 vec![]
172 }
173 }
174 Expr::from_bytes_with_registry(&bytes, &PlaceHolderRegistry)?;
175
176 Ok(bytes)
177 }
178
179 fn from_bytes_with_registry(
180 bytes: &[u8],
181 registry: &dyn FunctionRegistry,
182 ) -> Result<Self> {
183 let protobuf = protobuf::LogicalExprNode::decode(bytes)
184 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
185
186 let extension_codec = DefaultLogicalExtensionCodec {};
187 logical_plan::from_proto::parse_expr(&protobuf, registry, &extension_codec)
188 .map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))
189 }
190}
191
192pub fn logical_plan_to_bytes(plan: &LogicalPlan) -> Result<Bytes> {
194 let extension_codec = DefaultLogicalExtensionCodec {};
195 logical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
196}
197
/// Serializes `plan` to a JSON string using a [`DefaultLogicalExtensionCodec`].
///
/// Shorthand for [`logical_plan_to_json_with_extension_codec`].
#[cfg(feature = "json")]
pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result<String> {
    logical_plan_to_json_with_extension_codec(plan, &DefaultLogicalExtensionCodec {})
}
204
205pub fn logical_plan_to_bytes_with_extension_codec(
207 plan: &LogicalPlan,
208 extension_codec: &dyn LogicalExtensionCodec,
209) -> Result<Bytes> {
210 let protobuf =
211 protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)?;
212 let mut buffer = BytesMut::new();
213 protobuf
214 .encode(&mut buffer)
215 .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
216 Ok(buffer.into())
217}
218
/// Converts `plan` into a protobuf `LogicalPlanNode` with `extension_codec`
/// and renders that node as a JSON string.
///
/// # Errors
/// Both the conversion step and the JSON step report failures as a plan
/// error with the same "Error serializing plan" prefix.
#[cfg(feature = "json")]
pub fn logical_plan_to_json_with_extension_codec(
    plan: &LogicalPlan,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<String> {
    protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
        .and_then(|node| {
            serde_json::to_string(&node)
                .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
        })
}
231
/// Deserializes a [`LogicalPlan`] from `json` using a
/// [`DefaultLogicalExtensionCodec`]; `ctx` is forwarded to the decoder.
///
/// Shorthand for [`logical_plan_from_json_with_extension_codec`].
#[cfg(feature = "json")]
pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
    logical_plan_from_json_with_extension_codec(
        json,
        ctx,
        &DefaultLogicalExtensionCodec {},
    )
}
238
239pub fn logical_plan_from_bytes(
241 bytes: &[u8],
242 ctx: &SessionContext,
243) -> Result<LogicalPlan> {
244 let extension_codec = DefaultLogicalExtensionCodec {};
245 logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
246}
247
248pub fn logical_plan_from_bytes_with_extension_codec(
250 bytes: &[u8],
251 ctx: &SessionContext,
252 extension_codec: &dyn LogicalExtensionCodec,
253) -> Result<LogicalPlan> {
254 let protobuf = protobuf::LogicalPlanNode::decode(bytes)
255 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
256 protobuf.try_into_logical_plan(ctx, extension_codec)
257}
258
/// Parses `json` into a protobuf `LogicalPlanNode` and converts it into a
/// [`LogicalPlan`], passing `ctx` and `extension_codec` to the conversion.
///
/// # Errors
/// A JSON parse failure is reported as a plan error; errors from the
/// conversion into a [`LogicalPlan`] are propagated unchanged.
#[cfg(feature = "json")]
pub fn logical_plan_from_json_with_extension_codec(
    json: &str,
    ctx: &SessionContext,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
    let node = match serde_json::from_str::<protobuf::LogicalPlanNode>(json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(plan_datafusion_err!("Error deserializing plan: {e}")),
    };
    node.try_into_logical_plan(ctx, extension_codec)
}
270
271pub fn physical_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Bytes> {
273 let extension_codec = DefaultPhysicalExtensionCodec {};
274 physical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
275}
276
/// Converts the execution `plan` into a protobuf `PhysicalPlanNode` with a
/// [`DefaultPhysicalExtensionCodec`] and renders that node as a JSON string.
///
/// # Errors
/// Both the conversion step and the JSON step report failures as a plan
/// error with the same "Error serializing plan" prefix.
#[cfg(feature = "json")]
pub fn physical_plan_to_json(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
    let codec = DefaultPhysicalExtensionCodec {};
    protobuf::PhysicalPlanNode::try_from_physical_plan(plan, &codec)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
        .and_then(|node| {
            serde_json::to_string(&node)
                .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
        })
}
287
288pub fn physical_plan_to_bytes_with_extension_codec(
290 plan: Arc<dyn ExecutionPlan>,
291 extension_codec: &dyn PhysicalExtensionCodec,
292) -> Result<Bytes> {
293 let protobuf =
294 protobuf::PhysicalPlanNode::try_from_physical_plan(plan, extension_codec)?;
295 let mut buffer = BytesMut::new();
296 protobuf
297 .encode(&mut buffer)
298 .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
299 Ok(buffer.into())
300}
301
/// Parses `json` into a protobuf `PhysicalPlanNode` and converts it into an
/// execution plan using a [`DefaultPhysicalExtensionCodec`].
///
/// `ctx` is passed to the conversion together with the runtime environment
/// obtained from `ctx.runtime_env()`.
///
/// # Errors
/// A JSON parse failure is reported as a plan error; errors from the
/// conversion into an execution plan are propagated unchanged.
#[cfg(feature = "json")]
pub fn physical_plan_from_json(
    json: &str,
    ctx: &SessionContext,
) -> Result<Arc<dyn ExecutionPlan>> {
    // This step parses JSON, so the failure is a deserialization error.
    let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
        .map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
    let extension_codec = DefaultPhysicalExtensionCodec {};
    back.try_into_physical_plan(ctx, &ctx.runtime_env(), &extension_codec)
}
313
314pub fn physical_plan_from_bytes(
316 bytes: &[u8],
317 ctx: &SessionContext,
318) -> Result<Arc<dyn ExecutionPlan>> {
319 let extension_codec = DefaultPhysicalExtensionCodec {};
320 physical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
321}
322
323pub fn physical_plan_from_bytes_with_extension_codec(
325 bytes: &[u8],
326 ctx: &SessionContext,
327 extension_codec: &dyn PhysicalExtensionCodec,
328) -> Result<Arc<dyn ExecutionPlan>> {
329 let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
330 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
331 protobuf.try_into_physical_plan(ctx, &ctx.runtime_env(), extension_codec)
332}