datafusion_proto/bytes/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Serialization / Deserialization to Bytes
19use crate::logical_plan::to_proto::serialize_expr;
20use crate::logical_plan::{
21    self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
22};
23use crate::physical_plan::{
24    AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
25};
26use crate::protobuf;
27use datafusion_common::{plan_datafusion_err, Result};
28use datafusion_expr::{
29    create_udaf, create_udf, create_udwf, AggregateUDF, Expr, LogicalPlan, Volatility,
30    WindowUDF,
31};
32use prost::{
33    bytes::{Bytes, BytesMut},
34    Message,
35};
36use std::sync::Arc;
37
// Note: `Bytes` (from `prost::bytes`) appears in this module's public API but is not re-exported here.
39use datafusion::execution::registry::FunctionRegistry;
40use datafusion::physical_plan::ExecutionPlan;
41use datafusion::prelude::SessionContext;
42use datafusion_expr::planner::ExprPlanner;
43
44mod registry;
45
46/// Encodes something (such as [`Expr`]) to/from a stream of
47/// bytes.
48///
49/// ```
50/// use datafusion_expr::{col, lit, Expr};
51/// use datafusion_proto::bytes::Serializeable;
52///
53/// // Create a new `Expr` a < 32
54/// let expr = col("a").lt(lit(5i32));
55///
56/// // Convert it to an opaque form
57/// let bytes = expr.to_bytes().unwrap();
58///
59/// // Decode bytes from somewhere (over network, etc.)
60/// let decoded_expr = Expr::from_bytes(&bytes).unwrap();
61/// assert_eq!(expr, decoded_expr);
62/// ```
63pub trait Serializeable: Sized {
64    /// Convert `self` to an opaque byte stream
65    fn to_bytes(&self) -> Result<Bytes>;
66
67    /// Convert `bytes` (the output of [`to_bytes`]) back into an
68    /// object. This will error if the serialized bytes contain any
69    /// user defined functions, in which case use
70    /// [`from_bytes_with_registry`]
71    ///
72    /// [`to_bytes`]: Self::to_bytes
73    /// [`from_bytes_with_registry`]: Self::from_bytes_with_registry
74    fn from_bytes(bytes: &[u8]) -> Result<Self> {
75        Self::from_bytes_with_registry(bytes, &registry::NoRegistry {})
76    }
77
78    /// Convert `bytes` (the output of [`to_bytes`]) back into an
79    /// object resolving user defined functions with the specified
80    /// `registry`
81    ///
82    /// [`to_bytes`]: Self::to_bytes
83    fn from_bytes_with_registry(
84        bytes: &[u8],
85        registry: &dyn FunctionRegistry,
86    ) -> Result<Self>;
87}
88
89impl Serializeable for Expr {
90    fn to_bytes(&self) -> Result<Bytes> {
91        let mut buffer = BytesMut::new();
92        let extension_codec = DefaultLogicalExtensionCodec {};
93        let protobuf: protobuf::LogicalExprNode = serialize_expr(self, &extension_codec)
94            .map_err(|e| plan_datafusion_err!("Error encoding expr as protobuf: {e}"))?;
95
96        protobuf
97            .encode(&mut buffer)
98            .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
99
100        let bytes: Bytes = buffer.into();
101
102        // the produced byte stream may lead to "recursion limit" errors, see
103        // https://github.com/apache/datafusion/issues/3968
104        // Until the underlying prost issue ( https://github.com/tokio-rs/prost/issues/736 ) is fixed, we try to
105        // deserialize the data here and check for errors.
106        //
107        // Need to provide some placeholder registry because the stream may contain UDFs
108        struct PlaceHolderRegistry;
109
110        impl FunctionRegistry for PlaceHolderRegistry {
111            fn udfs(&self) -> std::collections::HashSet<String> {
112                std::collections::HashSet::default()
113            }
114
115            fn udf(&self, name: &str) -> Result<Arc<datafusion_expr::ScalarUDF>> {
116                Ok(Arc::new(create_udf(
117                    name,
118                    vec![],
119                    arrow::datatypes::DataType::Null,
120                    Volatility::Immutable,
121                    Arc::new(|_| unimplemented!()),
122                )))
123            }
124
125            fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
126                Ok(Arc::new(create_udaf(
127                    name,
128                    vec![arrow::datatypes::DataType::Null],
129                    Arc::new(arrow::datatypes::DataType::Null),
130                    Volatility::Immutable,
131                    Arc::new(|_| unimplemented!()),
132                    Arc::new(vec![]),
133                )))
134            }
135
136            fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
137                Ok(Arc::new(create_udwf(
138                    name,
139                    arrow::datatypes::DataType::Null,
140                    Arc::new(arrow::datatypes::DataType::Null),
141                    Volatility::Immutable,
142                    Arc::new(|| unimplemented!()),
143                )))
144            }
145            fn register_udaf(
146                &mut self,
147                _udaf: Arc<AggregateUDF>,
148            ) -> Result<Option<Arc<AggregateUDF>>> {
149                datafusion_common::internal_err!(
150                    "register_udaf called in Placeholder Registry!"
151                )
152            }
153            fn register_udf(
154                &mut self,
155                _udf: Arc<datafusion_expr::ScalarUDF>,
156            ) -> Result<Option<Arc<datafusion_expr::ScalarUDF>>> {
157                datafusion_common::internal_err!(
158                    "register_udf called in Placeholder Registry!"
159                )
160            }
161            fn register_udwf(
162                &mut self,
163                _udaf: Arc<WindowUDF>,
164            ) -> Result<Option<Arc<WindowUDF>>> {
165                datafusion_common::internal_err!(
166                    "register_udwf called in Placeholder Registry!"
167                )
168            }
169
170            fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
171                vec![]
172            }
173
174            fn udafs(&self) -> std::collections::HashSet<String> {
175                std::collections::HashSet::default()
176            }
177
178            fn udwfs(&self) -> std::collections::HashSet<String> {
179                std::collections::HashSet::default()
180            }
181        }
182        Expr::from_bytes_with_registry(&bytes, &PlaceHolderRegistry)?;
183
184        Ok(bytes)
185    }
186
187    fn from_bytes_with_registry(
188        bytes: &[u8],
189        registry: &dyn FunctionRegistry,
190    ) -> Result<Self> {
191        let protobuf = protobuf::LogicalExprNode::decode(bytes)
192            .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
193
194        let extension_codec = DefaultLogicalExtensionCodec {};
195        logical_plan::from_proto::parse_expr(&protobuf, registry, &extension_codec)
196            .map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))
197    }
198}
199
200/// Serialize a LogicalPlan as bytes
201pub fn logical_plan_to_bytes(plan: &LogicalPlan) -> Result<Bytes> {
202    let extension_codec = DefaultLogicalExtensionCodec {};
203    logical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
204}
205
/// Serialize a [`LogicalPlan`] to a JSON string using the default logical
/// extension codec ([`DefaultLogicalExtensionCodec`]).
#[cfg(feature = "json")]
pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result<String> {
    logical_plan_to_json_with_extension_codec(plan, &DefaultLogicalExtensionCodec {})
}
212
213/// Serialize a LogicalPlan as bytes, using the provided extension codec
214pub fn logical_plan_to_bytes_with_extension_codec(
215    plan: &LogicalPlan,
216    extension_codec: &dyn LogicalExtensionCodec,
217) -> Result<Bytes> {
218    let protobuf =
219        protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)?;
220    let mut buffer = BytesMut::new();
221    protobuf
222        .encode(&mut buffer)
223        .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
224    Ok(buffer.into())
225}
226
/// Serialize a [`LogicalPlan`] to a JSON string, using the provided
/// `extension_codec`.
///
/// Failures while converting the plan to protobuf and failures while writing
/// the JSON are both reported as `"Error serializing plan: ..."`.
#[cfg(feature = "json")]
pub fn logical_plan_to_json_with_extension_codec(
    plan: &LogicalPlan,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<String> {
    let node = match protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)
    {
        Ok(node) => node,
        Err(e) => return Err(plan_datafusion_err!("Error serializing plan: {e}")),
    };
    serde_json::to_string(&node)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
}
239
/// Deserialize a [`LogicalPlan`] from a JSON string with the given `ctx`,
/// using the default logical extension codec
/// ([`DefaultLogicalExtensionCodec`]).
#[cfg(feature = "json")]
pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
    let codec = DefaultLogicalExtensionCodec {};
    logical_plan_from_json_with_extension_codec(json, ctx, &codec)
}
246
247/// Deserialize a LogicalPlan from bytes
248pub fn logical_plan_from_bytes(
249    bytes: &[u8],
250    ctx: &SessionContext,
251) -> Result<LogicalPlan> {
252    let extension_codec = DefaultLogicalExtensionCodec {};
253    logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
254}
255
256/// Deserialize a LogicalPlan from bytes
257pub fn logical_plan_from_bytes_with_extension_codec(
258    bytes: &[u8],
259    ctx: &SessionContext,
260    extension_codec: &dyn LogicalExtensionCodec,
261) -> Result<LogicalPlan> {
262    let protobuf = protobuf::LogicalPlanNode::decode(bytes)
263        .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
264    protobuf.try_into_logical_plan(ctx, extension_codec)
265}
266
/// Deserialize a [`LogicalPlan`] from a JSON string with the given `ctx`,
/// using the provided `extension_codec`.
///
/// A JSON parse failure is reported as `"Error deserializing plan: ..."`;
/// errors from rebuilding the plan are returned unchanged.
#[cfg(feature = "json")]
pub fn logical_plan_from_json_with_extension_codec(
    json: &str,
    ctx: &SessionContext,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
    match serde_json::from_str::<protobuf::LogicalPlanNode>(json) {
        Ok(node) => node.try_into_logical_plan(ctx, extension_codec),
        Err(e) => Err(plan_datafusion_err!("Error deserializing plan: {e}")),
    }
}
278
279/// Serialize a PhysicalPlan as bytes
280pub fn physical_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Bytes> {
281    let extension_codec = DefaultPhysicalExtensionCodec {};
282    physical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
283}
284
/// Serialize an [`ExecutionPlan`] to a JSON string using the default
/// physical extension codec ([`DefaultPhysicalExtensionCodec`]).
///
/// Failures while converting the plan to protobuf and failures while writing
/// the JSON are both reported as `"Error serializing plan: ..."`.
#[cfg(feature = "json")]
pub fn physical_plan_to_json(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
    let codec = DefaultPhysicalExtensionCodec {};
    let node = match protobuf::PhysicalPlanNode::try_from_physical_plan(plan, &codec) {
        Ok(node) => node,
        Err(e) => return Err(plan_datafusion_err!("Error serializing plan: {e}")),
    };
    serde_json::to_string(&node)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
}
295
296/// Serialize a PhysicalPlan as bytes, using the provided extension codec
297pub fn physical_plan_to_bytes_with_extension_codec(
298    plan: Arc<dyn ExecutionPlan>,
299    extension_codec: &dyn PhysicalExtensionCodec,
300) -> Result<Bytes> {
301    let protobuf =
302        protobuf::PhysicalPlanNode::try_from_physical_plan(plan, extension_codec)?;
303    let mut buffer = BytesMut::new();
304    protobuf
305        .encode(&mut buffer)
306        .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
307    Ok(buffer.into())
308}
309
/// Deserialize an [`ExecutionPlan`] from a JSON string, using the default
/// physical extension codec ([`DefaultPhysicalExtensionCodec`]) and the
/// runtime environment of `ctx`.
///
/// # Errors
///
/// Returns an error prefixed with `"Error deserializing plan"` if `json`
/// cannot be parsed as a `PhysicalPlanNode`. Errors raised while converting
/// the parsed node into an execution plan are returned unchanged.
#[cfg(feature = "json")]
pub fn physical_plan_from_json(
    json: &str,
    ctx: &SessionContext,
) -> Result<Arc<dyn ExecutionPlan>> {
    // This is the deserialization path, so the error says "deserializing".
    let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
        .map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
    let extension_codec = DefaultPhysicalExtensionCodec {};
    back.try_into_physical_plan(ctx, &ctx.runtime_env(), &extension_codec)
}
321
322/// Deserialize a PhysicalPlan from bytes
323pub fn physical_plan_from_bytes(
324    bytes: &[u8],
325    ctx: &SessionContext,
326) -> Result<Arc<dyn ExecutionPlan>> {
327    let extension_codec = DefaultPhysicalExtensionCodec {};
328    physical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
329}
330
331/// Deserialize a PhysicalPlan from bytes
332pub fn physical_plan_from_bytes_with_extension_codec(
333    bytes: &[u8],
334    ctx: &SessionContext,
335    extension_codec: &dyn PhysicalExtensionCodec,
336) -> Result<Arc<dyn ExecutionPlan>> {
337    let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
338        .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
339    protobuf.try_into_physical_plan(ctx, &ctx.runtime_env(), extension_codec)
340}