Skip to main content

datafusion_distributed/distributed_planner/
network_boundary.rs

1use crate::{NetworkBroadcastExec, NetworkCoalesceExec, NetworkShuffleExec, Stage};
2use datafusion::physical_plan::ExecutionPlan;
3use std::sync::Arc;
4
5/// This trait represents a node that introduces the necessity of a network boundary in the plan.
6/// The distributed planner, upon stepping into one of these, will break the plan and build a stage
7/// out of it.
8pub trait NetworkBoundary: ExecutionPlan {
9    /// Called when a [Stage] is correctly formed. The [NetworkBoundary] can use this
10    /// information to perform any internal transformations necessary for distributed execution.
11    ///
12    /// Typically, [NetworkBoundary]s will use this call for transitioning from "Pending" to "ready".
13    fn with_input_stage(
14        &self,
15        input_stage: Stage,
16    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>>;
17
18    /// Returns the assigned input [Stage], if any.
19    fn input_stage(&self) -> &Stage;
20}
21
22/// Extension trait for downcasting dynamic types to [NetworkBoundary].
23pub trait NetworkBoundaryExt {
24    /// Downcasts self to a [NetworkBoundary] if possible.
25    fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary>;
26    /// Returns whether self is a [NetworkBoundary] or not.
27    fn is_network_boundary(&self) -> bool {
28        self.as_network_boundary().is_some()
29    }
30}
31
32impl NetworkBoundaryExt for dyn ExecutionPlan {
33    fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary> {
34        if let Some(node) = self.as_any().downcast_ref::<NetworkShuffleExec>() {
35            Some(node)
36        } else if let Some(node) = self.as_any().downcast_ref::<NetworkCoalesceExec>() {
37            Some(node)
38        } else if let Some(node) = self.as_any().downcast_ref::<NetworkBroadcastExec>() {
39            Some(node)
40        } else {
41            None
42        }
43    }
44}