datafusion_distributed/distributed_planner/
network_boundary.rs1use crate::{BroadcastExec, NetworkBroadcastExec, NetworkCoalesceExec, NetworkShuffleExec, Stage};
2use datafusion::common::Result;
3use datafusion::physical_expr::Partitioning;
4use datafusion::physical_plan::repartition::RepartitionExec;
5use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
6use std::sync::Arc;
7
8pub trait NetworkBoundary: ExecutionPlan {
12 fn with_input_stage(&self, input_stage: Stage) -> Result<Arc<dyn ExecutionPlan>>;
17
18 fn input_stage(&self) -> &Stage;
20
21 fn producer_head(&self, consumer_tasks: usize) -> ProducerHead;
25}
26
27pub enum ProducerHead {
30 None,
32 BroadcastExec { output_partitions: usize },
34 RepartitionExec { partitioning: Partitioning },
36}
37
38pub trait NetworkBoundaryExt {
40 fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary>;
42 fn is_network_boundary(&self) -> bool {
44 self.as_network_boundary().is_some()
45 }
46}
47
48impl NetworkBoundaryExt for dyn ExecutionPlan {
49 fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary> {
50 if let Some(node) = self.downcast_ref::<NetworkShuffleExec>() {
51 Some(node)
52 } else if let Some(node) = self.downcast_ref::<NetworkCoalesceExec>() {
53 Some(node)
54 } else if let Some(node) = self.downcast_ref::<NetworkBroadcastExec>() {
55 Some(node)
56 } else {
57 None
58 }
59 }
60}
61
62pub(crate) fn insert_producer_head(
65 input: Arc<dyn ExecutionPlan>,
66 head: ProducerHead,
67) -> Result<Arc<dyn ExecutionPlan>> {
68 let input = if let Some(r_exec) = input.downcast_ref::<RepartitionExec>() {
69 Arc::clone(r_exec.input())
70 } else if let Some(b_exec) = input.downcast_ref::<BroadcastExec>() {
71 Arc::clone(b_exec.input())
72 } else {
73 input
74 };
75 let plan = match head {
76 ProducerHead::None => input,
77 ProducerHead::BroadcastExec { output_partitions } => {
78 let partitions = input.output_partitioning().partition_count();
79 Arc::new(BroadcastExec::new(input, output_partitions / partitions))
80 }
81 ProducerHead::RepartitionExec { partitioning } => {
82 Arc::new(RepartitionExec::try_new(input, partitioning)?)
83 }
84 };
85 Ok(plan)
86}