// datafusion-distributed 2.0.0
//
// Framework for enhancing Apache DataFusion with distributed capabilities.
// Documentation
use crate::DistributedCodec;
use crate::distributed_planner::ProducerHead;
use crate::worker::generated::worker::{
    BroadcastExecHead, NoneHead, RepartitionExecHead, execute_task_request as pb,
};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Result;
use datafusion::execution::TaskContext;
use datafusion_proto::physical_plan::from_proto::parse_protobuf_partitioning;
use datafusion_proto::physical_plan::to_proto::serialize_partitioning;
use datafusion_proto::physical_plan::{DefaultPhysicalProtoConverter, PhysicalPlanDecodeContext};
use datafusion_proto::protobuf;
use datafusion_proto::protobuf::proto_error;
use prost::Message;
use std::sync::Arc;

impl ProducerHead {
    pub(crate) fn from_proto(
        proto: pb::ProducerHead,
        input_schema: &SchemaRef,
        ctx: &Arc<TaskContext>,
    ) -> Result<Self> {
        Ok(match proto {
            pb::ProducerHead::None(_) => ProducerHead::None,
            pb::ProducerHead::Broadcast(v) => ProducerHead::BroadcastExec {
                output_partitions: v.output_partitions as usize,
            },
            pb::ProducerHead::Repartition(v) => {
                let proto_partitioning = protobuf::Partitioning::decode(v.partitioning.as_slice())
                    .map_err(|e| proto_error(e.to_string()))?;
                let codec = DistributedCodec::new_combined_with_user(ctx.session_config());
                let decode_ctx = PhysicalPlanDecodeContext::new(ctx, &codec);
                let partitioning = parse_protobuf_partitioning(
                    Some(&proto_partitioning),
                    &decode_ctx,
                    input_schema,
                    &DefaultPhysicalProtoConverter {},
                )?
                .ok_or_else(|| proto_error("Could not parse partitioning"))?;

                ProducerHead::RepartitionExec { partitioning }
            }
        })
    }

    pub(crate) fn to_proto(&self, ctx: &Arc<TaskContext>) -> Result<pb::ProducerHead> {
        Ok(match self {
            ProducerHead::None => pb::ProducerHead::None(NoneHead {}),
            ProducerHead::BroadcastExec { output_partitions } => {
                pb::ProducerHead::Broadcast(BroadcastExecHead {
                    output_partitions: *output_partitions as u64,
                })
            }
            ProducerHead::RepartitionExec { partitioning } => {
                let codec = DistributedCodec::new_combined_with_user(ctx.session_config());
                let partitioning =
                    serialize_partitioning(partitioning, &codec, &DefaultPhysicalProtoConverter {})
                        .map(|v| v.encode_to_vec())?;
                pb::ProducerHead::Repartition(RepartitionExecHead { partitioning })
            }
        })
    }
}