Skip to main content

datafusion_proto/bytes/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Serialization / Deserialization to Bytes
19use crate::logical_plan::to_proto::serialize_expr;
20use crate::logical_plan::{
21    self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
22};
23use crate::physical_plan::{
24    DefaultPhysicalExtensionCodec, DefaultPhysicalProtoConverter, PhysicalExtensionCodec,
25    PhysicalPlanDecodeContext, PhysicalProtoConverterExtension,
26};
27use crate::protobuf;
28use datafusion_common::{Result, plan_datafusion_err};
29use datafusion_execution::TaskContext;
30use datafusion_expr::{Expr, LogicalPlan};
31use prost::{
32    Message,
33    bytes::{Bytes, BytesMut},
34};
35use std::sync::Arc;
36
37use datafusion_physical_plan::ExecutionPlan;
38
39/// Encodes something (such as [`Expr`]) to/from a stream of
40/// bytes.
41///
42/// ```
43/// use datafusion_expr::{col, lit, Expr};
44/// use datafusion_proto::bytes::Serializeable;
45///
46/// // Create a new `Expr` a < 32
47/// let expr = col("a").lt(lit(5i32));
48///
49/// // Convert it to an opaque form
50/// let bytes = expr.to_bytes().unwrap();
51///
52/// // Decode bytes from somewhere (over network, etc.)
53/// let decoded_expr = Expr::from_bytes(&bytes).unwrap();
54/// assert_eq!(expr, decoded_expr);
55/// ```
56pub trait Serializeable: Sized {
57    /// Convert `self` to an opaque byte stream
58    fn to_bytes(&self) -> Result<Bytes>;
59
60    /// Convert `bytes` (the output of [`to_bytes`]) back into an object. This
61    /// will error if the serialized bytes contain any user defined functions,
62    /// in which case use [`from_bytes_with_ctx`]
63    ///
64    /// [`to_bytes`]: Self::to_bytes
65    /// [`from_bytes_with_ctx`]: Self::from_bytes_with_ctx
66    fn from_bytes(bytes: &[u8]) -> Result<Self> {
67        Self::from_bytes_with_ctx(bytes, &TaskContext::default())
68    }
69
70    /// Convert `bytes` (the output of [`to_bytes`]) back into an object,
71    /// resolving user defined functions with the specified `ctx`
72    ///
73    /// [`to_bytes`]: Self::to_bytes
74    fn from_bytes_with_ctx(bytes: &[u8], ctx: &TaskContext) -> Result<Self>;
75}
76
77impl Serializeable for Expr {
78    fn to_bytes(&self) -> Result<Bytes> {
79        let mut buffer = BytesMut::new();
80        let extension_codec = DefaultLogicalExtensionCodec {};
81        let protobuf: protobuf::LogicalExprNode = serialize_expr(self, &extension_codec)
82            .map_err(|e| plan_datafusion_err!("Error encoding expr as protobuf: {e}"))?;
83
84        protobuf
85            .encode(&mut buffer)
86            .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
87
88        let bytes: Bytes = buffer.into();
89
90        // The produced byte stream may lead to "recursion limit" errors, see
91        // https://github.com/apache/datafusion/issues/3968
92        // Until the underlying prost issue ( https://github.com/tokio-rs/prost/issues/736 )
93        // is fixed, verify the bytes can be decoded without hitting that limit.
94        protobuf::LogicalExprNode::decode(bytes.as_ref())
95            .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
96
97        Ok(bytes)
98    }
99
100    fn from_bytes_with_ctx(bytes: &[u8], ctx: &TaskContext) -> Result<Self> {
101        let protobuf = protobuf::LogicalExprNode::decode(bytes)
102            .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
103
104        let extension_codec = DefaultLogicalExtensionCodec {};
105        logical_plan::from_proto::parse_expr(&protobuf, ctx, &extension_codec)
106            .map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))
107    }
108}
109
110/// Serialize a LogicalPlan as bytes
111pub fn logical_plan_to_bytes(plan: &LogicalPlan) -> Result<Bytes> {
112    let extension_codec = DefaultLogicalExtensionCodec {};
113    logical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
114}
115
/// Serialize a [`LogicalPlan`] as a JSON string, using the
/// [`DefaultLogicalExtensionCodec`].
#[cfg(feature = "json")]
pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result<String> {
    logical_plan_to_json_with_extension_codec(plan, &DefaultLogicalExtensionCodec {})
}
122
123/// Serialize a LogicalPlan as bytes, using the provided extension codec
124pub fn logical_plan_to_bytes_with_extension_codec(
125    plan: &LogicalPlan,
126    extension_codec: &dyn LogicalExtensionCodec,
127) -> Result<Bytes> {
128    let protobuf =
129        protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)?;
130    let mut buffer = BytesMut::new();
131    protobuf
132        .encode(&mut buffer)
133        .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
134    Ok(buffer.into())
135}
136
/// Serialize a [`LogicalPlan`] as a JSON string, using the provided
/// extension codec.
///
/// Failures from either the protobuf conversion or the JSON rendering are
/// reported as plan errors with the same "Error serializing plan" prefix.
#[cfg(feature = "json")]
pub fn logical_plan_to_json_with_extension_codec(
    plan: &LogicalPlan,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<String> {
    protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
        .and_then(|node| {
            serde_json::to_string(&node)
                .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
        })
}
149
/// Deserialize a [`LogicalPlan`] from a JSON string, using the
/// [`DefaultLogicalExtensionCodec`].
#[cfg(feature = "json")]
pub fn logical_plan_from_json(json: &str, ctx: &TaskContext) -> Result<LogicalPlan> {
    logical_plan_from_json_with_extension_codec(
        json,
        ctx,
        &DefaultLogicalExtensionCodec {},
    )
}
156
157/// Deserialize a LogicalPlan from bytes
158pub fn logical_plan_from_bytes(bytes: &[u8], ctx: &TaskContext) -> Result<LogicalPlan> {
159    let extension_codec = DefaultLogicalExtensionCodec {};
160    logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
161}
162
163/// Deserialize a LogicalPlan from bytes
164pub fn logical_plan_from_bytes_with_extension_codec(
165    bytes: &[u8],
166    ctx: &TaskContext,
167    extension_codec: &dyn LogicalExtensionCodec,
168) -> Result<LogicalPlan> {
169    let protobuf = protobuf::LogicalPlanNode::decode(bytes)
170        .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
171    protobuf.try_into_logical_plan(ctx, extension_codec)
172}
173
/// Deserialize a [`LogicalPlan`] from a JSON string, using the provided
/// extension codec.
///
/// The JSON is parsed into a protobuf `LogicalPlanNode`, which is then
/// converted into a plan with `ctx` and `extension_codec`.
#[cfg(feature = "json")]
pub fn logical_plan_from_json_with_extension_codec(
    json: &str,
    ctx: &TaskContext,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
    serde_json::from_str::<protobuf::LogicalPlanNode>(json)
        .map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?
        .try_into_logical_plan(ctx, extension_codec)
}
185
186/// Serialize a PhysicalPlan as bytes
187pub fn physical_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Bytes> {
188    let extension_codec = DefaultPhysicalExtensionCodec {};
189    let proto_converter = DefaultPhysicalProtoConverter {};
190    physical_plan_to_bytes_with_proto_converter(plan, &extension_codec, &proto_converter)
191}
192
/// Serialize a physical plan ([`ExecutionPlan`]) as a JSON string, using the
/// [`DefaultPhysicalExtensionCodec`] and [`DefaultPhysicalProtoConverter`].
///
/// Failures from either the protobuf conversion or the JSON rendering are
/// reported as plan errors with the same "Error serializing plan" prefix.
#[cfg(feature = "json")]
pub fn physical_plan_to_json(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
    let codec = DefaultPhysicalExtensionCodec {};
    let converter = DefaultPhysicalProtoConverter {};
    let node = match converter.execution_plan_to_proto(&plan, &codec) {
        Ok(node) => node,
        Err(e) => return Err(plan_datafusion_err!("Error serializing plan: {e}")),
    };
    serde_json::to_string(&node)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
}
204
205/// Serialize a PhysicalPlan as bytes, using the provided extension codec
206pub fn physical_plan_to_bytes_with_extension_codec(
207    plan: Arc<dyn ExecutionPlan>,
208    extension_codec: &dyn PhysicalExtensionCodec,
209) -> Result<Bytes> {
210    let proto_converter = DefaultPhysicalProtoConverter {};
211    physical_plan_to_bytes_with_proto_converter(plan, extension_codec, &proto_converter)
212}
213
214/// Serialize a PhysicalPlan as bytes, using the provided extension codec
215/// and protobuf converter.
216pub fn physical_plan_to_bytes_with_proto_converter(
217    plan: Arc<dyn ExecutionPlan>,
218    extension_codec: &dyn PhysicalExtensionCodec,
219    proto_converter: &dyn PhysicalProtoConverterExtension,
220) -> Result<Bytes> {
221    let protobuf = proto_converter.execution_plan_to_proto(&plan, extension_codec)?;
222    let mut buffer = BytesMut::new();
223    protobuf
224        .encode(&mut buffer)
225        .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
226    Ok(buffer.into())
227}
228
/// Deserialize a physical plan ([`ExecutionPlan`]) from a JSON string, using
/// the [`DefaultPhysicalExtensionCodec`] and [`DefaultPhysicalProtoConverter`].
///
/// The JSON is parsed into a protobuf `PhysicalPlanNode`, which the converter
/// then turns into an execution plan using a decode context built from `ctx`
/// and the default codec.
///
/// # Errors
///
/// Returns a plan error if `json` cannot be parsed into a `PhysicalPlanNode`,
/// or propagates the error from the plan conversion.
#[cfg(feature = "json")]
pub fn physical_plan_from_json(
    json: &str,
    ctx: &TaskContext,
) -> Result<Arc<dyn ExecutionPlan>> {
    // This is the parsing direction, so report a *deserializing* failure
    // (matches `logical_plan_from_json_with_extension_codec`).
    let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
        .map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
    let extension_codec = DefaultPhysicalExtensionCodec {};
    let proto_converter = DefaultPhysicalProtoConverter {};
    let decode_ctx = PhysicalPlanDecodeContext::new(ctx, &extension_codec);
    proto_converter.proto_to_execution_plan(&back, &decode_ctx)
}
242
243/// Deserialize a PhysicalPlan from bytes
244pub fn physical_plan_from_bytes(
245    bytes: &[u8],
246    ctx: &TaskContext,
247) -> Result<Arc<dyn ExecutionPlan>> {
248    let extension_codec = DefaultPhysicalExtensionCodec {};
249    let proto_converter = DefaultPhysicalProtoConverter {};
250    physical_plan_from_bytes_with_proto_converter(
251        bytes,
252        ctx,
253        &extension_codec,
254        &proto_converter,
255    )
256}
257
258/// Deserialize a PhysicalPlan from bytes
259pub fn physical_plan_from_bytes_with_extension_codec(
260    bytes: &[u8],
261    ctx: &TaskContext,
262    extension_codec: &dyn PhysicalExtensionCodec,
263) -> Result<Arc<dyn ExecutionPlan>> {
264    let proto_converter = DefaultPhysicalProtoConverter {};
265    physical_plan_from_bytes_with_proto_converter(
266        bytes,
267        ctx,
268        extension_codec,
269        &proto_converter,
270    )
271}
272
273/// Deserialize a PhysicalPlan from bytes
274pub fn physical_plan_from_bytes_with_proto_converter(
275    bytes: &[u8],
276    ctx: &TaskContext,
277    extension_codec: &dyn PhysicalExtensionCodec,
278    proto_converter: &dyn PhysicalProtoConverterExtension,
279) -> Result<Arc<dyn ExecutionPlan>> {
280    let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
281        .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
282    let decode_ctx = PhysicalPlanDecodeContext::new(ctx, extension_codec);
283    proto_converter.proto_to_execution_plan(&protobuf, &decode_ctx)
284}