Skip to main content

datafusion_distributed/execution_plans/
network_broadcast.rs

1use crate::common::require_one_child;
2use crate::distributed_planner::{NetworkBoundary, ProducerHead};
3use crate::stage::{LocalStage, Stage};
4use crate::worker::WorkerConnectionPool;
5use crate::{BroadcastExec, DistributedTaskContext};
6use datafusion::common::{Result, not_impl_err, plan_err};
7use datafusion::error::DataFusionError;
8use datafusion::execution::{SendableRecordBatchStream, TaskContext};
9use datafusion::physical_expr_common::metrics::MetricsSet;
10use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
11use datafusion::physical_plan::{
12    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
13};
14use std::fmt::Formatter;
15use std::sync::Arc;
16use uuid::Uuid;
17
18/// Network boundary for broadcasting data to all consumer tasks.
19///
20/// This operator works with [BroadcastExec] which scales up partitions so each
21/// consumer task fetches a unique set of partition numbers. Each partition request
22/// is sent to all stage tasks because each task's leaf node is specialized to serve
23/// a different slice of the data for the same logical partition number.
24///
25/// Here are some examples of how [NetworkBroadcastExec] distributes data:
26///
27/// # 1 to many
28///
29/// ```text
30/// ┌────────────────────────┐                        ┌────────────────────────┐           ■
31/// │  NetworkBroadcastExec  │                        │  NetworkBroadcastExec  │           │
32/// │        (task 1)        │           ...          │        (task M)        │           │
33/// │                        │                        │                        │        Stage N
34/// │    Populates Caches    │                        │    Populates Caches    │           │
35/// └────────┬─┬┬─┬┬─┬───────┘                        └────────┬─┬┬─┬┬─┬───────┘           │
36///          │0││1││2│                                         │0││1││2│                   │
37///          └▲┘└▲┘└▲┘                                         └▲┘└▲┘└▲┘                   ■
38///           │  │  │                                           │  │  │
39///           │  │  │                                           │  │  │
40///           │  │  │                                           │  │  │
41///           │  │  └─────────────┐          ┌──────────────────┘  │  │
42///           │  └─────────────┐  │          │     ┌───────────────┘  │
43///           └─────────────┐  │  │          │     │    ┌─────────────┘
44///                         │  │  │          │     │    │
45///                        ┌┴┐┌┴┐┌┴┐ ... ┌───┴┐┌───┴┐┌──┴─┐
///                        │0││1││2│     │3M-3││3M-2││3M-1│                                ■
47///                       ┌┴─┴┴─┴┴─┴─────┴────┴┴────┴┴────┴─┐                              │
48///                       │          BroadcastExec          │                              │
49///                       │        ┌───────────────┐        │                          Stage N-1
50///                       │        │  Batch Cache  │        │                              │
51///                       │        │  ┌─┐ ┌─┐ ┌─┐  │        │                              │
52///                       │        │  │0│ │1│ │2│  │        │                              │
53///                       │        │  └─┘ └─┘ └─┘  │        │                              │
54///                       │        └───────────────┘        │                              │
55///                       └───────────┬─┬─┬─┬─┬─┬───────────┘                              │
56///                                   │0│ │1│ │2│                                          │
57///                                   └▲┘ └▲┘ └▲┘                                          ■
58///                                    │   │   │
59///                                    │   │   │
60///                                    │   │   │
61///                                   ┌┴┐ ┌┴┐ ┌┴┐                                          ■
62///                                   │0│ │1│ │2│                                          │
63///                            ┌──────┴─┴─┴─┴─┴─┴──────┐                               Stage N-2
64///                            │Arc<dyn ExecutionPlan> │                                   │
65///                            │       (task 1)        │                                   │
66///                            └───────────────────────┘                                   ■
67/// ```
68///
69/// # Many to many
70///
71/// ```text
72///    ┌────────────────────────┐                        ┌────────────────────────┐          ■
73///    │  NetworkBroadcastExec  │                        │  NetworkBroadcastExec  │          │
74///    │        (task 1)        │                        │        (task M)        │          │
75///    │                        │           ...          │                        │       Stage N
76///    │    Populates Caches    │                        │       Cache Hits       │          │
77///    └────────┬─┬┬─┬┬─┬───────┘                        └────────┬─┬┬─┬┬─┬───────┘          │
78///             │0││1││2│                                         │0││1││2│                  │
79///             └▲┘└▲┘└▲┘                                         └▲┘└▲┘└▲┘                  ■
80///              │  │  │                                           │  │  │
81///   ┌──────────┴──┼──┼────────────────────────────────┐          │  │  │
82///   │  ┌──────────┴──┼────────────────────────────────┼──┐       │  │  │
83///   │  │  ┌──────────┴────────────────────────────────┼──┼──┐    │  │  │
84///   │  │  │                                           │  │  │    │  │  │
85///   │  │  │         ┌─────────────────────────────────┼──┼──┼────┴──┼─┐│
86///   │  │  │         │     ┌───────────────────────────┼──┼──┼───────┴─┼┼─────┐
87///   │  │  │         │     │     ┌─────────────────────┼──┼──┼─────────┼┴─────┼────┐
88///   │  │  │         │     │     │                     │  │  │         │      │    │
89///  ┌┴┐┌┴┐┌┴┐ ... ┌──┴─┐┌──┴─┐┌──┴─┐                  ┌┴┐┌┴┐┌┴┐ ... ┌──┴─┐┌───┴┐┌──┴─┐      ■
90///  │0││1││2│     │3M-3││3M-2││3M-1│                  │0││1││2│     │3M-3││3M-2││3M-1│      │
91/// ┌┴─┴┴─┴┴─┴─────┴────┴┴────┴┴────┴┐                ┌┴─┴┴─┴┴─┴─────┴────┴┴────┴┴────┴┐     │
92/// │         BroadcastExec          │                │         BroadcastExec          │     │
93/// │        ┌───────────────┐       │                │        ┌───────────────┐       │     │
94/// │        │  Batch Cache  │       │                │        │  Batch Cache  │       │     │
95/// │        │  ┌─┐ ┌─┐ ┌─┐  │       │      ...       │        │  ┌─┐ ┌─┐ ┌─┐  │       │ Stage N-1
96/// │        │  │0│ │1│ │2│  │       │                │        │  │0│ │1│ │2│  │       │     │
97/// │        │  └─┘ └─┘ └─┘  │       │                │        │  └─┘ └─┘ └─┘  │       │     │
98/// │        └───────────────┘       │                │        └───────────────┘       │     │
99/// └───────────┬─┬─┬─┬─┬─┬──────────┘                └───────────┬─┬─┬─┬─┬─┬──────────┘     │
100///             │0│ │1│ │2│                                       │0│ │1│ │2│                │
101///             └▲┘ └▲┘ └▲┘                                       └▲┘ └▲┘ └▲┘                ■
102///              │   │   │                                         │   │   │
103///              │   │   │                                         │   │   │
104///              │   │   │                                         │   │   │
105///             ┌┴┐ ┌┴┐ ┌┴┐                                       ┌┴┐ ┌┴┐ ┌┴┐                ■
106///             │0│ │1│ │2│                                       │0│ │1│ │2│                │
107///      ┌──────┴─┴─┴─┴─┴─┴──────┐                         ┌──────┴─┴─┴─┴─┴─┴──────┐     Stage N-2
108///      │Arc<dyn ExecutionPlan> │          ...            │Arc<dyn ExecutionPlan> │         │
109///      │       (task 1)        │                         │       (task N)        │         │
110///      └───────────────────────┘                         └───────────────────────┘         ■
111/// ```
112///
113/// Notice in this diagram that each [NetworkBroadcastExec] sends a request to fetch data from each
114/// [BroadcastExec] in the stage below per partition. This is because each [BroadcastExec] has its
115/// own cache which contains partial results for the partition. It is the [NetworkBroadcastExec]'s
116/// job to merge these partial partitions to then broadcast complete data to the consumers.
#[derive(Debug, Clone)]
pub struct NetworkBroadcastExec {
    /// Plan properties reported by this node: the input's properties with the partitioning
    /// replaced by `Partitioning::UnknownPartitioning` of the same partition count.
    pub(crate) properties: Arc<PlanProperties>,
    /// The stage feeding this boundary. A `Stage::Local` stage is executed in-process, while a
    /// `Stage::Remote` stage is read over the network through `worker_connections`.
    pub(crate) input_stage: Stage,
    /// Connections to the input stage's tasks; the pool is created with
    /// `input_stage.task_count()` and rebuilt whenever the input stage is replaced.
    pub(crate) worker_connections: WorkerConnectionPool,
}
123
124impl NetworkBroadcastExec {
125    pub(crate) fn from_stage(input_stage: Stage, input_properties: Arc<PlanProperties>) -> Self {
126        let input_partition_count = input_properties.partitioning.partition_count();
127        let properties = Arc::new(
128            PlanProperties::clone(&input_properties)
129                .with_partitioning(Partitioning::UnknownPartitioning(input_partition_count)),
130        );
131
132        Self {
133            properties,
134            worker_connections: WorkerConnectionPool::new(input_stage.task_count()),
135            input_stage,
136        }
137    }
138
139    /// Creates a new [NetworkBroadcastExec] fed by the provided [BroadcastExec]. The input plan
140    /// will be executed in a remote worker in `producer_tasks` number of tasks.
141    pub fn try_new(input: Arc<dyn ExecutionPlan>, producer_tasks: usize) -> Result<Self> {
142        if !input.is::<BroadcastExec>() {
143            return plan_err!("The input of a NetworkBroadcastExec can only be a BroadcastExec");
144        }
145
146        let input_properties = Arc::clone(input.properties());
147        Ok(Self::from_stage(
148            Stage::Local(LocalStage {
149                // At this point, query_id and num are just placeholders that will be filled by
150                // prepare_network_boundaries.rs. Users are not expected to provide valid values for
151                // these two parameters.
152                query_id: Uuid::nil(),
153                num: 0,
154                plan: input,
155                tasks: producer_tasks,
156            }),
157            input_properties,
158        ))
159    }
160}
161
162impl NetworkBoundary for NetworkBroadcastExec {
163    fn with_input_stage(&self, input_stage: Stage) -> Result<Arc<dyn ExecutionPlan>> {
164        let mut self_clone = self.clone();
165        self_clone.worker_connections = WorkerConnectionPool::new(input_stage.task_count());
166        self_clone.input_stage = input_stage;
167        Ok(Arc::new(self_clone))
168    }
169
170    fn input_stage(&self) -> &Stage {
171        &self.input_stage
172    }
173
174    fn producer_head(&self, consumer_task_count: usize) -> ProducerHead {
175        let partition_count = self.properties.output_partitioning().partition_count();
176        ProducerHead::BroadcastExec {
177            output_partitions: partition_count * consumer_task_count,
178        }
179    }
180}
181
182impl DisplayAs for NetworkBroadcastExec {
183    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
184        let input_tasks = self.input_stage.task_count();
185        let stage = self.input_stage.num();
186        let consumer_partitions = self.properties.partitioning.partition_count();
187        let stage_partitions = self
188            .input_stage
189            .local_plan()
190            .as_ref()
191            .map(|p| p.properties().partitioning.partition_count())
192            .unwrap_or(0);
193        write!(
194            f,
195            "[Stage {stage}] => NetworkBroadcastExec: partitions_per_consumer={consumer_partitions}, stage_partitions={stage_partitions}, input_tasks={input_tasks}",
196        )
197    }
198}
199
200impl ExecutionPlan for NetworkBroadcastExec {
201    fn name(&self) -> &str {
202        "NetworkBroadcastExec"
203    }
204
205    fn properties(&self) -> &Arc<PlanProperties> {
206        &self.properties
207    }
208
209    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
210        match &self.input_stage.local_plan() {
211            Some(plan) => vec![plan],
212            None => vec![],
213        }
214    }
215
216    fn with_new_children(
217        self: Arc<Self>,
218        children: Vec<Arc<dyn ExecutionPlan>>,
219    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
220        let mut self_clone = self.as_ref().clone();
221        match &mut self_clone.input_stage {
222            Stage::Local(local) => {
223                local.plan = require_one_child(children)?;
224            }
225            Stage::Remote(_) => not_impl_err!("NetworkBoundary cannot accept children")?,
226        }
227        Ok(Arc::new(self_clone))
228    }
229
230    fn execute(
231        &self,
232        partition: usize,
233        context: Arc<TaskContext>,
234    ) -> Result<SendableRecordBatchStream, DataFusionError> {
235        let remote_stage = match &self.input_stage {
236            Stage::Local(local) => return local.execute(partition, context),
237            Stage::Remote(remote_stage) => remote_stage,
238        };
239
240        let task_context = DistributedTaskContext::from_ctx(&context);
241        let out_partitions = self.properties.partitioning.partition_count();
242        let off = out_partitions * task_context.task_index;
243        let mut streams = Vec::with_capacity(self.input_stage.task_count());
244
245        for input_task_index in 0..self.input_stage.task_count() {
246            let worker_connection = self.worker_connections.get_or_init_worker_connection(
247                remote_stage,
248                off..(off + self.properties.partitioning.partition_count()),
249                input_task_index,
250                self.producer_head(task_context.task_count),
251                &context,
252            )?;
253
254            let stream = worker_connection.execute(off + partition)?;
255            streams.push(stream);
256        }
257
258        Ok(Box::pin(RecordBatchStreamAdapter::new(
259            self.schema(),
260            futures::stream::select_all(streams),
261        )))
262    }
263
264    fn metrics(&self) -> Option<MetricsSet> {
265        Some(self.worker_connections.metrics.clone_inner())
266    }
267}