use crate::logical_plan::to_proto::serialize_expr;
use crate::logical_plan::{
self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
};
use crate::physical_plan::{
AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
};
use crate::protobuf;
use datafusion_common::{plan_datafusion_err, Result};
use datafusion_expr::{
create_udaf, create_udf, create_udwf, AggregateUDF, Expr, LogicalPlan, Volatility,
WindowUDF,
};
use prost::{
bytes::{Bytes, BytesMut},
Message,
};
use std::sync::Arc;
use datafusion::execution::registry::FunctionRegistry;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_expr::planner::ExprPlanner;
mod registry;
pub trait Serializeable: Sized {
fn to_bytes(&self) -> Result<Bytes>;
fn from_bytes(bytes: &[u8]) -> Result<Self> {
Self::from_bytes_with_registry(bytes, ®istry::NoRegistry {})
}
fn from_bytes_with_registry(
bytes: &[u8],
registry: &dyn FunctionRegistry,
) -> Result<Self>;
}
impl Serializeable for Expr {
fn to_bytes(&self) -> Result<Bytes> {
let mut buffer = BytesMut::new();
let extension_codec = DefaultLogicalExtensionCodec {};
let protobuf: protobuf::LogicalExprNode = serialize_expr(self, &extension_codec)
.map_err(|e| plan_datafusion_err!("Error encoding expr as protobuf: {e}"))?;
protobuf
.encode(&mut buffer)
.map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
let bytes: Bytes = buffer.into();
struct PlaceHolderRegistry;
impl FunctionRegistry for PlaceHolderRegistry {
fn udfs(&self) -> std::collections::HashSet<String> {
std::collections::HashSet::default()
}
fn udf(&self, name: &str) -> Result<Arc<datafusion_expr::ScalarUDF>> {
Ok(Arc::new(create_udf(
name,
vec![],
arrow::datatypes::DataType::Null,
Volatility::Immutable,
Arc::new(|_| unimplemented!()),
)))
}
fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
Ok(Arc::new(create_udaf(
name,
vec![arrow::datatypes::DataType::Null],
Arc::new(arrow::datatypes::DataType::Null),
Volatility::Immutable,
Arc::new(|_| unimplemented!()),
Arc::new(vec![]),
)))
}
fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
Ok(Arc::new(create_udwf(
name,
arrow::datatypes::DataType::Null,
Arc::new(arrow::datatypes::DataType::Null),
Volatility::Immutable,
Arc::new(|| unimplemented!()),
)))
}
fn register_udaf(
&mut self,
_udaf: Arc<AggregateUDF>,
) -> Result<Option<Arc<AggregateUDF>>> {
datafusion_common::internal_err!(
"register_udaf called in Placeholder Registry!"
)
}
fn register_udf(
&mut self,
_udf: Arc<datafusion_expr::ScalarUDF>,
) -> Result<Option<Arc<datafusion_expr::ScalarUDF>>> {
datafusion_common::internal_err!(
"register_udf called in Placeholder Registry!"
)
}
fn register_udwf(
&mut self,
_udaf: Arc<WindowUDF>,
) -> Result<Option<Arc<WindowUDF>>> {
datafusion_common::internal_err!(
"register_udwf called in Placeholder Registry!"
)
}
fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
vec![]
}
}
Expr::from_bytes_with_registry(&bytes, &PlaceHolderRegistry)?;
Ok(bytes)
}
fn from_bytes_with_registry(
bytes: &[u8],
registry: &dyn FunctionRegistry,
) -> Result<Self> {
let protobuf = protobuf::LogicalExprNode::decode(bytes)
.map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
let extension_codec = DefaultLogicalExtensionCodec {};
logical_plan::from_proto::parse_expr(&protobuf, registry, &extension_codec)
.map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))
}
}
pub fn logical_plan_to_bytes(plan: &LogicalPlan) -> Result<Bytes> {
let extension_codec = DefaultLogicalExtensionCodec {};
logical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
}
#[cfg(feature = "json")]
pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result<String> {
let extension_codec = DefaultLogicalExtensionCodec {};
logical_plan_to_json_with_extension_codec(plan, &extension_codec)
}
pub fn logical_plan_to_bytes_with_extension_codec(
plan: &LogicalPlan,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<Bytes> {
let protobuf =
protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)?;
let mut buffer = BytesMut::new();
protobuf
.encode(&mut buffer)
.map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
Ok(buffer.into())
}
#[cfg(feature = "json")]
pub fn logical_plan_to_json_with_extension_codec(
plan: &LogicalPlan,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<String> {
let protobuf =
protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
serde_json::to_string(&protobuf)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
}
#[cfg(feature = "json")]
pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
let extension_codec = DefaultLogicalExtensionCodec {};
logical_plan_from_json_with_extension_codec(json, ctx, &extension_codec)
}
pub fn logical_plan_from_bytes(
bytes: &[u8],
ctx: &SessionContext,
) -> Result<LogicalPlan> {
let extension_codec = DefaultLogicalExtensionCodec {};
logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
}
pub fn logical_plan_from_bytes_with_extension_codec(
bytes: &[u8],
ctx: &SessionContext,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
let protobuf = protobuf::LogicalPlanNode::decode(bytes)
.map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
protobuf.try_into_logical_plan(ctx, extension_codec)
}
#[cfg(feature = "json")]
pub fn logical_plan_from_json_with_extension_codec(
json: &str,
ctx: &SessionContext,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
.map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
back.try_into_logical_plan(ctx, extension_codec)
}
pub fn physical_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Bytes> {
let extension_codec = DefaultPhysicalExtensionCodec {};
physical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
}
#[cfg(feature = "json")]
pub fn physical_plan_to_json(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
let extension_codec = DefaultPhysicalExtensionCodec {};
let protobuf =
protobuf::PhysicalPlanNode::try_from_physical_plan(plan, &extension_codec)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
serde_json::to_string(&protobuf)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
}
pub fn physical_plan_to_bytes_with_extension_codec(
plan: Arc<dyn ExecutionPlan>,
extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Bytes> {
let protobuf =
protobuf::PhysicalPlanNode::try_from_physical_plan(plan, extension_codec)?;
let mut buffer = BytesMut::new();
protobuf
.encode(&mut buffer)
.map_err(|e| plan_datafusion_err!("Error encoding protobuf as bytes: {e}"))?;
Ok(buffer.into())
}
#[cfg(feature = "json")]
pub fn physical_plan_from_json(
json: &str,
ctx: &SessionContext,
) -> Result<Arc<dyn ExecutionPlan>> {
let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
let extension_codec = DefaultPhysicalExtensionCodec {};
back.try_into_physical_plan(ctx, &ctx.runtime_env(), &extension_codec)
}
pub fn physical_plan_from_bytes(
bytes: &[u8],
ctx: &SessionContext,
) -> Result<Arc<dyn ExecutionPlan>> {
let extension_codec = DefaultPhysicalExtensionCodec {};
physical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
}
pub fn physical_plan_from_bytes_with_extension_codec(
bytes: &[u8],
ctx: &SessionContext,
extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn ExecutionPlan>> {
let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
.map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
protobuf.try_into_physical_plan(ctx, &ctx.runtime_env(), extension_codec)
}