datafusion_distributed/execution_plans/network_broadcast.rs
1use crate::common::require_one_child;
2use crate::distributed_planner::{NetworkBoundary, ProducerHead};
3use crate::stage::{LocalStage, Stage};
4use crate::worker::WorkerConnectionPool;
5use crate::{BroadcastExec, DistributedTaskContext};
6use datafusion::common::{Result, not_impl_err, plan_err};
7use datafusion::error::DataFusionError;
8use datafusion::execution::{SendableRecordBatchStream, TaskContext};
9use datafusion::physical_expr_common::metrics::MetricsSet;
10use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
11use datafusion::physical_plan::{
12 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
13};
14use std::fmt::Formatter;
15use std::sync::Arc;
16use uuid::Uuid;
17
18/// Network boundary for broadcasting data to all consumer tasks.
19///
20/// This operator works with [BroadcastExec] which scales up partitions so each
21/// consumer task fetches a unique set of partition numbers. Each partition request
22/// is sent to all stage tasks because each task's leaf node is specialized to serve
23/// a different slice of the data for the same logical partition number.
24///
25/// Here are some examples of how [NetworkBroadcastExec] distributes data:
26///
27/// # 1 to many
28///
29/// ```text
30/// ┌────────────────────────┐ ┌────────────────────────┐ ■
31/// │ NetworkBroadcastExec │ │ NetworkBroadcastExec │ │
32/// │ (task 1) │ ... │ (task M) │ │
33/// │ │ │ │ Stage N
34/// │ Populates Caches │ │ Populates Caches │ │
35/// └────────┬─┬┬─┬┬─┬───────┘ └────────┬─┬┬─┬┬─┬───────┘ │
36/// │0││1││2│ │0││1││2│ │
37/// └▲┘└▲┘└▲┘ └▲┘└▲┘└▲┘ ■
38/// │ │ │ │ │ │
39/// │ │ │ │ │ │
40/// │ │ │ │ │ │
41/// │ │ └─────────────┐ ┌──────────────────┘ │ │
42/// │ └─────────────┐ │ │ ┌───────────────┘ │
43/// └─────────────┐ │ │ │ │ ┌─────────────┘
44/// │ │ │ │ │ │
45/// ┌┴┐┌┴┐┌┴┐ ... ┌───┴┐┌───┴┐┌──┴─┐
46/// │1││2││3│ │NM-3││NM-2││NM-1│ ■
47/// ┌┴─┴┴─┴┴─┴─────┴────┴┴────┴┴────┴─┐ │
48/// │ BroadcastExec │ │
49/// │ ┌───────────────┐ │ Stage N-1
50/// │ │ Batch Cache │ │ │
51/// │ │ ┌─┐ ┌─┐ ┌─┐ │ │ │
52/// │ │ │0│ │1│ │2│ │ │ │
53/// │ │ └─┘ └─┘ └─┘ │ │ │
54/// │ └───────────────┘ │ │
55/// └───────────┬─┬─┬─┬─┬─┬───────────┘ │
56/// │0│ │1│ │2│ │
57/// └▲┘ └▲┘ └▲┘ ■
58/// │ │ │
59/// │ │ │
60/// │ │ │
61/// ┌┴┐ ┌┴┐ ┌┴┐ ■
62/// │0│ │1│ │2│ │
63/// ┌──────┴─┴─┴─┴─┴─┴──────┐ Stage N-2
64/// │Arc<dyn ExecutionPlan> │ │
65/// │ (task 1) │ │
66/// └───────────────────────┘ ■
67/// ```
68///
69/// # Many to many
70///
71/// ```text
72/// ┌────────────────────────┐ ┌────────────────────────┐ ■
73/// │ NetworkBroadcastExec │ │ NetworkBroadcastExec │ │
74/// │ (task 1) │ │ (task M) │ │
75/// │ │ ... │ │ Stage N
76/// │ Populates Caches │ │ Cache Hits │ │
77/// └────────┬─┬┬─┬┬─┬───────┘ └────────┬─┬┬─┬┬─┬───────┘ │
78/// │0││1││2│ │0││1││2│ │
79/// └▲┘└▲┘└▲┘ └▲┘└▲┘└▲┘ ■
80/// │ │ │ │ │ │
81/// ┌──────────┴──┼──┼────────────────────────────────┐ │ │ │
82/// │ ┌──────────┴──┼────────────────────────────────┼──┐ │ │ │
83/// │ │ ┌──────────┴────────────────────────────────┼──┼──┐ │ │ │
84/// │ │ │ │ │ │ │ │ │
85/// │ │ │ ┌─────────────────────────────────┼──┼──┼────┴──┼─┐│
86/// │ │ │ │ ┌───────────────────────────┼──┼──┼───────┴─┼┼─────┐
87/// │ │ │ │ │ ┌─────────────────────┼──┼──┼─────────┼┴─────┼────┐
88/// │ │ │ │ │ │ │ │ │ │ │ │
89/// ┌┴┐┌┴┐┌┴┐ ... ┌──┴─┐┌──┴─┐┌──┴─┐ ┌┴┐┌┴┐┌┴┐ ... ┌──┴─┐┌───┴┐┌──┴─┐ ■
90/// │0││1││2│ │3M-3││3M-2││3M-1│ │0││1││2│ │3M-3││3M-2││3M-1│ │
91/// ┌┴─┴┴─┴┴─┴─────┴────┴┴────┴┴────┴┐ ┌┴─┴┴─┴┴─┴─────┴────┴┴────┴┴────┴┐ │
92/// │ BroadcastExec │ │ BroadcastExec │ │
93/// │ ┌───────────────┐ │ │ ┌───────────────┐ │ │
94/// │ │ Batch Cache │ │ │ │ Batch Cache │ │ │
95/// │ │ ┌─┐ ┌─┐ ┌─┐ │ │ ... │ │ ┌─┐ ┌─┐ ┌─┐ │ │ Stage N-1
96/// │ │ │0│ │1│ │2│ │ │ │ │ │0│ │1│ │2│ │ │ │
97/// │ │ └─┘ └─┘ └─┘ │ │ │ │ └─┘ └─┘ └─┘ │ │ │
98/// │ └───────────────┘ │ │ └───────────────┘ │ │
99/// └───────────┬─┬─┬─┬─┬─┬──────────┘ └───────────┬─┬─┬─┬─┬─┬──────────┘ │
100/// │0│ │1│ │2│ │0│ │1│ │2│ │
101/// └▲┘ └▲┘ └▲┘ └▲┘ └▲┘ └▲┘ ■
102/// │ │ │ │ │ │
103/// │ │ │ │ │ │
104/// │ │ │ │ │ │
105/// ┌┴┐ ┌┴┐ ┌┴┐ ┌┴┐ ┌┴┐ ┌┴┐ ■
106/// │0│ │1│ │2│ │0│ │1│ │2│ │
107/// ┌──────┴─┴─┴─┴─┴─┴──────┐ ┌──────┴─┴─┴─┴─┴─┴──────┐ Stage N-2
108/// │Arc<dyn ExecutionPlan> │ ... │Arc<dyn ExecutionPlan> │ │
109/// │ (task 1) │ │ (task N) │ │
110/// └───────────────────────┘ └───────────────────────┘ ■
111/// ```
112///
113/// Notice in this diagram that each [NetworkBroadcastExec] sends a request to fetch data from each
114/// [BroadcastExec] in the stage below per partition. This is because each [BroadcastExec] has its
115/// own cache which contains partial results for the partition. It is the [NetworkBroadcastExec]'s
116/// job to merge these partial partitions to then broadcast complete data to the consumers.
117#[derive(Debug, Clone)]
pub struct NetworkBroadcastExec {
    /// Plan properties reported by this node. `from_stage` builds them as a copy of the input
    /// plan's properties whose partitioning is replaced by `Partitioning::UnknownPartitioning`
    /// with the same partition count.
    pub(crate) properties: Arc<PlanProperties>,
    /// The stage that feeds this node. `execute` runs a `Stage::Local` stage directly and
    /// streams data from workers for a `Stage::Remote` stage.
    pub(crate) input_stage: Stage,
    /// Connection pool created with the input stage's task count. `execute` asks it for one
    /// connection per input task, and `metrics` reports the metrics stored in it.
    pub(crate) worker_connections: WorkerConnectionPool,
}
123
124impl NetworkBroadcastExec {
125 pub(crate) fn from_stage(input_stage: Stage, input_properties: Arc<PlanProperties>) -> Self {
126 let input_partition_count = input_properties.partitioning.partition_count();
127 let properties = Arc::new(
128 PlanProperties::clone(&input_properties)
129 .with_partitioning(Partitioning::UnknownPartitioning(input_partition_count)),
130 );
131
132 Self {
133 properties,
134 worker_connections: WorkerConnectionPool::new(input_stage.task_count()),
135 input_stage,
136 }
137 }
138
139 /// Creates a new [NetworkBroadcastExec] fed by the provided [BroadcastExec]. The input plan
140 /// will be executed in a remote worker in `producer_tasks` number of tasks.
141 pub fn try_new(input: Arc<dyn ExecutionPlan>, producer_tasks: usize) -> Result<Self> {
142 if !input.is::<BroadcastExec>() {
143 return plan_err!("The input of a NetworkBroadcastExec can only be a BroadcastExec");
144 }
145
146 let input_properties = Arc::clone(input.properties());
147 Ok(Self::from_stage(
148 Stage::Local(LocalStage {
149 // At this point, query_id and num are just placeholders that will be filled by
150 // prepare_network_boundaries.rs. Users are not expected to provide valid values for
151 // these two parameters.
152 query_id: Uuid::nil(),
153 num: 0,
154 plan: input,
155 tasks: producer_tasks,
156 }),
157 input_properties,
158 ))
159 }
160}
161
162impl NetworkBoundary for NetworkBroadcastExec {
163 fn with_input_stage(&self, input_stage: Stage) -> Result<Arc<dyn ExecutionPlan>> {
164 let mut self_clone = self.clone();
165 self_clone.worker_connections = WorkerConnectionPool::new(input_stage.task_count());
166 self_clone.input_stage = input_stage;
167 Ok(Arc::new(self_clone))
168 }
169
170 fn input_stage(&self) -> &Stage {
171 &self.input_stage
172 }
173
174 fn producer_head(&self, consumer_task_count: usize) -> ProducerHead {
175 let partition_count = self.properties.output_partitioning().partition_count();
176 ProducerHead::BroadcastExec {
177 output_partitions: partition_count * consumer_task_count,
178 }
179 }
180}
181
182impl DisplayAs for NetworkBroadcastExec {
183 fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
184 let input_tasks = self.input_stage.task_count();
185 let stage = self.input_stage.num();
186 let consumer_partitions = self.properties.partitioning.partition_count();
187 let stage_partitions = self
188 .input_stage
189 .local_plan()
190 .as_ref()
191 .map(|p| p.properties().partitioning.partition_count())
192 .unwrap_or(0);
193 write!(
194 f,
195 "[Stage {stage}] => NetworkBroadcastExec: partitions_per_consumer={consumer_partitions}, stage_partitions={stage_partitions}, input_tasks={input_tasks}",
196 )
197 }
198}
199
200impl ExecutionPlan for NetworkBroadcastExec {
201 fn name(&self) -> &str {
202 "NetworkBroadcastExec"
203 }
204
205 fn properties(&self) -> &Arc<PlanProperties> {
206 &self.properties
207 }
208
209 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
210 match &self.input_stage.local_plan() {
211 Some(plan) => vec![plan],
212 None => vec![],
213 }
214 }
215
216 fn with_new_children(
217 self: Arc<Self>,
218 children: Vec<Arc<dyn ExecutionPlan>>,
219 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
220 let mut self_clone = self.as_ref().clone();
221 match &mut self_clone.input_stage {
222 Stage::Local(local) => {
223 local.plan = require_one_child(children)?;
224 }
225 Stage::Remote(_) => not_impl_err!("NetworkBoundary cannot accept children")?,
226 }
227 Ok(Arc::new(self_clone))
228 }
229
230 fn execute(
231 &self,
232 partition: usize,
233 context: Arc<TaskContext>,
234 ) -> Result<SendableRecordBatchStream, DataFusionError> {
235 let remote_stage = match &self.input_stage {
236 Stage::Local(local) => return local.execute(partition, context),
237 Stage::Remote(remote_stage) => remote_stage,
238 };
239
240 let task_context = DistributedTaskContext::from_ctx(&context);
241 let out_partitions = self.properties.partitioning.partition_count();
242 let off = out_partitions * task_context.task_index;
243 let mut streams = Vec::with_capacity(self.input_stage.task_count());
244
245 for input_task_index in 0..self.input_stage.task_count() {
246 let worker_connection = self.worker_connections.get_or_init_worker_connection(
247 remote_stage,
248 off..(off + self.properties.partitioning.partition_count()),
249 input_task_index,
250 self.producer_head(task_context.task_count),
251 &context,
252 )?;
253
254 let stream = worker_connection.execute(off + partition)?;
255 streams.push(stream);
256 }
257
258 Ok(Box::pin(RecordBatchStreamAdapter::new(
259 self.schema(),
260 futures::stream::select_all(streams),
261 )))
262 }
263
264 fn metrics(&self) -> Option<MetricsSet> {
265 Some(self.worker_connections.metrics.clone_inner())
266 }
267}