use crate::logical_plan::to_proto::serialize_expr;
use crate::logical_plan::{
self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
};
use crate::physical_plan::{
DefaultPhysicalExtensionCodec, DefaultPhysicalProtoConverter, PhysicalExtensionCodec,
PhysicalPlanDecodeContext, PhysicalProtoConverterExtension,
};
use crate::protobuf;
use datafusion_common::{Result, plan_datafusion_err};
use datafusion_execution::TaskContext;
use datafusion_expr::{Expr, LogicalPlan};
use prost::{
Message,
bytes::{Bytes, BytesMut},
};
use std::sync::Arc;
use datafusion_physical_plan::ExecutionPlan;
pub trait Serializeable: Sized {
fn to_bytes(&self) -> Result<Bytes>;
fn from_bytes(bytes: &[u8]) -> Result<Self> {
Self::from_bytes_with_ctx(bytes, &TaskContext::default())
}
fn from_bytes_with_ctx(bytes: &[u8], ctx: &TaskContext) -> Result<Self>;
}
impl Serializeable for Expr {
fn to_bytes(&self) -> Result<Bytes> {
let mut buffer = BytesMut::new();
let extension_codec = DefaultLogicalExtensionCodec {};
let protobuf: protobuf::LogicalExprNode = serialize_expr(self, &extension_codec)
.map_err(|e| plan_datafusion_err!("Error encoding expr as protobuf: {e}"))?;
protobuf
.encode(&mut buffer)
.map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
let bytes: Bytes = buffer.into();
protobuf::LogicalExprNode::decode(bytes.as_ref())
.map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
Ok(bytes)
}
fn from_bytes_with_ctx(bytes: &[u8], ctx: &TaskContext) -> Result<Self> {
let protobuf = protobuf::LogicalExprNode::decode(bytes)
.map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
let extension_codec = DefaultLogicalExtensionCodec {};
logical_plan::from_proto::parse_expr(&protobuf, ctx, &extension_codec)
.map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))
}
}
pub fn logical_plan_to_bytes(plan: &LogicalPlan) -> Result<Bytes> {
let extension_codec = DefaultLogicalExtensionCodec {};
logical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
}
#[cfg(feature = "json")]
pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result<String> {
let extension_codec = DefaultLogicalExtensionCodec {};
logical_plan_to_json_with_extension_codec(plan, &extension_codec)
}
pub fn logical_plan_to_bytes_with_extension_codec(
plan: &LogicalPlan,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<Bytes> {
let protobuf =
protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)?;
let mut buffer = BytesMut::new();
protobuf
.encode(&mut buffer)
.map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
Ok(buffer.into())
}
#[cfg(feature = "json")]
pub fn logical_plan_to_json_with_extension_codec(
plan: &LogicalPlan,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<String> {
let protobuf =
protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
serde_json::to_string(&protobuf)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
}
#[cfg(feature = "json")]
pub fn logical_plan_from_json(json: &str, ctx: &TaskContext) -> Result<LogicalPlan> {
let extension_codec = DefaultLogicalExtensionCodec {};
logical_plan_from_json_with_extension_codec(json, ctx, &extension_codec)
}
pub fn logical_plan_from_bytes(bytes: &[u8], ctx: &TaskContext) -> Result<LogicalPlan> {
let extension_codec = DefaultLogicalExtensionCodec {};
logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
}
pub fn logical_plan_from_bytes_with_extension_codec(
bytes: &[u8],
ctx: &TaskContext,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
let protobuf = protobuf::LogicalPlanNode::decode(bytes)
.map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
protobuf.try_into_logical_plan(ctx, extension_codec)
}
#[cfg(feature = "json")]
pub fn logical_plan_from_json_with_extension_codec(
json: &str,
ctx: &TaskContext,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
.map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
back.try_into_logical_plan(ctx, extension_codec)
}
pub fn physical_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Bytes> {
let extension_codec = DefaultPhysicalExtensionCodec {};
let proto_converter = DefaultPhysicalProtoConverter {};
physical_plan_to_bytes_with_proto_converter(plan, &extension_codec, &proto_converter)
}
#[cfg(feature = "json")]
pub fn physical_plan_to_json(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
let extension_codec = DefaultPhysicalExtensionCodec {};
let proto_converter = DefaultPhysicalProtoConverter {};
let protobuf = proto_converter
.execution_plan_to_proto(&plan, &extension_codec)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
serde_json::to_string(&protobuf)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
}
pub fn physical_plan_to_bytes_with_extension_codec(
plan: Arc<dyn ExecutionPlan>,
extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Bytes> {
let proto_converter = DefaultPhysicalProtoConverter {};
physical_plan_to_bytes_with_proto_converter(plan, extension_codec, &proto_converter)
}
pub fn physical_plan_to_bytes_with_proto_converter(
plan: Arc<dyn ExecutionPlan>,
extension_codec: &dyn PhysicalExtensionCodec,
proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Bytes> {
let protobuf = proto_converter.execution_plan_to_proto(&plan, extension_codec)?;
let mut buffer = BytesMut::new();
protobuf
.encode(&mut buffer)
.map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
Ok(buffer.into())
}
#[cfg(feature = "json")]
pub fn physical_plan_from_json(
json: &str,
ctx: &TaskContext,
) -> Result<Arc<dyn ExecutionPlan>> {
let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
let extension_codec = DefaultPhysicalExtensionCodec {};
let proto_converter = DefaultPhysicalProtoConverter {};
let decode_ctx = PhysicalPlanDecodeContext::new(ctx, &extension_codec);
proto_converter.proto_to_execution_plan(&back, &decode_ctx)
}
pub fn physical_plan_from_bytes(
bytes: &[u8],
ctx: &TaskContext,
) -> Result<Arc<dyn ExecutionPlan>> {
let extension_codec = DefaultPhysicalExtensionCodec {};
let proto_converter = DefaultPhysicalProtoConverter {};
physical_plan_from_bytes_with_proto_converter(
bytes,
ctx,
&extension_codec,
&proto_converter,
)
}
pub fn physical_plan_from_bytes_with_extension_codec(
bytes: &[u8],
ctx: &TaskContext,
extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn ExecutionPlan>> {
let proto_converter = DefaultPhysicalProtoConverter {};
physical_plan_from_bytes_with_proto_converter(
bytes,
ctx,
extension_codec,
&proto_converter,
)
}
pub fn physical_plan_from_bytes_with_proto_converter(
bytes: &[u8],
ctx: &TaskContext,
extension_codec: &dyn PhysicalExtensionCodec,
proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn ExecutionPlan>> {
let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
.map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
let decode_ctx = PhysicalPlanDecodeContext::new(ctx, extension_codec);
proto_converter.proto_to_execution_plan(&protobuf, &decode_ctx)
}