datafusion_proto/bytes/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Serialization / Deserialization to Bytes
19use crate::logical_plan::to_proto::serialize_expr;
20use crate::logical_plan::{
21    self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
22};
23use crate::physical_plan::{
24    AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
25};
26use crate::protobuf;
27use datafusion_common::{plan_datafusion_err, Result};
28use datafusion_expr::{
29    create_udaf, create_udf, create_udwf, AggregateUDF, Expr, LogicalPlan, Volatility,
30    WindowUDF,
31};
32use prost::{
33    bytes::{Bytes, BytesMut},
34    Message,
35};
36use std::sync::Arc;
37
38// Reexport Bytes which appears in the API
39use datafusion::execution::registry::FunctionRegistry;
40use datafusion::physical_plan::ExecutionPlan;
41use datafusion::prelude::SessionContext;
42use datafusion_expr::planner::ExprPlanner;
43
44mod registry;
45
46/// Encodes something (such as [`Expr`]) to/from a stream of
47/// bytes.
48///
49/// ```
50/// use datafusion_expr::{col, lit, Expr};
51/// use datafusion_proto::bytes::Serializeable;
52///
53/// // Create a new `Expr` a < 32
54/// let expr = col("a").lt(lit(5i32));
55///
56/// // Convert it to an opaque form
57/// let bytes = expr.to_bytes().unwrap();
58///
59/// // Decode bytes from somewhere (over network, etc.)
60/// let decoded_expr = Expr::from_bytes(&bytes).unwrap();
61/// assert_eq!(expr, decoded_expr);
62/// ```
63pub trait Serializeable: Sized {
64    /// Convert `self` to an opaque byte stream
65    fn to_bytes(&self) -> Result<Bytes>;
66
67    /// Convert `bytes` (the output of [`to_bytes`]) back into an
68    /// object. This will error if the serialized bytes contain any
69    /// user defined functions, in which case use
70    /// [`from_bytes_with_registry`]
71    ///
72    /// [`to_bytes`]: Self::to_bytes
73    /// [`from_bytes_with_registry`]: Self::from_bytes_with_registry
74    fn from_bytes(bytes: &[u8]) -> Result<Self> {
75        Self::from_bytes_with_registry(bytes, &registry::NoRegistry {})
76    }
77
78    /// Convert `bytes` (the output of [`to_bytes`]) back into an
79    /// object resolving user defined functions with the specified
80    /// `registry`
81    ///
82    /// [`to_bytes`]: Self::to_bytes
83    fn from_bytes_with_registry(
84        bytes: &[u8],
85        registry: &dyn FunctionRegistry,
86    ) -> Result<Self>;
87}
88
89impl Serializeable for Expr {
90    fn to_bytes(&self) -> Result<Bytes> {
91        let mut buffer = BytesMut::new();
92        let extension_codec = DefaultLogicalExtensionCodec {};
93        let protobuf: protobuf::LogicalExprNode = serialize_expr(self, &extension_codec)
94            .map_err(|e| plan_datafusion_err!("Error encoding expr as protobuf: {e}"))?;
95
96        protobuf
97            .encode(&mut buffer)
98            .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
99
100        let bytes: Bytes = buffer.into();
101
102        // the produced byte stream may lead to "recursion limit" errors, see
103        // https://github.com/apache/datafusion/issues/3968
104        // Until the underlying prost issue ( https://github.com/tokio-rs/prost/issues/736 ) is fixed, we try to
105        // deserialize the data here and check for errors.
106        //
107        // Need to provide some placeholder registry because the stream may contain UDFs
108        struct PlaceHolderRegistry;
109
110        impl FunctionRegistry for PlaceHolderRegistry {
111            fn udfs(&self) -> std::collections::HashSet<String> {
112                std::collections::HashSet::default()
113            }
114
115            fn udf(&self, name: &str) -> Result<Arc<datafusion_expr::ScalarUDF>> {
116                Ok(Arc::new(create_udf(
117                    name,
118                    vec![],
119                    arrow::datatypes::DataType::Null,
120                    Volatility::Immutable,
121                    Arc::new(|_| unimplemented!()),
122                )))
123            }
124
125            fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
126                Ok(Arc::new(create_udaf(
127                    name,
128                    vec![arrow::datatypes::DataType::Null],
129                    Arc::new(arrow::datatypes::DataType::Null),
130                    Volatility::Immutable,
131                    Arc::new(|_| unimplemented!()),
132                    Arc::new(vec![]),
133                )))
134            }
135
136            fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
137                Ok(Arc::new(create_udwf(
138                    name,
139                    arrow::datatypes::DataType::Null,
140                    Arc::new(arrow::datatypes::DataType::Null),
141                    Volatility::Immutable,
142                    Arc::new(|| unimplemented!()),
143                )))
144            }
145            fn register_udaf(
146                &mut self,
147                _udaf: Arc<AggregateUDF>,
148            ) -> Result<Option<Arc<AggregateUDF>>> {
149                datafusion_common::internal_err!(
150                    "register_udaf called in Placeholder Registry!"
151                )
152            }
153            fn register_udf(
154                &mut self,
155                _udf: Arc<datafusion_expr::ScalarUDF>,
156            ) -> Result<Option<Arc<datafusion_expr::ScalarUDF>>> {
157                datafusion_common::internal_err!(
158                    "register_udf called in Placeholder Registry!"
159                )
160            }
161            fn register_udwf(
162                &mut self,
163                _udaf: Arc<WindowUDF>,
164            ) -> Result<Option<Arc<WindowUDF>>> {
165                datafusion_common::internal_err!(
166                    "register_udwf called in Placeholder Registry!"
167                )
168            }
169
170            fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
171                vec![]
172            }
173        }
174        Expr::from_bytes_with_registry(&bytes, &PlaceHolderRegistry)?;
175
176        Ok(bytes)
177    }
178
179    fn from_bytes_with_registry(
180        bytes: &[u8],
181        registry: &dyn FunctionRegistry,
182    ) -> Result<Self> {
183        let protobuf = protobuf::LogicalExprNode::decode(bytes)
184            .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
185
186        let extension_codec = DefaultLogicalExtensionCodec {};
187        logical_plan::from_proto::parse_expr(&protobuf, registry, &extension_codec)
188            .map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))
189    }
190}
191
192/// Serialize a LogicalPlan as bytes
193pub fn logical_plan_to_bytes(plan: &LogicalPlan) -> Result<Bytes> {
194    let extension_codec = DefaultLogicalExtensionCodec {};
195    logical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
196}
197
/// Serialize a [`LogicalPlan`] to a JSON string with the
/// [`DefaultLogicalExtensionCodec`].
///
/// Forwards to [`logical_plan_to_json_with_extension_codec`].
#[cfg(feature = "json")]
pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result<String> {
    logical_plan_to_json_with_extension_codec(plan, &DefaultLogicalExtensionCodec {})
}
204
205/// Serialize a LogicalPlan as bytes, using the provided extension codec
206pub fn logical_plan_to_bytes_with_extension_codec(
207    plan: &LogicalPlan,
208    extension_codec: &dyn LogicalExtensionCodec,
209) -> Result<Bytes> {
210    let protobuf =
211        protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)?;
212    let mut buffer = BytesMut::new();
213    protobuf
214        .encode(&mut buffer)
215        .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
216    Ok(buffer.into())
217}
218
/// Serialize a [`LogicalPlan`] to a JSON string, using the provided
/// extension codec.
///
/// Both the plan-to-protobuf conversion and the JSON encoding report
/// failures as plan errors prefixed with "Error serializing plan".
#[cfg(feature = "json")]
pub fn logical_plan_to_json_with_extension_codec(
    plan: &LogicalPlan,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<String> {
    let node = protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;

    let json = serde_json::to_string(&node)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
    Ok(json)
}
231
/// Deserialize a [`LogicalPlan`] from a JSON string with the
/// [`DefaultLogicalExtensionCodec`].
///
/// Forwards to [`logical_plan_from_json_with_extension_codec`].
#[cfg(feature = "json")]
pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
    let default_codec = DefaultLogicalExtensionCodec {};
    let plan = logical_plan_from_json_with_extension_codec(json, ctx, &default_codec)?;
    Ok(plan)
}
238
239/// Deserialize a LogicalPlan from bytes
240pub fn logical_plan_from_bytes(
241    bytes: &[u8],
242    ctx: &SessionContext,
243) -> Result<LogicalPlan> {
244    let extension_codec = DefaultLogicalExtensionCodec {};
245    logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
246}
247
248/// Deserialize a LogicalPlan from bytes
249pub fn logical_plan_from_bytes_with_extension_codec(
250    bytes: &[u8],
251    ctx: &SessionContext,
252    extension_codec: &dyn LogicalExtensionCodec,
253) -> Result<LogicalPlan> {
254    let protobuf = protobuf::LogicalPlanNode::decode(bytes)
255        .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
256    protobuf.try_into_logical_plan(ctx, extension_codec)
257}
258
/// Deserialize a [`LogicalPlan`] from a JSON string, using the provided
/// extension codec.
///
/// The JSON is parsed into a protobuf `LogicalPlanNode`, which is then
/// converted into a plan with `ctx` and `extension_codec`.
#[cfg(feature = "json")]
pub fn logical_plan_from_json_with_extension_codec(
    json: &str,
    ctx: &SessionContext,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
    let parsed: std::result::Result<protobuf::LogicalPlanNode, _> =
        serde_json::from_str(json);
    let node =
        parsed.map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
    node.try_into_logical_plan(ctx, extension_codec)
}
270
271/// Serialize a PhysicalPlan as bytes
272pub fn physical_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Bytes> {
273    let extension_codec = DefaultPhysicalExtensionCodec {};
274    physical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
275}
276
/// Serialize a physical plan ([`ExecutionPlan`]) to a JSON string with the
/// [`DefaultPhysicalExtensionCodec`].
///
/// Both the plan-to-protobuf conversion and the JSON encoding report
/// failures as plan errors prefixed with "Error serializing plan".
#[cfg(feature = "json")]
pub fn physical_plan_to_json(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
    let default_codec = DefaultPhysicalExtensionCodec {};
    let node = protobuf::PhysicalPlanNode::try_from_physical_plan(plan, &default_codec)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;

    let json = serde_json::to_string(&node)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
    Ok(json)
}
287
288/// Serialize a PhysicalPlan as bytes, using the provided extension codec
289pub fn physical_plan_to_bytes_with_extension_codec(
290    plan: Arc<dyn ExecutionPlan>,
291    extension_codec: &dyn PhysicalExtensionCodec,
292) -> Result<Bytes> {
293    let protobuf =
294        protobuf::PhysicalPlanNode::try_from_physical_plan(plan, extension_codec)?;
295    let mut buffer = BytesMut::new();
296    protobuf
297        .encode(&mut buffer)
298        .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
299    Ok(buffer.into())
300}
301
/// Deserialize a physical plan ([`ExecutionPlan`]) from a JSON string with
/// the [`DefaultPhysicalExtensionCodec`].
///
/// The JSON is parsed into a protobuf `PhysicalPlanNode`, which is then
/// converted into a plan using `ctx` and its runtime environment.
///
/// # Errors
///
/// Returns a plan error if `json` cannot be parsed into a
/// `PhysicalPlanNode`, or whatever error the conversion from the protobuf
/// node into an execution plan produces.
#[cfg(feature = "json")]
pub fn physical_plan_from_json(
    json: &str,
    ctx: &SessionContext,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Parsing JSON is deserialization; the message matches the wording used by
    // `logical_plan_from_json_with_extension_codec`.
    let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
        .map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
    let extension_codec = DefaultPhysicalExtensionCodec {};
    back.try_into_physical_plan(ctx, &ctx.runtime_env(), &extension_codec)
}
313
314/// Deserialize a PhysicalPlan from bytes
315pub fn physical_plan_from_bytes(
316    bytes: &[u8],
317    ctx: &SessionContext,
318) -> Result<Arc<dyn ExecutionPlan>> {
319    let extension_codec = DefaultPhysicalExtensionCodec {};
320    physical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
321}
322
323/// Deserialize a PhysicalPlan from bytes
324pub fn physical_plan_from_bytes_with_extension_codec(
325    bytes: &[u8],
326    ctx: &SessionContext,
327    extension_codec: &dyn PhysicalExtensionCodec,
328) -> Result<Arc<dyn ExecutionPlan>> {
329    let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
330        .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
331    protobuf.try_into_physical_plan(ctx, &ctx.runtime_env(), extension_codec)
332}