Skip to main content

datafusion_proto/bytes/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Serialization / Deserialization to Bytes
19use crate::logical_plan::to_proto::serialize_expr;
20use crate::logical_plan::{
21    self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
22};
23use crate::physical_plan::{
24    DefaultPhysicalExtensionCodec, DefaultPhysicalProtoConverter, PhysicalExtensionCodec,
25    PhysicalProtoConverterExtension,
26};
27use crate::protobuf;
28use datafusion_common::{Result, plan_datafusion_err};
29use datafusion_execution::TaskContext;
30use datafusion_expr::{
31    AggregateUDF, Expr, LogicalPlan, Volatility, WindowUDF, create_udaf, create_udf,
32    create_udwf,
33};
34use prost::{
35    Message,
36    bytes::{Bytes, BytesMut},
37};
38use std::sync::Arc;
39
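// Note: `Bytes` (imported from `prost::bytes` above) appears in this module's
// public API, e.g. as the return type of `Serializeable::to_bytes`.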
41use datafusion_execution::registry::FunctionRegistry;
42use datafusion_expr::planner::ExprPlanner;
43use datafusion_physical_plan::ExecutionPlan;
44
45mod registry;
46
47/// Encodes something (such as [`Expr`]) to/from a stream of
48/// bytes.
49///
50/// ```
51/// use datafusion_expr::{col, lit, Expr};
52/// use datafusion_proto::bytes::Serializeable;
53///
54/// // Create a new `Expr` a < 32
55/// let expr = col("a").lt(lit(5i32));
56///
57/// // Convert it to an opaque form
58/// let bytes = expr.to_bytes().unwrap();
59///
60/// // Decode bytes from somewhere (over network, etc.)
61/// let decoded_expr = Expr::from_bytes(&bytes).unwrap();
62/// assert_eq!(expr, decoded_expr);
63/// ```
64pub trait Serializeable: Sized {
65    /// Convert `self` to an opaque byte stream
66    fn to_bytes(&self) -> Result<Bytes>;
67
68    /// Convert `bytes` (the output of [`to_bytes`]) back into an
69    /// object. This will error if the serialized bytes contain any
70    /// user defined functions, in which case use
71    /// [`from_bytes_with_registry`]
72    ///
73    /// [`to_bytes`]: Self::to_bytes
74    /// [`from_bytes_with_registry`]: Self::from_bytes_with_registry
75    fn from_bytes(bytes: &[u8]) -> Result<Self> {
76        Self::from_bytes_with_registry(bytes, &registry::NoRegistry {})
77    }
78
79    /// Convert `bytes` (the output of [`to_bytes`]) back into an
80    /// object resolving user defined functions with the specified
81    /// `registry`
82    ///
83    /// [`to_bytes`]: Self::to_bytes
84    fn from_bytes_with_registry(
85        bytes: &[u8],
86        registry: &dyn FunctionRegistry,
87    ) -> Result<Self>;
88}
89
90impl Serializeable for Expr {
91    fn to_bytes(&self) -> Result<Bytes> {
92        let mut buffer = BytesMut::new();
93        let extension_codec = DefaultLogicalExtensionCodec {};
94        let protobuf: protobuf::LogicalExprNode = serialize_expr(self, &extension_codec)
95            .map_err(|e| plan_datafusion_err!("Error encoding expr as protobuf: {e}"))?;
96
97        protobuf
98            .encode(&mut buffer)
99            .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
100
101        let bytes: Bytes = buffer.into();
102
103        // the produced byte stream may lead to "recursion limit" errors, see
104        // https://github.com/apache/datafusion/issues/3968
105        // Until the underlying prost issue ( https://github.com/tokio-rs/prost/issues/736 ) is fixed, we try to
106        // deserialize the data here and check for errors.
107        //
108        // Need to provide some placeholder registry because the stream may contain UDFs
109        struct PlaceHolderRegistry;
110
111        impl FunctionRegistry for PlaceHolderRegistry {
112            fn udfs(&self) -> std::collections::HashSet<String> {
113                std::collections::HashSet::default()
114            }
115
116            fn udf(&self, name: &str) -> Result<Arc<datafusion_expr::ScalarUDF>> {
117                Ok(Arc::new(create_udf(
118                    name,
119                    vec![],
120                    arrow::datatypes::DataType::Null,
121                    Volatility::Immutable,
122                    Arc::new(|_| unimplemented!()),
123                )))
124            }
125
126            fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
127                Ok(Arc::new(create_udaf(
128                    name,
129                    vec![arrow::datatypes::DataType::Null],
130                    Arc::new(arrow::datatypes::DataType::Null),
131                    Volatility::Immutable,
132                    Arc::new(|_| unimplemented!()),
133                    Arc::new(vec![]),
134                )))
135            }
136
137            fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
138                Ok(Arc::new(create_udwf(
139                    name,
140                    arrow::datatypes::DataType::Null,
141                    Arc::new(arrow::datatypes::DataType::Null),
142                    Volatility::Immutable,
143                    Arc::new(|| unimplemented!()),
144                )))
145            }
146            fn register_udaf(
147                &mut self,
148                _udaf: Arc<AggregateUDF>,
149            ) -> Result<Option<Arc<AggregateUDF>>> {
150                datafusion_common::internal_err!(
151                    "register_udaf called in Placeholder Registry!"
152                )
153            }
154            fn register_udf(
155                &mut self,
156                _udf: Arc<datafusion_expr::ScalarUDF>,
157            ) -> Result<Option<Arc<datafusion_expr::ScalarUDF>>> {
158                datafusion_common::internal_err!(
159                    "register_udf called in Placeholder Registry!"
160                )
161            }
162            fn register_udwf(
163                &mut self,
164                _udaf: Arc<WindowUDF>,
165            ) -> Result<Option<Arc<WindowUDF>>> {
166                datafusion_common::internal_err!(
167                    "register_udwf called in Placeholder Registry!"
168                )
169            }
170
171            fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
172                vec![]
173            }
174
175            fn udafs(&self) -> std::collections::HashSet<String> {
176                std::collections::HashSet::default()
177            }
178
179            fn udwfs(&self) -> std::collections::HashSet<String> {
180                std::collections::HashSet::default()
181            }
182        }
183        Expr::from_bytes_with_registry(&bytes, &PlaceHolderRegistry)?;
184
185        Ok(bytes)
186    }
187
188    fn from_bytes_with_registry(
189        bytes: &[u8],
190        registry: &dyn FunctionRegistry,
191    ) -> Result<Self> {
192        let protobuf = protobuf::LogicalExprNode::decode(bytes)
193            .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
194
195        let extension_codec = DefaultLogicalExtensionCodec {};
196        logical_plan::from_proto::parse_expr(&protobuf, registry, &extension_codec)
197            .map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))
198    }
199}
200
201/// Serialize a LogicalPlan as bytes
202pub fn logical_plan_to_bytes(plan: &LogicalPlan) -> Result<Bytes> {
203    let extension_codec = DefaultLogicalExtensionCodec {};
204    logical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
205}
206
/// Serialize a [`LogicalPlan`] as a JSON string, using the default
/// [`DefaultLogicalExtensionCodec`].
#[cfg(feature = "json")]
pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result<String> {
    logical_plan_to_json_with_extension_codec(plan, &DefaultLogicalExtensionCodec {})
}
213
214/// Serialize a LogicalPlan as bytes, using the provided extension codec
215pub fn logical_plan_to_bytes_with_extension_codec(
216    plan: &LogicalPlan,
217    extension_codec: &dyn LogicalExtensionCodec,
218) -> Result<Bytes> {
219    let protobuf =
220        protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)?;
221    let mut buffer = BytesMut::new();
222    protobuf
223        .encode(&mut buffer)
224        .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
225    Ok(buffer.into())
226}
227
/// Serialize a [`LogicalPlan`] as a JSON string, using the caller-supplied
/// `extension_codec`.
///
/// The plan is converted into a [`protobuf::LogicalPlanNode`], which is then
/// rendered with `serde_json`. A failure in either step is reported as a plan
/// error.
#[cfg(feature = "json")]
pub fn logical_plan_to_json_with_extension_codec(
    plan: &LogicalPlan,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<String> {
    let converted =
        protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec);
    let node = match converted {
        Ok(node) => node,
        Err(e) => return Err(plan_datafusion_err!("Error serializing plan: {e}")),
    };
    serde_json::to_string(&node)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
}
240
/// Deserialize a [`LogicalPlan`] from a JSON string, using the default
/// [`DefaultLogicalExtensionCodec`].
#[cfg(feature = "json")]
pub fn logical_plan_from_json(json: &str, ctx: &TaskContext) -> Result<LogicalPlan> {
    logical_plan_from_json_with_extension_codec(
        json,
        ctx,
        &DefaultLogicalExtensionCodec {},
    )
}
247
248/// Deserialize a LogicalPlan from bytes
249pub fn logical_plan_from_bytes(bytes: &[u8], ctx: &TaskContext) -> Result<LogicalPlan> {
250    let extension_codec = DefaultLogicalExtensionCodec {};
251    logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
252}
253
254/// Deserialize a LogicalPlan from bytes
255pub fn logical_plan_from_bytes_with_extension_codec(
256    bytes: &[u8],
257    ctx: &TaskContext,
258    extension_codec: &dyn LogicalExtensionCodec,
259) -> Result<LogicalPlan> {
260    let protobuf = protobuf::LogicalPlanNode::decode(bytes)
261        .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
262    protobuf.try_into_logical_plan(ctx, extension_codec)
263}
264
/// Deserialize a [`LogicalPlan`] from a JSON string, using the
/// caller-supplied `extension_codec`.
///
/// The JSON is parsed into a [`protobuf::LogicalPlanNode`] and then converted
/// into a [`LogicalPlan`] in `ctx`.
#[cfg(feature = "json")]
pub fn logical_plan_from_json_with_extension_codec(
    json: &str,
    ctx: &TaskContext,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
    let parsed = serde_json::from_str::<protobuf::LogicalPlanNode>(json);
    match parsed {
        Ok(node) => node.try_into_logical_plan(ctx, extension_codec),
        Err(e) => Err(plan_datafusion_err!("Error deserializing plan: {e}")),
    }
}
276
277/// Serialize a PhysicalPlan as bytes
278pub fn physical_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Bytes> {
279    let extension_codec = DefaultPhysicalExtensionCodec {};
280    let proto_converter = DefaultPhysicalProtoConverter {};
281    physical_plan_to_bytes_with_proto_converter(plan, &extension_codec, &proto_converter)
282}
283
/// Serialize an [`ExecutionPlan`] as a JSON string, using the default
/// [`DefaultPhysicalExtensionCodec`] and [`DefaultPhysicalProtoConverter`].
///
/// The plan is converted into a [`protobuf::PhysicalPlanNode`], which is then
/// rendered with `serde_json`.
#[cfg(feature = "json")]
pub fn physical_plan_to_json(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
    let codec = DefaultPhysicalExtensionCodec {};
    let node = DefaultPhysicalProtoConverter {}
        .execution_plan_to_proto(&plan, &codec)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
    serde_json::to_string(&node)
        .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
}
295
296/// Serialize a PhysicalPlan as bytes, using the provided extension codec
297pub fn physical_plan_to_bytes_with_extension_codec(
298    plan: Arc<dyn ExecutionPlan>,
299    extension_codec: &dyn PhysicalExtensionCodec,
300) -> Result<Bytes> {
301    let proto_converter = DefaultPhysicalProtoConverter {};
302    physical_plan_to_bytes_with_proto_converter(plan, extension_codec, &proto_converter)
303}
304
305/// Serialize a PhysicalPlan as bytes, using the provided extension codec
306/// and protobuf converter.
307pub fn physical_plan_to_bytes_with_proto_converter(
308    plan: Arc<dyn ExecutionPlan>,
309    extension_codec: &dyn PhysicalExtensionCodec,
310    proto_converter: &dyn PhysicalProtoConverterExtension,
311) -> Result<Bytes> {
312    let protobuf = proto_converter.execution_plan_to_proto(&plan, extension_codec)?;
313    let mut buffer = BytesMut::new();
314    protobuf
315        .encode(&mut buffer)
316        .map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
317    Ok(buffer.into())
318}
319
/// Deserialize an [`ExecutionPlan`] from a JSON string, using the default
/// [`DefaultPhysicalExtensionCodec`] and [`DefaultPhysicalProtoConverter`].
///
/// # Errors
///
/// Returns an error in either of these cases:
/// - `json` is not a valid JSON encoding of a [`protobuf::PhysicalPlanNode`].
/// - The decoded node cannot be converted into an execution plan.
#[cfg(feature = "json")]
pub fn physical_plan_from_json(
    json: &str,
    ctx: &TaskContext,
) -> Result<Arc<dyn ExecutionPlan>> {
    // This is the deserialization path, so the error says "deserializing",
    // matching `logical_plan_from_json_with_extension_codec`.
    let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
        .map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
    let extension_codec = DefaultPhysicalExtensionCodec {};
    let proto_converter = DefaultPhysicalProtoConverter {};
    proto_converter.proto_to_execution_plan(ctx, &extension_codec, &back)
}
332
333/// Deserialize a PhysicalPlan from bytes
334pub fn physical_plan_from_bytes(
335    bytes: &[u8],
336    ctx: &TaskContext,
337) -> Result<Arc<dyn ExecutionPlan>> {
338    let extension_codec = DefaultPhysicalExtensionCodec {};
339    let proto_converter = DefaultPhysicalProtoConverter {};
340    physical_plan_from_bytes_with_proto_converter(
341        bytes,
342        ctx,
343        &extension_codec,
344        &proto_converter,
345    )
346}
347
348/// Deserialize a PhysicalPlan from bytes
349pub fn physical_plan_from_bytes_with_extension_codec(
350    bytes: &[u8],
351    ctx: &TaskContext,
352    extension_codec: &dyn PhysicalExtensionCodec,
353) -> Result<Arc<dyn ExecutionPlan>> {
354    let proto_converter = DefaultPhysicalProtoConverter {};
355    physical_plan_from_bytes_with_proto_converter(
356        bytes,
357        ctx,
358        extension_codec,
359        &proto_converter,
360    )
361}
362
363/// Deserialize a PhysicalPlan from bytes
364pub fn physical_plan_from_bytes_with_proto_converter(
365    bytes: &[u8],
366    ctx: &TaskContext,
367    extension_codec: &dyn PhysicalExtensionCodec,
368    proto_converter: &dyn PhysicalProtoConverterExtension,
369) -> Result<Arc<dyn ExecutionPlan>> {
370    let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
371        .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
372    proto_converter.proto_to_execution_plan(ctx, extension_codec, &protobuf)
373}