datafusion_proto/bytes/
mod.rs1use crate::logical_plan::to_proto::serialize_expr;
20use crate::logical_plan::{
21 self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
22};
23use crate::physical_plan::{
24 DefaultPhysicalExtensionCodec, DefaultPhysicalProtoConverter, PhysicalExtensionCodec,
25 PhysicalPlanDecodeContext, PhysicalProtoConverterExtension,
26};
27use crate::protobuf;
28use datafusion_common::{Result, plan_datafusion_err};
29use datafusion_execution::TaskContext;
30use datafusion_expr::{Expr, LogicalPlan};
31use prost::{
32 Message,
33 bytes::{Bytes, BytesMut},
34};
35use std::sync::Arc;
36
37use datafusion_physical_plan::ExecutionPlan;
38
39pub trait Serializeable: Sized {
57 fn to_bytes(&self) -> Result<Bytes>;
59
60 fn from_bytes(bytes: &[u8]) -> Result<Self> {
67 Self::from_bytes_with_ctx(bytes, &TaskContext::default())
68 }
69
70 fn from_bytes_with_ctx(bytes: &[u8], ctx: &TaskContext) -> Result<Self>;
75}
76
77impl Serializeable for Expr {
78 fn to_bytes(&self) -> Result<Bytes> {
79 let mut buffer = BytesMut::new();
80 let extension_codec = DefaultLogicalExtensionCodec {};
81 let protobuf: protobuf::LogicalExprNode = serialize_expr(self, &extension_codec)
82 .map_err(|e| plan_datafusion_err!("Error encoding expr as protobuf: {e}"))?;
83
84 protobuf
85 .encode(&mut buffer)
86 .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
87
88 let bytes: Bytes = buffer.into();
89
90 protobuf::LogicalExprNode::decode(bytes.as_ref())
95 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
96
97 Ok(bytes)
98 }
99
100 fn from_bytes_with_ctx(bytes: &[u8], ctx: &TaskContext) -> Result<Self> {
101 let protobuf = protobuf::LogicalExprNode::decode(bytes)
102 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
103
104 let extension_codec = DefaultLogicalExtensionCodec {};
105 logical_plan::from_proto::parse_expr(&protobuf, ctx, &extension_codec)
106 .map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))
107 }
108}
109
110pub fn logical_plan_to_bytes(plan: &LogicalPlan) -> Result<Bytes> {
112 let extension_codec = DefaultLogicalExtensionCodec {};
113 logical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
114}
115
116#[cfg(feature = "json")]
118pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result<String> {
119 let extension_codec = DefaultLogicalExtensionCodec {};
120 logical_plan_to_json_with_extension_codec(plan, &extension_codec)
121}
122
123pub fn logical_plan_to_bytes_with_extension_codec(
125 plan: &LogicalPlan,
126 extension_codec: &dyn LogicalExtensionCodec,
127) -> Result<Bytes> {
128 let protobuf =
129 protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)?;
130 let mut buffer = BytesMut::new();
131 protobuf
132 .encode(&mut buffer)
133 .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
134 Ok(buffer.into())
135}
136
137#[cfg(feature = "json")]
139pub fn logical_plan_to_json_with_extension_codec(
140 plan: &LogicalPlan,
141 extension_codec: &dyn LogicalExtensionCodec,
142) -> Result<String> {
143 let protobuf =
144 protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)
145 .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
146 serde_json::to_string(&protobuf)
147 .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
148}
149
150#[cfg(feature = "json")]
152pub fn logical_plan_from_json(json: &str, ctx: &TaskContext) -> Result<LogicalPlan> {
153 let extension_codec = DefaultLogicalExtensionCodec {};
154 logical_plan_from_json_with_extension_codec(json, ctx, &extension_codec)
155}
156
157pub fn logical_plan_from_bytes(bytes: &[u8], ctx: &TaskContext) -> Result<LogicalPlan> {
159 let extension_codec = DefaultLogicalExtensionCodec {};
160 logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
161}
162
163pub fn logical_plan_from_bytes_with_extension_codec(
165 bytes: &[u8],
166 ctx: &TaskContext,
167 extension_codec: &dyn LogicalExtensionCodec,
168) -> Result<LogicalPlan> {
169 let protobuf = protobuf::LogicalPlanNode::decode(bytes)
170 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
171 protobuf.try_into_logical_plan(ctx, extension_codec)
172}
173
174#[cfg(feature = "json")]
176pub fn logical_plan_from_json_with_extension_codec(
177 json: &str,
178 ctx: &TaskContext,
179 extension_codec: &dyn LogicalExtensionCodec,
180) -> Result<LogicalPlan> {
181 let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
182 .map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
183 back.try_into_logical_plan(ctx, extension_codec)
184}
185
186pub fn physical_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Bytes> {
188 let extension_codec = DefaultPhysicalExtensionCodec {};
189 let proto_converter = DefaultPhysicalProtoConverter {};
190 physical_plan_to_bytes_with_proto_converter(plan, &extension_codec, &proto_converter)
191}
192
193#[cfg(feature = "json")]
195pub fn physical_plan_to_json(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
196 let extension_codec = DefaultPhysicalExtensionCodec {};
197 let proto_converter = DefaultPhysicalProtoConverter {};
198 let protobuf = proto_converter
199 .execution_plan_to_proto(&plan, &extension_codec)
200 .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
201 serde_json::to_string(&protobuf)
202 .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
203}
204
205pub fn physical_plan_to_bytes_with_extension_codec(
207 plan: Arc<dyn ExecutionPlan>,
208 extension_codec: &dyn PhysicalExtensionCodec,
209) -> Result<Bytes> {
210 let proto_converter = DefaultPhysicalProtoConverter {};
211 physical_plan_to_bytes_with_proto_converter(plan, extension_codec, &proto_converter)
212}
213
214pub fn physical_plan_to_bytes_with_proto_converter(
217 plan: Arc<dyn ExecutionPlan>,
218 extension_codec: &dyn PhysicalExtensionCodec,
219 proto_converter: &dyn PhysicalProtoConverterExtension,
220) -> Result<Bytes> {
221 let protobuf = proto_converter.execution_plan_to_proto(&plan, extension_codec)?;
222 let mut buffer = BytesMut::new();
223 protobuf
224 .encode(&mut buffer)
225 .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
226 Ok(buffer.into())
227}
228
229#[cfg(feature = "json")]
231pub fn physical_plan_from_json(
232 json: &str,
233 ctx: &TaskContext,
234) -> Result<Arc<dyn ExecutionPlan>> {
235 let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
236 .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
237 let extension_codec = DefaultPhysicalExtensionCodec {};
238 let proto_converter = DefaultPhysicalProtoConverter {};
239 let decode_ctx = PhysicalPlanDecodeContext::new(ctx, &extension_codec);
240 proto_converter.proto_to_execution_plan(&back, &decode_ctx)
241}
242
243pub fn physical_plan_from_bytes(
245 bytes: &[u8],
246 ctx: &TaskContext,
247) -> Result<Arc<dyn ExecutionPlan>> {
248 let extension_codec = DefaultPhysicalExtensionCodec {};
249 let proto_converter = DefaultPhysicalProtoConverter {};
250 physical_plan_from_bytes_with_proto_converter(
251 bytes,
252 ctx,
253 &extension_codec,
254 &proto_converter,
255 )
256}
257
258pub fn physical_plan_from_bytes_with_extension_codec(
260 bytes: &[u8],
261 ctx: &TaskContext,
262 extension_codec: &dyn PhysicalExtensionCodec,
263) -> Result<Arc<dyn ExecutionPlan>> {
264 let proto_converter = DefaultPhysicalProtoConverter {};
265 physical_plan_from_bytes_with_proto_converter(
266 bytes,
267 ctx,
268 extension_codec,
269 &proto_converter,
270 )
271}
272
273pub fn physical_plan_from_bytes_with_proto_converter(
275 bytes: &[u8],
276 ctx: &TaskContext,
277 extension_codec: &dyn PhysicalExtensionCodec,
278 proto_converter: &dyn PhysicalProtoConverterExtension,
279) -> Result<Arc<dyn ExecutionPlan>> {
280 let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
281 .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
282 let decode_ctx = PhysicalPlanDecodeContext::new(ctx, extension_codec);
283 proto_converter.proto_to_execution_plan(&protobuf, &decode_ctx)
284}