Skip to main content

datafusion_distributed/distributed_planner/
network_boundary.rs

1use crate::{BroadcastExec, NetworkBroadcastExec, NetworkCoalesceExec, NetworkShuffleExec, Stage};
2use datafusion::common::Result;
3use datafusion::physical_expr::Partitioning;
4use datafusion::physical_plan::repartition::RepartitionExec;
5use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
6use std::sync::Arc;
7
8/// This trait represents a node that introduces the necessity of a network boundary in the plan.
9/// The distributed planner, upon stepping into one of these, will break the plan and build a stage
10/// out of it.
11pub trait NetworkBoundary: ExecutionPlan {
12    /// Called when a [Stage] is correctly formed. The [NetworkBoundary] can use this
13    /// information to perform any internal transformations necessary for distributed execution.
14    ///
15    /// Typically, [NetworkBoundary]s will use this call for transitioning from "Pending" to "ready".
16    fn with_input_stage(&self, input_stage: Stage) -> Result<Arc<dyn ExecutionPlan>>;
17
18    /// Returns the assigned input [Stage], if any.
19    fn input_stage(&self) -> &Stage;
20
21    /// Defines what head node should the producer stage feeding this [NetworkBoundary]
22    /// implementation have. This information is used during planning an executing for ensuring
23    /// the head of a stage has the appropriate shape for consumption.
24    fn producer_head(&self, consumer_tasks: usize) -> ProducerHead;
25}
26
27/// Defines what shape should the head node of a stage have upon getting executed. Depending
28/// on the [NetworkBoundary] implementation, the stage below should have different head nodes.
29pub enum ProducerHead {
30    /// No specific head node is necessary.
31    None,
32    /// The head node should be a [BroadcastExec].
33    BroadcastExec { output_partitions: usize },
34    /// The head node should be a [RepartitionExec].
35    RepartitionExec { partitioning: Partitioning },
36}
37
38/// Extension trait for downcasting dynamic types to [NetworkBoundary].
39pub trait NetworkBoundaryExt {
40    /// Downcasts self to a [NetworkBoundary] if possible.
41    fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary>;
42    /// Returns whether self is a [NetworkBoundary] or not.
43    fn is_network_boundary(&self) -> bool {
44        self.as_network_boundary().is_some()
45    }
46}
47
48impl NetworkBoundaryExt for dyn ExecutionPlan {
49    fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary> {
50        if let Some(node) = self.downcast_ref::<NetworkShuffleExec>() {
51            Some(node)
52        } else if let Some(node) = self.downcast_ref::<NetworkCoalesceExec>() {
53            Some(node)
54        } else if let Some(node) = self.downcast_ref::<NetworkBroadcastExec>() {
55            Some(node)
56        } else {
57            None
58        }
59    }
60}
61
62/// Ensures the head of the provided plan complies with the passed [ProducerHead] definition. This
63/// can be called both during planning and lazily at runtime.
64pub(crate) fn insert_producer_head(
65    input: Arc<dyn ExecutionPlan>,
66    head: ProducerHead,
67) -> Result<Arc<dyn ExecutionPlan>> {
68    let input = if let Some(r_exec) = input.downcast_ref::<RepartitionExec>() {
69        Arc::clone(r_exec.input())
70    } else if let Some(b_exec) = input.downcast_ref::<BroadcastExec>() {
71        Arc::clone(b_exec.input())
72    } else {
73        input
74    };
75    let plan = match head {
76        ProducerHead::None => input,
77        ProducerHead::BroadcastExec { output_partitions } => {
78            let partitions = input.output_partitioning().partition_count();
79            Arc::new(BroadcastExec::new(input, output_partitions / partitions))
80        }
81        ProducerHead::RepartitionExec { partitioning } => {
82            Arc::new(RepartitionExec::try_new(input, partitioning)?)
83        }
84    };
85    Ok(plan)
86}