datafusion_proto/bytes/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Serialization / Deserialization to Bytes
19use crate::logical_plan::to_proto::serialize_expr;
20use crate::logical_plan::{
21    self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
22};
23use crate::physical_plan::{
24    AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
25};
26use crate::protobuf;
27use datafusion_common::{plan_datafusion_err, Result};
28use datafusion_execution::TaskContext;
29use datafusion_expr::{
30    create_udaf, create_udf, create_udwf, AggregateUDF, Expr, LogicalPlan, Volatility,
31    WindowUDF,
32};
33use prost::{
34    bytes::{Bytes, BytesMut},
35    Message,
36};
37use std::sync::Arc;
38
// Registry, planner and execution plan types used in the signatures below
40use datafusion_execution::registry::FunctionRegistry;
41use datafusion_expr::planner::ExprPlanner;
42use datafusion_physical_plan::ExecutionPlan;
43
44mod registry;
45
46/// Encodes something (such as [`Expr`]) to/from a stream of
47/// bytes.
48///
49/// ```
50/// use datafusion_expr::{col, lit, Expr};
51/// use datafusion_proto::bytes::Serializeable;
52///
53/// // Create a new `Expr` a < 32
54/// let expr = col("a").lt(lit(5i32));
55///
56/// // Convert it to an opaque form
57/// let bytes = expr.to_bytes().unwrap();
58///
59/// // Decode bytes from somewhere (over network, etc.)
60/// let decoded_expr = Expr::from_bytes(&bytes).unwrap();
61/// assert_eq!(expr, decoded_expr);
62/// ```
63pub trait Serializeable: Sized {
64    /// Convert `self` to an opaque byte stream
65    fn to_bytes(&self) -> Result<Bytes>;
66
67    /// Convert `bytes` (the output of [`to_bytes`]) back into an
68    /// object. This will error if the serialized bytes contain any
69    /// user defined functions, in which case use
70    /// [`from_bytes_with_registry`]
71    ///
72    /// [`to_bytes`]: Self::to_bytes
73    /// [`from_bytes_with_registry`]: Self::from_bytes_with_registry
74    fn from_bytes(bytes: &[u8]) -> Result<Self> {
75        Self::from_bytes_with_registry(bytes, &registry::NoRegistry {})
76    }
77
78    /// Convert `bytes` (the output of [`to_bytes`]) back into an
79    /// object resolving user defined functions with the specified
80    /// `registry`
81    ///
82    /// [`to_bytes`]: Self::to_bytes
83    fn from_bytes_with_registry(
84        bytes: &[u8],
85        registry: &dyn FunctionRegistry,
86    ) -> Result<Self>;
87}
88
89impl Serializeable for Expr {
90    fn to_bytes(&self) -> Result<Bytes> {
91        let mut buffer = BytesMut::new();
92        let extension_codec = DefaultLogicalExtensionCodec {};
93        let protobuf: protobuf::LogicalExprNode = serialize_expr(self, &extension_codec)
94            .map_err(|e| plan_datafusion_err!("Error encoding expr as protobuf: {e}"))?;
95
96        protobuf
97            .encode(&mut buffer)
98            .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
99
100        let bytes: Bytes = buffer.into();
101
102        // the produced byte stream may lead to "recursion limit" errors, see
103        // https://github.com/apache/datafusion/issues/3968
104        // Until the underlying prost issue ( https://github.com/tokio-rs/prost/issues/736 ) is fixed, we try to
105        // deserialize the data here and check for errors.
106        //
107        // Need to provide some placeholder registry because the stream may contain UDFs
108        struct PlaceHolderRegistry;
109
110        impl FunctionRegistry for PlaceHolderRegistry {
111            fn udfs(&self) -> std::collections::HashSet<String> {
112                std::collections::HashSet::default()
113            }
114
115            fn udf(&self, name: &str) -> Result<Arc<datafusion_expr::ScalarUDF>> {
116                Ok(Arc::new(create_udf(
117                    name,
118                    vec![],
119                    arrow::datatypes::DataType::Null,
120                    Volatility::Immutable,
121                    Arc::new(|_| unimplemented!()),
122                )))
123            }
124
125            fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
126                Ok(Arc::new(create_udaf(
127                    name,
128                    vec![arrow::datatypes::DataType::Null],
129                    Arc::new(arrow::datatypes::DataType::Null),
130                    Volatility::Immutable,
131                    Arc::new(|_| unimplemented!()),
132                    Arc::new(vec![]),
133                )))
134            }
135
136            fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
137                Ok(Arc::new(create_udwf(
138                    name,
139                    arrow::datatypes::DataType::Null,
140                    Arc::new(arrow::datatypes::DataType::Null),
141                    Volatility::Immutable,
142                    Arc::new(|| unimplemented!()),
143                )))
144            }
145            fn register_udaf(
146                &mut self,
147                _udaf: Arc<AggregateUDF>,
148            ) -> Result<Option<Arc<AggregateUDF>>> {
149                datafusion_common::internal_err!(
150                    "register_udaf called in Placeholder Registry!"
151                )
152            }
153            fn register_udf(
154                &mut self,
155                _udf: Arc<datafusion_expr::ScalarUDF>,
156            ) -> Result<Option<Arc<datafusion_expr::ScalarUDF>>> {
157                datafusion_common::internal_err!(
158                    "register_udf called in Placeholder Registry!"
159                )
160            }
161            fn register_udwf(
162                &mut self,
163                _udaf: Arc<WindowUDF>,
164            ) -> Result<Option<Arc<WindowUDF>>> {
165                datafusion_common::internal_err!(
166                    "register_udwf called in Placeholder Registry!"
167                )
168            }
169
170            fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
171                vec![]
172            }
173
174            fn udafs(&self) -> std::collections::HashSet<String> {
175                std::collections::HashSet::default()
176            }
177
178            fn udwfs(&self) -> std::collections::HashSet<String> {
179                std::collections::HashSet::default()
180            }
181        }
182        Expr::from_bytes_with_registry(&bytes, &PlaceHolderRegistry)?;
183
184        Ok(bytes)
185    }
186
187    fn from_bytes_with_registry(
188        bytes: &[u8],
189        registry: &dyn FunctionRegistry,
190    ) -> Result<Self> {
191        let protobuf = protobuf::LogicalExprNode::decode(bytes)
192            .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
193
194        let extension_codec = DefaultLogicalExtensionCodec {};
195        logical_plan::from_proto::parse_expr(&protobuf, registry, &extension_codec)
196            .map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))
197    }
198}
199
200/// Serialize a LogicalPlan as bytes
201pub fn logical_plan_to_bytes(plan: &LogicalPlan) -> Result<Bytes> {
202    let extension_codec = DefaultLogicalExtensionCodec {};
203    logical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
204}
205
/// Serialize a [`LogicalPlan`] as a JSON string, using the
/// [`DefaultLogicalExtensionCodec`].
#[cfg(feature = "json")]
pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result<String> {
    logical_plan_to_json_with_extension_codec(plan, &DefaultLogicalExtensionCodec {})
}
212
213/// Serialize a LogicalPlan as bytes, using the provided extension codec
214pub fn logical_plan_to_bytes_with_extension_codec(
215    plan: &LogicalPlan,
216    extension_codec: &dyn LogicalExtensionCodec,
217) -> Result<Bytes> {
218    let protobuf =
219        protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)?;
220    let mut buffer = BytesMut::new();
221    protobuf
222        .encode(&mut buffer)
223        .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
224    Ok(buffer.into())
225}
226
/// Serialize a [`LogicalPlan`] as a JSON string, using the provided
/// extension codec.
///
/// Failures from either step (plan to protobuf, protobuf to JSON) are
/// reported as planning errors.
#[cfg(feature = "json")]
pub fn logical_plan_to_json_with_extension_codec(
    plan: &LogicalPlan,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<String> {
    protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
        .and_then(|node| {
            serde_json::to_string(&node)
                .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
        })
}
239
/// Deserialize a [`LogicalPlan`] from a JSON string, using the
/// [`DefaultLogicalExtensionCodec`].
#[cfg(feature = "json")]
pub fn logical_plan_from_json(json: &str, ctx: &TaskContext) -> Result<LogicalPlan> {
    logical_plan_from_json_with_extension_codec(
        json,
        ctx,
        &DefaultLogicalExtensionCodec {},
    )
}
246
247/// Deserialize a LogicalPlan from bytes
248pub fn logical_plan_from_bytes(bytes: &[u8], ctx: &TaskContext) -> Result<LogicalPlan> {
249    let extension_codec = DefaultLogicalExtensionCodec {};
250    logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
251}
252
253/// Deserialize a LogicalPlan from bytes
254pub fn logical_plan_from_bytes_with_extension_codec(
255    bytes: &[u8],
256    ctx: &TaskContext,
257    extension_codec: &dyn LogicalExtensionCodec,
258) -> Result<LogicalPlan> {
259    let protobuf = protobuf::LogicalPlanNode::decode(bytes)
260        .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
261    protobuf.try_into_logical_plan(ctx, extension_codec)
262}
263
/// Deserialize a [`LogicalPlan`] from a JSON string, using the provided
/// extension codec.
///
/// A JSON parsing failure is reported as a planning error; errors from
/// converting the parsed node into a [`LogicalPlan`] are returned unchanged.
#[cfg(feature = "json")]
pub fn logical_plan_from_json_with_extension_codec(
    json: &str,
    ctx: &TaskContext,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
    let node: protobuf::LogicalPlanNode = match serde_json::from_str(json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(plan_datafusion_err!("Error deserializing plan: {e}")),
    };
    node.try_into_logical_plan(ctx, extension_codec)
}
275
276/// Serialize a PhysicalPlan as bytes
277pub fn physical_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Bytes> {
278    let extension_codec = DefaultPhysicalExtensionCodec {};
279    physical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
280}
281
/// Serialize a physical plan ([`ExecutionPlan`]) as a JSON string, using
/// the [`DefaultPhysicalExtensionCodec`].
///
/// Failures from either step (plan to protobuf, protobuf to JSON) are
/// reported as planning errors.
#[cfg(feature = "json")]
pub fn physical_plan_to_json(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
    let codec = DefaultPhysicalExtensionCodec {};
    protobuf::PhysicalPlanNode::try_from_physical_plan(plan, &codec)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
        .and_then(|node| {
            serde_json::to_string(&node)
                .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
        })
}
292
293/// Serialize a PhysicalPlan as bytes, using the provided extension codec
294pub fn physical_plan_to_bytes_with_extension_codec(
295    plan: Arc<dyn ExecutionPlan>,
296    extension_codec: &dyn PhysicalExtensionCodec,
297) -> Result<Bytes> {
298    let protobuf =
299        protobuf::PhysicalPlanNode::try_from_physical_plan(plan, extension_codec)?;
300    let mut buffer = BytesMut::new();
301    protobuf
302        .encode(&mut buffer)
303        .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
304    Ok(buffer.into())
305}
306
/// Deserialize a physical plan ([`ExecutionPlan`]) from a JSON string,
/// using the [`DefaultPhysicalExtensionCodec`].
///
/// # Errors
///
/// Returns a planning error if `json` cannot be parsed into a protobuf
/// `PhysicalPlanNode`; errors from converting the parsed node into an
/// execution plan are returned unchanged.
#[cfg(feature = "json")]
pub fn physical_plan_from_json(
    json: &str,
    ctx: &TaskContext,
) -> Result<Arc<dyn ExecutionPlan>> {
    // This is the parsing direction, so the failure is reported as a
    // deserialization error (same wording as the logical plan JSON reader).
    let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
        .map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
    let extension_codec = DefaultPhysicalExtensionCodec {};
    back.try_into_physical_plan(ctx, &extension_codec)
}
318
319/// Deserialize a PhysicalPlan from bytes
320pub fn physical_plan_from_bytes(
321    bytes: &[u8],
322    ctx: &TaskContext,
323) -> Result<Arc<dyn ExecutionPlan>> {
324    let extension_codec = DefaultPhysicalExtensionCodec {};
325    physical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
326}
327
328/// Deserialize a PhysicalPlan from bytes
329pub fn physical_plan_from_bytes_with_extension_codec(
330    bytes: &[u8],
331    ctx: &TaskContext,
332    extension_codec: &dyn PhysicalExtensionCodec,
333) -> Result<Arc<dyn ExecutionPlan>> {
334    let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
335        .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
336    protobuf.try_into_physical_plan(ctx, extension_codec)
337}